/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SchemaHelpers;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;
import org.joda.time.Instant;

public class DatasetSourceBatch
implements DataSourceV2,
ReadSupport {
    /**
     * Creates the reader Spark uses to plan and read this data source.
     *
     * @param options data source options; the created {@code DatasetReader} requires the
     *     {@code beam-source}, {@code default-parallelism} and {@code pipeline-options} keys
     * @return a new {@code DatasetReader} configured from {@code options}
     * @throws RuntimeException if one of the required options is missing
     */
    @Override
    public DataSourceReader createReader(DataSourceOptions options) {
        // Diamond instead of a raw type: the element type is only known after the source is
        // deserialized, so there is nothing more specific to name here.
        return new DatasetReader<>(options);
    }

    private static class DatasetPartitionReader<T>
    implements InputPartitionReader<InternalRow> {
        private boolean started = false;
        private boolean closed = false;
        private final BoundedSource<T> source;
        private BoundedSource.BoundedReader<T> reader;

        DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
            this.source = source;
            try {
                this.reader = source.createReader(serializablePipelineOptions.get().as(PipelineOptions.class));
            }
            catch (IOException e) {
                throw new RuntimeException("Error creating BoundedReader ", e);
            }
        }

        public boolean next() throws IOException {
            if (!this.started) {
                this.started = true;
                return this.reader.start();
            }
            return !this.closed && this.reader.advance();
        }

        public InternalRow get() {
            WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)this.reader.getCurrent(), (Instant)this.reader.getCurrentTimestamp());
            return RowHelpers.storeWindowedValueInRow(windowedValue, this.source.getOutputCoder());
        }

        public void close() throws IOException {
            this.closed = true;
            this.reader.close();
        }
    }

    private static class DatasetReader<T>
    implements DataSourceReader,
    Serializable {
        private int numPartitions;
        private BoundedSource<T> source;
        private SerializablePipelineOptions serializablePipelineOptions;

        private DatasetReader(DataSourceOptions options) {
            if (!options.get("beam-source").isPresent()) {
                throw new RuntimeException("Beam source was not set in DataSource options");
            }
            this.source = (BoundedSource)Base64Serializer.deserializeUnchecked((String)((String)options.get("beam-source").get()), BoundedSource.class);
            if (!options.get("default-parallelism").isPresent()) {
                throw new RuntimeException("Spark default parallelism was not set in DataSource options");
            }
            this.numPartitions = Integer.parseInt((String)options.get("default-parallelism").get());
            Preconditions.checkArgument((this.numPartitions > 0 ? 1 : 0) != 0, (Object)"Number of partitions must be greater than zero.");
            if (!options.get("pipeline-options").isPresent()) {
                throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
            }
            this.serializablePipelineOptions = new SerializablePipelineOptions((String)options.get("pipeline-options").get());
        }

        public StructType readSchema() {
            return SchemaHelpers.binarySchema();
        }

        public List<InputPartition<InternalRow>> planInputPartitions() {
            PipelineOptions options = this.serializablePipelineOptions.get();
            ArrayList<InputPartition<InternalRow>> result = new ArrayList<InputPartition<InternalRow>>();
            try {
                long desiredSizeBytes = this.source.getEstimatedSizeBytes(options) / (long)this.numPartitions;
                List splits = this.source.split(desiredSizeBytes, options);
                for (BoundedSource split : splits) {
                    result.add((InputPartition<InternalRow>)(InputPartition & Serializable)() -> new DatasetPartitionReader(split, this.serializablePipelineOptions));
                }
                return result;
            }
            catch (Exception e) {
                throw new RuntimeException("Error in splitting BoundedSource " + this.source.getClass().getCanonicalName(), e);
            }
        }
    }
}

