/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Collections;
import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.Dependency;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;

public class SourceRDD {

    /**
     * A {@link SourcePartition} that additionally carries a checkpoint mark alongside its source.
     *
     * @param <T> element type of the wrapped source
     * @param <CheckpointMarkT> type of the checkpoint mark stored with the partition
     */
    private static class CheckpointableSourcePartition<
            T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
            extends SourcePartition<T> {

        /** Checkpoint mark supplied at construction time; never reassigned. */
        private final CheckpointMarkT checkpointMark;

        /**
         * @param rddId id of the RDD this partition belongs to
         * @param index position of this partition within the RDD
         * @param source the source backing this partition
         * @param checkpointMark checkpoint mark to keep with the source
         */
        CheckpointableSourcePartition(
                int rddId, int index, Source<T> source, CheckpointMarkT checkpointMark) {
            super(rddId, index, source);
            this.checkpointMark = checkpointMark;
        }
    }

    public static class Unbounded<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    extends RDD<Tuple2<Source<T>, CheckpointMarkT>> {
        private final MicrobatchSource<T, CheckpointMarkT> microbatchSource;
        private final SparkRuntimeContext runtimeContext;
        private static final List<Dependency<?>> NIL = JavaConversions.asScalaBuffer(Collections.emptyList()).toList();

        public Unbounded(SparkContext sc, SparkRuntimeContext runtimeContext, MicrobatchSource<T, CheckpointMarkT> microbatchSource) {
            super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
            this.runtimeContext = runtimeContext;
            this.microbatchSource = microbatchSource;
        }

        public Partition[] getPartitions() {
            try {
                java.util.List<BoundedSource<T>> partitionedSources = this.microbatchSource.splitIntoBundles(-1L, this.runtimeContext.getPipelineOptions());
                Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
                for (int i = 0; i < partitionedSources.size(); ++i) {
                    partitions[i] = new CheckpointableSourcePartition(this.id(), i, (Source)partitionedSources.get(i), EmptyCheckpointMark.get());
                }
                return partitions;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create partitions.", e);
            }
        }

        public Iterator<Tuple2<Source<T>, CheckpointMarkT>> compute(Partition split, TaskContext context) {
            CheckpointableSourcePartition partition = (CheckpointableSourcePartition)split;
            Tuple2 tuple2 = new Tuple2(partition.getSource(), (Object)partition.checkpointMark);
            return JavaConversions.asScalaIterator(Collections.singleton(tuple2).iterator());
        }
    }

    private static class SourcePartition<T>
    implements Partition {
        private final int rddId;
        private final int index;
        private final Source<T> source;

        SourcePartition(int rddId, int index, Source<T> source) {
            this.rddId = rddId;
            this.index = index;
            this.source = source;
        }

        public int index() {
            return this.index;
        }

        public int hashCode() {
            return 41 * (41 + this.rddId) + this.index;
        }

        public Source<T> getSource() {
            return this.source;
        }
    }

    public static class Bounded<T>
    extends RDD<WindowedValue<T>> {
        private static final Logger LOG = LoggerFactory.getLogger(Bounded.class);
        private final BoundedSource<T> source;
        private final SparkRuntimeContext runtimeContext;
        private final int numPartitions;
        private static final Seq<Dependency<?>> NIL = JavaConversions.asScalaBuffer(Collections.emptyList()).toList();
        private static final long DEFAULT_BUNDLE_SIZE = 0x4000000L;

        public Bounded(SparkContext sc, BoundedSource<T> source, SparkRuntimeContext runtimeContext) {
            super(sc, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
            this.source = source;
            this.runtimeContext = runtimeContext;
            this.numPartitions = sc.defaultParallelism();
            Preconditions.checkArgument((this.numPartitions > 0 ? 1 : 0) != 0, (Object)"Number of partitions must be greater than zero.");
        }

        public Partition[] getPartitions() {
            long desiredSizeBytes = 0x4000000L;
            try {
                desiredSizeBytes = this.source.getEstimatedSizeBytes(this.runtimeContext.getPipelineOptions()) / (long)this.numPartitions;
            }
            catch (Exception e) {
                LOG.warn("Failed to get estimated bundle size for source {}, using default bundle size of {} bytes.", this.source, (Object)0x4000000L);
            }
            try {
                java.util.List partitionedSources = this.source.splitIntoBundles(desiredSizeBytes, this.runtimeContext.getPipelineOptions());
                Partition[] partitions = new SourcePartition[partitionedSources.size()];
                for (int i = 0; i < partitionedSources.size(); ++i) {
                    partitions[i] = new SourcePartition(this.id(), i, (Source)partitionedSources.get(i));
                }
                return partitions;
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create partitions for source " + this.source.getClass().getSimpleName(), e);
            }
        }

        public Iterator<WindowedValue<T>> compute(final Partition split, TaskContext context) {
            java.util.Iterator iter = new java.util.Iterator<WindowedValue<T>>(){
                SourcePartition<T> partition;
                BoundedSource.BoundedReader<T> reader;
                private boolean finished;
                private boolean started;
                private boolean closed;
                {
                    this.partition = (SourcePartition)split;
                    this.reader = Bounded.this.createReader(this.partition);
                    this.finished = false;
                    this.started = false;
                    this.closed = false;
                }

                @Override
                public boolean hasNext() {
                    try {
                        if (!this.started) {
                            this.started = true;
                            this.finished = !this.reader.start();
                        } else {
                            boolean bl = this.finished = !this.reader.advance();
                        }
                        if (this.finished) {
                            this.closeIfNotClosed();
                        }
                        return !this.finished;
                    }
                    catch (IOException e) {
                        this.closeIfNotClosed();
                        throw new RuntimeException("Failed to read from reader.", e);
                    }
                }

                @Override
                public WindowedValue<T> next() {
                    return WindowedValue.timestampedValueInGlobalWindow((Object)this.reader.getCurrent(), (Instant)this.reader.getCurrentTimestamp());
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException("Remove from partition iterator is not allowed.");
                }

                private void closeIfNotClosed() {
                    if (!this.closed) {
                        this.closed = true;
                        try {
                            this.reader.close();
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Failed to close Reader.", e);
                        }
                    }
                }
            };
            return new InterruptibleIterator(context, JavaConversions.asScalaIterator((java.util.Iterator)iter));
        }

        private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
            try {
                return ((BoundedSource)((SourcePartition)partition).source).createReader(this.runtimeContext.getPipelineOptions());
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
            }
        }
    }
}

