/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Stopwatch;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Iterators;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Lists;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.streaming.State;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function3;
import scala.Option;
import scala.Tuple2;
import scala.runtime.AbstractFunction3;

/**
 * Holds the function that reads a {@link MicrobatchSource} and keeps the source's encoded
 * checkpoint mark and watermark in a Spark Streaming {@link State}.
 */
public class StateSpecFunctions {
    private static final Logger LOG = LoggerFactory.getLogger(StateSpecFunctions.class);

    /**
     * Builds a serializable three-argument function that performs one read of a source and
     * records where the read stopped.
     *
     * <p>On each invocation the returned function:
     * <ol>
     *   <li>creates a fresh {@link MetricsContainerStepMap} and scopes the container for
     *       {@code stepName} around the read;</li>
     *   <li>picks the checkpoint to resume from: the one decoded from {@code state} if the state
     *       exists (its stored watermark becomes the low watermark), otherwise the supplied start
     *       checkpoint if it is defined and not equal to {@link EmptyCheckpointMark#get()},
     *       otherwise {@code null};</li>
     *   <li>obtains a reader from the source and drains it until {@code start()}/{@code advance()}
     *       returns {@code false}, encoding every element as a {@link WindowedValue} in the
     *       {@link GlobalWindow};</li>
     *   <li>computes the high watermark as the later of the reader's watermark and the low
     *       watermark, and stores it in {@code state} together with the encoded checkpoint mark
     *       (an empty byte array when the reader returns {@code null}).</li>
     * </ol>
     *
     * <p>Any {@link IOException} raised while creating the reader, reading, or closing the metrics
     * scope is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param runtimeContext supplies the pipeline options passed to the source when obtaining a reader
     * @param stepName name used to look up the metrics container for the read
     * @param <T> element type produced by the source
     * @param <CheckpointMarkT> checkpoint mark type of the source
     * @return a function yielding the encoded elements paired with
     *     {@link SparkUnboundedSource.Metadata} (element count, low and high watermark, read
     *     duration in milliseconds, and the metrics containers used for the read)
     */
    public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> Function3<Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, SparkUnboundedSource.Metadata>> mapSourceFunction(final SparkRuntimeContext runtimeContext, final String stepName) {
        return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, SparkUnboundedSource.Metadata>>() {

            @Override
            public Tuple2<Iterable<byte[]>, SparkUnboundedSource.Metadata> apply(Source<T> source, Option<CheckpointMarkT> startCheckpointMark, State<Tuple2<byte[], Instant>> state) {
                MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
                MetricsContainer metricsContainer = metricsContainers.getContainer(stepName);

                // The metrics scope stays open for the whole read and is closed when the try block exits.
                try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
                    // The caller is expected to hand in a MicrobatchSource; anything else fails here
                    // with a ClassCastException.
                    @SuppressWarnings("unchecked")
                    MicrobatchSource<T, CheckpointMarkT> microbatchSource = (MicrobatchSource<T, CheckpointMarkT>) source;
                    Coder<CheckpointMarkT> checkpointCoder = microbatchSource.getCheckpointMarkCoder();

                    Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
                    CheckpointMarkT checkpointMark;
                    if (state.exists()) {
                        // The watermark stored by the previous read is this read's low watermark.
                        // NOTE(review): if the previous read stored an empty byte array (reader returned
                        // no checkpoint mark), that array is decoded here - confirm the coder tolerates it.
                        Tuple2<byte[], Instant> previous = state.get();
                        lowWatermark = previous._2();
                        checkpointMark = CoderHelpers.fromByteArray(previous._1(), checkpointCoder);
                        LOG.info("Continue reading from an existing CheckpointMark.");
                    } else if (startCheckpointMark.isDefined() && !startCheckpointMark.get().equals(EmptyCheckpointMark.get())) {
                        checkpointMark = startCheckpointMark.get();
                        LOG.info("Start reading from a provided CheckpointMark.");
                    } else {
                        checkpointMark = null;
                        LOG.info("No CheckpointMark provided, start reading from default.");
                    }

                    // The stopwatch starts before the reader is obtained, so the reported duration
                    // includes reader creation as well as the read itself.
                    Stopwatch stopwatch = Stopwatch.createStarted();
                    long readDurationMillis = 0L;

                    // Left as a raw type: the reader's type parameters are not visible from this file.
                    MicrobatchSource.Reader microbatchReader;
                    try {
                        microbatchReader = (MicrobatchSource.Reader) microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(), checkpointMark);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    ArrayList<byte[]> readValues = new ArrayList<byte[]>();
                    WindowedValue.FullWindowedValueCoder<T> coder = WindowedValue.FullWindowedValueCoder.of(source.getDefaultOutputCoder(), GlobalWindow.Coder.INSTANCE);
                    Instant highWatermark;
                    try {
                        boolean finished = !microbatchReader.start();
                        while (!finished) {
                            // Unchecked because the reader is raw; the element type is T by construction
                            // of the source.
                            @SuppressWarnings("unchecked")
                            WindowedValue<T> wv = WindowedValue.of((T) microbatchReader.getCurrent(), microbatchReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
                            readValues.add(CoderHelpers.toByteArray(wv, coder));
                            finished = !microbatchReader.advance();
                        }

                        // Never let the watermark move backwards relative to the previous read.
                        Instant sourceWatermark = microbatchReader.getWatermark();
                        highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;

                        readDurationMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
                        LOG.info("Source id {} spent {} millis on reading.", microbatchSource.getId(), readDurationMillis);

                        @SuppressWarnings("unchecked")
                        CheckpointMarkT finishedReadCheckpointMark = (CheckpointMarkT) microbatchReader.getCheckpointMark();
                        byte[] codedCheckpoint = new byte[0];
                        if (finishedReadCheckpointMark != null) {
                            codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);
                        } else {
                            LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
                        }
                        // The state is updated even when no checkpoint mark was supplied, so the high
                        // watermark is always carried over to the next invocation.
                        state.update(new Tuple2<byte[], Instant>(codedCheckpoint, highWatermark));
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to read from reader.", e);
                    }

                    // Copy the values into a new list that is handed to the caller.
                    ArrayList<byte[]> payload = Lists.newArrayList(Iterators.unmodifiableIterator(readValues.iterator()));
                    SparkUnboundedSource.Metadata metadata = new SparkUnboundedSource.Metadata(readValues.size(), lowWatermark, highWatermark, readDurationMillis, metricsContainers);
                    return new Tuple2<Iterable<byte[]>, SparkUnboundedSource.Metadata>(payload, metadata);
                } catch (IOException e) {
                    // Raised by closing the metrics scope.
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * A Scala {@link AbstractFunction3} that is also {@link Serializable}.
     *
     * @param <T1> type of the first argument
     * @param <T2> type of the second argument
     * @param <T3> type of the third argument
     * @param <T4> result type
     */
    private abstract static class SerializableFunction3<T1, T2, T3, T4>
    extends AbstractFunction3<T1, T2, T3, T4>
    implements Serializable {
    }
}

