/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.NoSuchElementException;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.Accumulator;
import org.apache.spark.Dependency;
import org.apache.spark.HashPartitioner;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.Partition;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;

public class SourceRDD {

    /**
     * A {@link SourcePartition} that additionally carries a checkpoint mark next to its source.
     *
     * @param <T> element type of the wrapped source
     * @param <CheckpointMarkT> type of the checkpoint mark stored with the partition
     */
    private static class CheckpointableSourcePartition<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
            extends SourcePartition<T> {

        /** Checkpoint mark stored with this partition; set once at construction. */
        private final CheckpointMarkT checkpointMark;

        /**
         * @param rddId id of the RDD this partition belongs to
         * @param index position of this partition within the RDD
         * @param source source held by this partition
         * @param checkpointMark checkpoint mark to keep alongside the source
         */
        CheckpointableSourcePartition(
                final int rddId,
                final int index,
                final Source<T> source,
                final CheckpointMarkT checkpointMark) {
            super(rddId, index, source);
            this.checkpointMark = checkpointMark;
        }
    }

    public static class Unbounded<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    extends RDD<Tuple2<Source<T>, CheckpointMarkT>> {
        private final MicrobatchSource<T, CheckpointMarkT> microbatchSource;
        private final SerializablePipelineOptions options;
        private final Partitioner partitioner;
        private static final List<Dependency<?>> NIL = JavaConversions.asScalaBuffer(Collections.emptyList()).toList();

        public Unbounded(SparkContext sc, SerializablePipelineOptions options, MicrobatchSource<T, CheckpointMarkT> microbatchSource, int initialNumPartitions) {
            super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
            this.options = options;
            this.microbatchSource = microbatchSource;
            this.partitioner = new HashPartitioner(initialNumPartitions);
        }

        public Partition[] getPartitions() {
            try {
                java.util.List<Source<T>> partitionedSources = this.microbatchSource.split(this.options.get());
                Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
                for (int i = 0; i < partitionedSources.size(); ++i) {
                    partitions[i] = new CheckpointableSourcePartition<T, EmptyCheckpointMark>(this.id(), i, partitionedSources.get(i), EmptyCheckpointMark.get());
                }
                return partitions;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create partitions.", e);
            }
        }

        public Option<Partitioner> partitioner() {
            return Some.apply((Object)this.partitioner);
        }

        public Iterator<Tuple2<Source<T>, CheckpointMarkT>> compute(Partition split, TaskContext context) {
            CheckpointableSourcePartition partition = (CheckpointableSourcePartition)split;
            Tuple2 tuple2 = new Tuple2(partition.getSource(), (Object)partition.checkpointMark);
            return JavaConversions.asScalaIterator(Collections.singleton(tuple2).iterator());
        }
    }

    private static class SourcePartition<T>
    implements Partition {
        private final int rddId;
        private final int index;
        private final Source<T> source;

        SourcePartition(int rddId, int index, Source<T> source) {
            this.rddId = rddId;
            this.index = index;
            this.source = source;
        }

        public int index() {
            return this.index;
        }

        public int hashCode() {
            return 41 * (41 + this.rddId) + this.index;
        }

        public Source<T> getSource() {
            return this.source;
        }
    }

    /**
     * An RDD that reads a {@link BoundedSource}. The source is split into one partition per
     * sub-source, and each partition is read through a {@link BoundedSource.BoundedReader}.
     *
     * @param <T> element type produced by the source
     */
    public static class Bounded<T>
            extends RDD<WindowedValue<T>> {
        private static final Logger LOG = LoggerFactory.getLogger(Bounded.class);

        /**
         * Empty dependency list passed to the RDD constructor. The explicit type witness is
         * required: inside a call chain {@code emptyList()} would otherwise be inferred as
         * {@code List<Object>}.
         */
        private static final Seq<Dependency<?>> NIL =
                JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();

        /** Split size (64 MiB) used when the source's size cannot be estimated. */
        private static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L;

        private final BoundedSource<T> source;
        private final SerializablePipelineOptions options;
        private final int numPartitions;
        private final long bundleSize;
        private final String stepName;
        private final Accumulator<MetricsContainerStepMap> metricsAccum;

        /**
         * @param sc Spark context; its default parallelism becomes the partition count used
         *     to derive a split size when no bundle size is configured
         * @param source source to read
         * @param options pipeline options; the bundle size is read from its
         *     {@link SparkPipelineOptions} view
         * @param stepName step name used to look up the metrics container
         * @throws IllegalArgumentException if the default parallelism is not positive
         */
        public Bounded(SparkContext sc, BoundedSource<T> source, SerializablePipelineOptions options, String stepName) {
            super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
            this.source = source;
            this.options = options;
            this.numPartitions = sc.defaultParallelism();
            Preconditions.checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
            this.bundleSize = options.get().as(SparkPipelineOptions.class).getBundleSize();
            this.stepName = stepName;
            this.metricsAccum = MetricsAccumulator.getInstance();
        }

        /**
         * Splits the source and wraps every sub-source in a {@link SourcePartition}.
         *
         * <p>A positive configured bundle size is used as the split size. Otherwise the split
         * size is the estimated source size divided by the number of partitions, falling back
         * to {@link #DEFAULT_BUNDLE_SIZE} when the estimate cannot be obtained.
         *
         * @return one partition per sub-source, indexed in split order
         * @throws RuntimeException if splitting the source fails
         */
        @Override
        public Partition[] getPartitions() {
            try {
                java.util.List<? extends Source<T>> splits =
                        this.source.split(this.desiredBundleSizeBytes(), this.options.get());
                Partition[] partitions = new SourcePartition[splits.size()];
                for (int i = 0; i < partitions.length; i++) {
                    partitions[i] = new SourcePartition<T>(this.id(), i, splits.get(i));
                }
                return partitions;
            } catch (Exception e) {
                throw new RuntimeException("Failed to create partitions for source " + this.source.getClass().getSimpleName(), e);
            }
        }

        /** Picks the split size: the configured bundle size, an estimate, or the default. */
        private long desiredBundleSizeBytes() {
            if (this.bundleSize > 0L) {
                return this.bundleSize;
            }
            try {
                // NOTE(review): integer division gives 0 when the estimate is smaller than
                // numPartitions; confirm the source's split() tolerates a zero desired size.
                return this.source.getEstimatedSizeBytes(this.options.get()) / (long) this.numPartitions;
            } catch (Exception e) {
                // The trailing throwable is logged with its stack trace instead of being dropped.
                LOG.warn("Failed to get estimated bundle size for source {}, using default bundle size of {} bytes.", this.source, DEFAULT_BUNDLE_SIZE, e);
                return DEFAULT_BUNDLE_SIZE;
            }
        }

        /**
         * Creates a reader for the source held by the given partition.
         *
         * @throws RuntimeException if the source fails to create the reader
         */
        private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
            try {
                BoundedSource<T> partitionSource = (BoundedSource<T>) partition.getSource();
                return partitionSource.createReader(this.options.get());
            } catch (IOException e) {
                throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
            }
        }

        /**
         * Reads the given partition lazily.
         *
         * @param split partition to read; must be one created by {@link #getPartitions()}
         * @param context task context wrapped into the returned {@link InterruptibleIterator}
         * @return an iterator over the elements of the partition's source
         */
        @Override
        public Iterator<WindowedValue<T>> compute(Partition split, TaskContext context) {
            MetricsContainer metricsContainer = this.metricsAccum.localValue().getContainer(this.stepName);
            @SuppressWarnings("unchecked")
            SourcePartition<T> partition = (SourcePartition<T>) split;
            BoundedSource.BoundedReader<T> reader = this.createReader(partition);
            ReaderToIteratorAdapter<T> readerIterator = new ReaderToIteratorAdapter<T>(metricsContainer, reader);
            return new InterruptibleIterator<>(context, JavaConversions.asScalaIterator(readerIterator));
        }

        /**
         * Exposes a {@link Source.Reader} as a {@link java.util.Iterator}.
         *
         * <p>The reader is started on the first read, and closed once it is exhausted or once
         * a read fails. Reads run with the given metrics container set as the scoped container.
         */
        @VisibleForTesting
        static class ReaderToIteratorAdapter<T>
                implements java.util.Iterator<WindowedValue<T>> {
            private static final boolean FAILED_TO_OBTAIN_NEXT = false;
            private static final boolean SUCCESSFULLY_OBTAINED_NEXT = true;
            private final MetricsContainer metricsContainer;
            private final Source.Reader<T> reader;
            private boolean started = false;
            private boolean closed = false;
            /** Element read ahead by {@link #hasNext()} and not yet returned; null if none. */
            private WindowedValue<T> next = null;

            ReaderToIteratorAdapter(MetricsContainer metricsContainer, Source.Reader<T> reader) {
                this.metricsContainer = metricsContainer;
                this.reader = reader;
            }

            /**
             * Tries to read one more element into {@link #next}.
             *
             * @return true if an element was read; false if the reader is closed or exhausted
             * @throws RuntimeException if reading fails; the reader is closed before rethrowing
             */
            private boolean tryProduceNext() {
                try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(this.metricsContainer)) {
                    if (this.closed) {
                        return FAILED_TO_OBTAIN_NEXT;
                    }
                    Preconditions.checkState(this.next == null, "unexpected non-null value for next");
                    if (this.seekNext()) {
                        this.next = WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp());
                        return SUCCESSFULLY_OBTAINED_NEXT;
                    }
                    this.close();
                    return FAILED_TO_OBTAIN_NEXT;
                } catch (Exception e) {
                    // Do not leak the reader when a read fails part-way through.
                    this.closeAfterFailure(e);
                    throw new RuntimeException("Failed to read data.", e);
                }
            }

            /** Closes the reader if still open, attaching any close error to {@code failure}. */
            private void closeAfterFailure(Exception failure) {
                if (this.closed) {
                    return;
                }
                try {
                    this.close();
                } catch (RuntimeException closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
            }

            /** Marks this adapter closed and closes the reader. */
            private void close() {
                // Set first so that a failing close() is never retried.
                this.closed = true;
                try {
                    this.reader.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            /** Starts the reader on first use, otherwise advances it unless already closed. */
            private boolean seekNext() throws IOException {
                if (!this.started) {
                    this.started = true;
                    return this.reader.start();
                }
                return !this.closed && this.reader.advance();
            }

            /** Returns and clears the read-ahead element. */
            private WindowedValue<T> consumeCurrent() {
                if (this.next == null) {
                    throw new NoSuchElementException();
                }
                WindowedValue<T> current = this.next;
                this.next = null;
                return current;
            }

            /** Reads ahead if necessary, then returns and clears the read-ahead element. */
            private WindowedValue<T> consumeNext() {
                if (this.next == null) {
                    this.tryProduceNext();
                }
                return this.consumeCurrent();
            }

            @Override
            public boolean hasNext() {
                return this.next != null || this.tryProduceNext();
            }

            /**
             * @throws NoSuchElementException if the reader has no more elements
             */
            @Override
            public WindowedValue<T> next() {
                return this.consumeNext();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        }
    }
}

