/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.spark.repackaged.com.google.common.base.Stopwatch;
import org.apache.beam.spark.repackaged.com.google.common.collect.Iterators;
import org.apache.spark.streaming.State;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function3;
import scala.Option;
import scala.Tuple2;
import scala.runtime.AbstractFunction3;

public class StateSpecFunctions {
    private static final Logger LOG = LoggerFactory.getLogger(StateSpecFunctions.class);

    public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> Function3<Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, SparkUnboundedSource.Metadata>> mapSourceFunction(final SparkRuntimeContext runtimeContext) {
        return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, SparkUnboundedSource.Metadata>>(){

            public Tuple2<Iterable<byte[]>, SparkUnboundedSource.Metadata> apply(Source<T> source, Option<CheckpointMarkT> startCheckpointMark, State<Tuple2<byte[], Instant>> state) {
                Instant highWatermark;
                BoundedSource.BoundedReader reader;
                UnboundedSource.CheckpointMark checkpointMark;
                MicrobatchSource microbatchSource = (MicrobatchSource)source;
                Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Coder checkpointCoder = microbatchSource.getCheckpointMarkCoder();
                if (state.exists()) {
                    lowWatermark = (Instant)((Tuple2)state.get())._2();
                    checkpointMark = (UnboundedSource.CheckpointMark)CoderHelpers.fromByteArray((byte[])((Tuple2)state.get())._1(), checkpointCoder);
                    LOG.info("Continue reading from an existing CheckpointMark.");
                } else if (startCheckpointMark.isDefined() && !((UnboundedSource.CheckpointMark)startCheckpointMark.get()).equals(EmptyCheckpointMark.get())) {
                    checkpointMark = (UnboundedSource.CheckpointMark)startCheckpointMark.get();
                    LOG.info("Start reading from a provided CheckpointMark.");
                } else {
                    checkpointMark = null;
                    LOG.info("No CheckpointMark provided, start reading from default.");
                }
                try {
                    reader = microbatchSource.createReader(runtimeContext.getPipelineOptions(), checkpointMark);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                final ArrayList<byte[]> readValues = new ArrayList<byte[]>();
                WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of((Coder)source.getDefaultOutputCoder(), (Coder)GlobalWindow.Coder.INSTANCE);
                try {
                    boolean finished;
                    Stopwatch stopwatch = Stopwatch.createStarted();
                    boolean bl = finished = !reader.start();
                    while (!finished) {
                        WindowedValue wv = WindowedValue.of((Object)reader.getCurrent(), (Instant)reader.getCurrentTimestamp(), (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
                        readValues.add(CoderHelpers.toByteArray(wv, coder));
                        finished = !reader.advance();
                    }
                    Instant sourceWatermark = ((MicrobatchSource.Reader)reader).getWatermark();
                    highWatermark = sourceWatermark.isAfter((ReadableInstant)lowWatermark) ? sourceWatermark : lowWatermark;
                    reader.close();
                    LOG.info("Source id {} spent {} msec on reading.", (Object)microbatchSource.getId(), (Object)stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
                    Object finishedReadCheckpointMark = ((MicrobatchSource.Reader)reader).getCheckpointMark();
                    byte[] codedCheckpoint = new byte[]{};
                    if (finishedReadCheckpointMark != null) {
                        codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);
                    } else {
                        LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
                    }
                    state.update((Object)new Tuple2((Object)codedCheckpoint, (Object)highWatermark));
                }
                catch (IOException e) {
                    throw new RuntimeException("Failed to read from reader.", e);
                }
                Iterable<byte[]> iterable = new Iterable<byte[]>(){

                    @Override
                    public Iterator<byte[]> iterator() {
                        return Iterators.unmodifiableIterator(readValues.iterator());
                    }
                };
                return new Tuple2((Object)iterable, (Object)new SparkUnboundedSource.Metadata(readValues.size(), lowWatermark, highWatermark));
            }
        };
    }

    private static abstract class SerializableFunction3<T1, T2, T3, T4>
    extends AbstractFunction3<T1, T2, T3, T4>
    implements Serializable {
        private SerializableFunction3() {
        }
    }
}

