/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

@SuppressFBWarnings(value={"SE_BAD_FIELD"})
class DatasetSourceStreaming
implements DataSourceV2,
MicroBatchReadSupport {
    static final @UnknownKeyFor @NonNull @Initialized String BEAM_SOURCE_OPTION = "beam-source";
    static final @UnknownKeyFor @NonNull @Initialized String DEFAULT_PARALLELISM = "default-parallelism";
    static final @UnknownKeyFor @NonNull @Initialized String PIPELINE_OPTIONS = "pipeline-options";
    private static final @UnknownKeyFor @NonNull @Initialized Offset EMPTY_OFFSET = new Offset(){

        public @UnknownKeyFor @NonNull @Initialized String json() {
            return "{offset : -1}";
        }
    };

    DatasetSourceStreaming() {
    }

    public @UnknownKeyFor @NonNull @Initialized MicroBatchReader createMicroBatchReader(@UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized StructType> schema, @UnknownKeyFor @NonNull @Initialized String checkpointLocation, @UnknownKeyFor @NonNull @Initialized DataSourceOptions options) {
        return new DatasetMicroBatchReader(checkpointLocation, options);
    }

    private static class DatasetPartitionReader<@UnknownKeyFor T, @UnknownKeyFor CheckpointMarkT extends // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark>
    implements InputPartitionReader<InternalRow> {
        private @UnknownKeyFor @NonNull @Initialized boolean started = false;
        private @UnknownKeyFor @NonNull @Initialized boolean closed = false;
        private final @UnknownKeyFor @NonNull @Initialized UnboundedSource<T, CheckpointMarkT> source;
        private // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<T> reader;

        DatasetPartitionReader(@UnknownKeyFor @NonNull @Initialized UnboundedSource<T, CheckpointMarkT> source, @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions serializablePipelineOptions) {
            this.source = source;
            try {
                this.reader = source.createReader(serializablePipelineOptions.get(), null);
            }
            catch (IOException e) {
                throw new RuntimeException("Error creating UnboundedReader ", e);
            }
        }

        public @UnknownKeyFor @NonNull @Initialized boolean next() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (!this.started) {
                this.started = true;
                return this.reader.start();
            }
            return !this.closed && this.reader.advance();
        }

        public @UnknownKeyFor @NonNull @Initialized InternalRow get() {
            WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)this.reader.getCurrent(), (Instant)this.reader.getCurrentTimestamp());
            return RowHelpers.storeWindowedValueInRow(windowedValue, this.source.getOutputCoder());
        }

        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.closed = true;
            this.reader.close();
        }
    }

    private static class DatasetMicroBatchReader<@UnknownKeyFor T, @UnknownKeyFor CheckpointMarkT extends // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark>
    implements MicroBatchReader,
    Serializable {
        private @UnknownKeyFor @NonNull @Initialized int numPartitions;
        private @UnknownKeyFor @NonNull @Initialized UnboundedSource<T, CheckpointMarkT> source;
        private @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions serializablePipelineOptions;
        private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized DatasetPartitionReader> partitionReaders = new ArrayList<DatasetPartitionReader>();

        private DatasetMicroBatchReader(@UnknownKeyFor @NonNull @Initialized String checkpointLocation, @UnknownKeyFor @NonNull @Initialized DataSourceOptions options) {
            if (!options.get(DatasetSourceStreaming.BEAM_SOURCE_OPTION).isPresent()) {
                throw new RuntimeException("Beam source was not set in DataSource options");
            }
            this.source = (UnboundedSource)Base64Serializer.deserializeUnchecked((String)((String)options.get(DatasetSourceStreaming.BEAM_SOURCE_OPTION).get()), UnboundedSource.class);
            if (!options.get(DatasetSourceStreaming.DEFAULT_PARALLELISM).isPresent()) {
                throw new RuntimeException("Spark default parallelism was not set in DataSource options");
            }
            this.numPartitions = Integer.parseInt((String)options.get(DatasetSourceStreaming.DEFAULT_PARALLELISM).get());
            Preconditions.checkArgument((this.numPartitions > 0 ? 1 : 0) != 0, (Object)"Number of partitions must be greater than zero.");
            if (!options.get(DatasetSourceStreaming.PIPELINE_OPTIONS).isPresent()) {
                throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
            }
            this.serializablePipelineOptions = new SerializablePipelineOptions((String)options.get(DatasetSourceStreaming.PIPELINE_OPTIONS).get());
        }

        public void setOffsetRange(@UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Offset> start, @UnknownKeyFor @NonNull @Initialized Optional<@UnknownKeyFor @NonNull @Initialized Offset> end) {
        }

        public @UnknownKeyFor @NonNull @Initialized Offset getStartOffset() {
            return EMPTY_OFFSET;
        }

        public @UnknownKeyFor @NonNull @Initialized Offset getEndOffset() {
            return EMPTY_OFFSET;
        }

        public @UnknownKeyFor @NonNull @Initialized Offset deserializeOffset(@UnknownKeyFor @NonNull @Initialized String json) {
            return EMPTY_OFFSET;
        }

        public void commit(@UnknownKeyFor @NonNull @Initialized Offset end) {
            for (DatasetPartitionReader partitionReader : this.partitionReaders) {
                try {
                    partitionReader.reader.getCheckpointMark().finalizeCheckpoint();
                }
                catch (IOException e) {
                    throw new RuntimeException(String.format("Commit of Offset %s failed, checkpointMark %s finalizeCheckpoint() failed", end, partitionReader.reader.getCheckpointMark()));
                }
            }
        }

        public void stop() {
            try {
                for (DatasetPartitionReader partitionReader : this.partitionReaders) {
                    if (!partitionReader.started) continue;
                    partitionReader.close();
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Error closing " + this + "partitionReaders", e);
            }
        }

        public @UnknownKeyFor @NonNull @Initialized StructType readSchema() {
            return SchemaHelpers.binarySchema();
        }

        public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized InputPartition<@UnknownKeyFor @NonNull @Initialized InternalRow>> planInputPartitions() {
            PipelineOptions options = this.serializablePipelineOptions.get();
            ArrayList<InputPartition<InternalRow>> result = new ArrayList<InputPartition<InternalRow>>();
            try {
                List splits = this.source.split(this.numPartitions, options);
                for (final UnboundedSource split : splits) {
                    result.add(new InputPartition<InternalRow>(){

                        public @UnknownKeyFor @NonNull @Initialized InputPartitionReader<@UnknownKeyFor @NonNull @Initialized InternalRow> createPartitionReader() {
                            DatasetPartitionReader datasetPartitionReader = new DatasetPartitionReader(split, serializablePipelineOptions);
                            partitionReaders.add(datasetPartitionReader);
                            return datasetPartitionReader;
                        }
                    });
                }
                return result;
            }
            catch (Exception e) {
                throw new RuntimeException("Error in splitting UnboundedSource " + this.source.getClass().getCanonicalName(), e);
            }
        }
    }
}

