/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
import org.joda.time.Instant;

@SuppressFBWarnings(value={"SE_BAD_FIELD"})
class DatasetSourceStreaming
implements DataSourceV2,
MicroBatchReadSupport {
    /** DataSource option key whose value is the Base64-serialized Beam {@link UnboundedSource}. */
    static final String BEAM_SOURCE_OPTION = "beam-source";

    /** DataSource option key whose value is parsed as the number of splits requested from the source. */
    static final String DEFAULT_PARALLELISM = "default-parallelism";

    /** DataSource option key whose value is the serialized Beam pipeline options. */
    static final String PIPELINE_OPTIONS = "pipeline-options";

    /**
     * Constant placeholder offset. The micro-batch reader in this file returns it from every
     * offset-related method instead of tracking real offsets.
     */
    private static final Offset EMPTY_OFFSET = new Offset() {

        /** Returns the fixed textual representation of the placeholder offset. */
        @Override
        public String json() {
            return "{offset : -1}";
        }
    };

    /** Package-private no-argument constructor; performs no initialization. */
    DatasetSourceStreaming() {
    }

    /**
     * Creates the micro-batch reader backing this data source.
     *
     * @param schema user-specified schema; not used by this implementation
     * @param checkpointLocation checkpoint location handed over by Spark, forwarded to the reader
     * @param options DataSource options; must contain {@link #BEAM_SOURCE_OPTION},
     *     {@link #DEFAULT_PARALLELISM} and {@link #PIPELINE_OPTIONS}
     * @return a new reader configured from {@code options}
     * @throws RuntimeException if one of the required options is missing
     */
    @Override
    public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
        // Diamond instead of a raw type: the element and checkpoint-mark types are only known
        // once the source has been deserialized, so they are inferred as their bounds here.
        return new DatasetMicroBatchReader<>(checkpointLocation, options);
    }

    private static class DatasetPartitionReader<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    implements InputPartitionReader<InternalRow> {
        private boolean started = false;
        private boolean closed = false;
        private final UnboundedSource<T, CheckpointMarkT> source;
        private UnboundedSource.UnboundedReader<T> reader;

        DatasetPartitionReader(UnboundedSource<T, CheckpointMarkT> source, SerializablePipelineOptions serializablePipelineOptions) {
            this.source = source;
            try {
                this.reader = source.createReader(serializablePipelineOptions.get(), null);
            }
            catch (IOException e) {
                throw new RuntimeException("Error creating UnboundedReader ", e);
            }
        }

        public boolean next() throws IOException {
            if (!this.started) {
                this.started = true;
                return this.reader.start();
            }
            return !this.closed && this.reader.advance();
        }

        public InternalRow get() {
            WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)this.reader.getCurrent(), (Instant)this.reader.getCurrentTimestamp());
            return RowHelpers.storeWindowedValueInRow(windowedValue, this.source.getOutputCoder());
        }

        public void close() throws IOException {
            this.closed = true;
            this.reader.close();
        }
    }

    /**
     * {@link MicroBatchReader} that splits a Beam {@link UnboundedSource} into input partitions
     * and keeps track of the partition readers created through those partitions.
     *
     * <p>Offsets are not tracked: every offset-related method returns {@code EMPTY_OFFSET}.
     *
     * <p>NOTE(review): partition readers are registered on whichever instance executes
     * {@code createPartitionReader()}; confirm that {@link #commit(Offset)} and {@link #stop()}
     * observe them when the partitions are shipped to another process.
     *
     * @param <T> element type produced by the source
     * @param <CheckpointMarkT> checkpoint mark type of the source
     */
    private static class DatasetMicroBatchReader<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
            implements MicroBatchReader, Serializable {
        private int numPartitions;
        private UnboundedSource<T, CheckpointMarkT> source;
        private SerializablePipelineOptions serializablePipelineOptions;
        private final List<DatasetPartitionReader<T, CheckpointMarkT>> partitionReaders = new ArrayList<>();

        /**
         * Configures the reader from the DataSource options.
         *
         * @param checkpointLocation checkpoint location handed over by Spark; currently unused
         * @param options must contain the Beam source, the default parallelism and the pipeline options
         * @throws RuntimeException if a required option is missing
         * @throws NumberFormatException if the parallelism option is not an integer
         * @throws IllegalArgumentException if the parallelism is not strictly positive
         */
        private DatasetMicroBatchReader(String checkpointLocation, DataSourceOptions options) {
            String serializedSource = requiredOption(options, DatasetSourceStreaming.BEAM_SOURCE_OPTION,
                    "Beam source was not set in DataSource options");
            @SuppressWarnings("unchecked") // the serialized form carries no generic type information
            UnboundedSource<T, CheckpointMarkT> deserializedSource =
                    Base64Serializer.deserializeUnchecked(serializedSource, UnboundedSource.class);
            this.source = deserializedSource;

            String parallelism = requiredOption(options, DatasetSourceStreaming.DEFAULT_PARALLELISM,
                    "Spark default parallelism was not set in DataSource options");
            this.numPartitions = Integer.parseInt(parallelism);
            Preconditions.checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");

            String serializedOptions = requiredOption(options, DatasetSourceStreaming.PIPELINE_OPTIONS,
                    "Beam pipelineOptions were not set in DataSource options");
            this.serializablePipelineOptions = new SerializablePipelineOptions(serializedOptions);
        }

        /** Returns the value of a mandatory option or fails with the given message. */
        private static String requiredOption(DataSourceOptions options, String key, String errorMessage) {
            return options.get(key).orElseThrow(() -> new RuntimeException(errorMessage));
        }

        /** No-op: this reader does not track offsets. */
        @Override
        public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
        }

        /** Always returns the placeholder offset. */
        @Override
        public Offset getStartOffset() {
            return EMPTY_OFFSET;
        }

        /** Always returns the placeholder offset. */
        @Override
        public Offset getEndOffset() {
            return EMPTY_OFFSET;
        }

        /** Ignores {@code json} and always returns the placeholder offset. */
        @Override
        public Offset deserializeOffset(String json) {
            return EMPTY_OFFSET;
        }

        /**
         * Finalizes the current checkpoint mark of every registered partition reader.
         *
         * @param end offset being committed; only used in the error message
         * @throws RuntimeException wrapping the {@link IOException} of the first checkpoint mark
         *     that fails to finalize
         */
        @Override
        public void commit(Offset end) {
            for (DatasetPartitionReader<T, CheckpointMarkT> partitionReader : partitionReaders) {
                // Fetch the mark once so the error message reports the mark that actually failed.
                UnboundedSource.CheckpointMark checkpointMark = partitionReader.reader.getCheckpointMark();
                try {
                    checkpointMark.finalizeCheckpoint();
                } catch (IOException e) {
                    throw new RuntimeException(String.format("Commit of Offset %s failed, checkpointMark %s finalizeCheckpoint() failed", end, checkpointMark), e);
                }
            }
        }

        /**
         * Closes every registered partition reader that has been started. All started readers
         * are attempted even if one of them fails.
         *
         * @throws RuntimeException wrapping the first {@link IOException}; later failures are
         *     attached to it as suppressed exceptions
         */
        @Override
        public void stop() {
            IOException firstFailure = null;
            for (DatasetPartitionReader<T, CheckpointMarkT> partitionReader : partitionReaders) {
                if (!partitionReader.started) {
                    continue;
                }
                try {
                    partitionReader.close();
                } catch (IOException e) {
                    // Keep going so one failing reader does not leak the remaining ones.
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            if (firstFailure != null) {
                throw new RuntimeException("Error closing " + this + "partitionReaders", firstFailure);
            }
        }

        /** Returns the binary schema provided by {@code SchemaHelpers}. */
        @Override
        public StructType readSchema() {
            return SchemaHelpers.binarySchema();
        }

        /**
         * Splits the source into {@code numPartitions} desired splits and wraps each split in an
         * {@link InputPartition} whose reader is registered with this micro-batch reader on creation.
         *
         * @return one input partition per split returned by the source
         * @throws RuntimeException if planning the partitions fails
         */
        @Override
        public List<InputPartition<InternalRow>> planInputPartitions() {
            PipelineOptions options = serializablePipelineOptions.get();
            List<InputPartition<InternalRow>> result = new ArrayList<>();
            try {
                List<? extends UnboundedSource<T, CheckpointMarkT>> splits = source.split(numPartitions, options);
                for (final UnboundedSource<T, CheckpointMarkT> split : splits) {
                    result.add(new InputPartition<InternalRow>() {

                        @Override
                        public InputPartitionReader<InternalRow> createPartitionReader() {
                            DatasetPartitionReader<T, CheckpointMarkT> datasetPartitionReader =
                                    new DatasetPartitionReader<>(split, serializablePipelineOptions);
                            partitionReaders.add(datasetPartitionReader);
                            return datasetPartitionReader;
                        }
                    });
                }
                return result;
            } catch (Exception e) {
                throw new RuntimeException("Error in splitting UnboundedSource " + source.getClass().getCanonicalName(), e);
            }
        }
    }
}

