/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.stateful.SparkStateInternals;
import org.apache.beam.runners.spark.stateful.SparkTimerInternals;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Table;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.PairDStreamFunctions;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.reflect.ClassTag;
import scala.runtime.AbstractFunction1;

public class SparkGroupAlsoByWindowViaWindowSet
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);

    private static <K, InputT> WindowedValue.FullWindowedValueCoder<KV<K, Iterable<InputT>>> windowedValueKeyValueCoderOf(Coder<K> keyCoder, Coder<InputT> iCoder, Coder<? extends BoundedWindow> wCoder) {
        return WindowedValue.FullWindowedValueCoder.of((Coder)KvCoder.of(keyCoder, (Coder)IterableCoder.of(iCoder)), wCoder);
    }

    private static <W extends BoundedWindow> TimerInternals.TimerDataCoder timerDataCoderOf(WindowingStrategy<?, W> windowingStrategy) {
        return TimerInternals.TimerDataCoder.of((Coder)windowingStrategy.getWindowFn().windowCoder());
    }

    private static void checkpointIfNeeded(DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream, SerializablePipelineOptions options) {
        Long checkpointDurationMillis = SparkGroupAlsoByWindowViaWindowSet.getBatchDuration(options);
        if (checkpointDurationMillis > 0L) {
            firedStream.checkpoint(new Duration(checkpointDurationMillis.longValue()));
        }
    }

    private static Long getBatchDuration(SerializablePipelineOptions options) {
        return ((SparkPipelineOptions)options.get().as(SparkPipelineOptions.class)).getCheckpointDurationMillis();
    }

    private static <K, InputT> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> stripStateValues(DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> firedStream, final Coder<K> keyCoder, final WindowedValue.FullWindowedValueCoder<InputT> wvCoder) {
        return JavaPairDStream.fromPairDStream(firedStream, (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag(), (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag()).filter((Function & Serializable)t2 -> !((List)((Tuple2)t2._2())._2()).isEmpty()).flatMap(new FlatMapFunction<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>, WindowedValue<KV<K, Iterable<InputT>>>>(){
            private final WindowedValue.FullWindowedValueCoder<KV<K, Iterable<InputT>>> windowedValueKeyValueCoder;
            {
                this.windowedValueKeyValueCoder = SparkGroupAlsoByWindowViaWindowSet.windowedValueKeyValueCoderOf(keyCoder, wvCoder.getValueCoder(), (Coder<? extends BoundedWindow>)wvCoder.getWindowCoder());
            }

            public java.util.Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call(Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> t2) throws Exception {
                return CoderHelpers.fromByteArrays((Collection)((Tuple2)t2._2())._2(), this.windowedValueKeyValueCoder).iterator();
            }
        });
    }

    private static <K, InputT> PairDStreamFunctions<ByteArray, byte[]> buildPairDStream(JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream, Coder<K> keyCoder, Coder<WindowedValue<InputT>> wvCoder) {
        DStream tupleDStream = inputDStream.transformToPair((Function2 & Serializable)(rdd, time) -> rdd.mapPartitions(TranslationUtils.functionToFlatMapFunction(WindowedValue::getValue), true).mapPartitionsToPair(TranslationUtils.toPairFlatMapFunction(), true).mapValues((Function & Serializable)values -> KV.of((Object)time.milliseconds(), (Object)values)).mapPartitionsToPair(TranslationUtils.pairFunctionToPairFlatMapFunction(CoderHelpers.toByteFunction(keyCoder, KvCoder.of((Coder)VarLongCoder.of(), (Coder)IterableCoder.of((Coder)wvCoder)))), true)).dstream();
        return DStream.toPairDStreamFunctions((DStream)tupleDStream, (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag(), (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag(), null);
    }

    public static <K, InputT, W extends BoundedWindow> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream, Coder<K> keyCoder, Coder<WindowedValue<InputT>> wvCoder, WindowingStrategy<?, W> windowingStrategy, SerializablePipelineOptions options, List<Integer> sourceIds, String transformFullName) {
        PairDStreamFunctions<ByteArray, byte[]> pairDStream = SparkGroupAlsoByWindowViaWindowSet.buildPairDStream(inputDStream, keyCoder, wvCoder);
        UpdateStateByKeyFunction updateFunc = new UpdateStateByKeyFunction(sourceIds, windowingStrategy, (WindowedValue.FullWindowedValueCoder)wvCoder, keyCoder, options, transformFullName);
        DStream firedStream = pairDStream.updateStateByKey(updateFunc, (Partitioner)pairDStream.defaultPartitioner(pairDStream.defaultPartitioner$default$1()), true, JavaSparkContext$.MODULE$.fakeClassTag());
        SparkGroupAlsoByWindowViaWindowSet.checkpointIfNeeded((DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>)firedStream, options);
        return SparkGroupAlsoByWindowViaWindowSet.stripStateValues((DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>)firedStream, keyCoder, (WindowedValue.FullWindowedValueCoder)wvCoder);
    }

    private static class UpdateStateByKeyFunction<K, InputT, W extends BoundedWindow>
    extends AbstractFunction1<Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>>, Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>>
    implements Serializable {
        private final WindowedValue.FullWindowedValueCoder<InputT> wvCoder;
        private final Coder<K> keyCoder;
        private final List<Integer> sourceIds;
        private final TimerInternals.TimerDataCoder timerDataCoder;
        private final WindowingStrategy<?, W> windowingStrategy;
        private final SerializablePipelineOptions options;
        private final IterableCoder<WindowedValue<InputT>> itrWvCoder;
        private final String logPrefix;
        private final Coder<WindowedValue<KV<K, Iterable<InputT>>>> wvKvIterCoder;

        UpdateStateByKeyFunction(List<Integer> sourceIds, WindowingStrategy<?, W> windowingStrategy, WindowedValue.FullWindowedValueCoder<InputT> wvCoder, Coder<K> keyCoder, SerializablePipelineOptions options, String logPrefix) {
            this.wvCoder = wvCoder;
            this.keyCoder = keyCoder;
            this.sourceIds = sourceIds;
            this.timerDataCoder = SparkGroupAlsoByWindowViaWindowSet.timerDataCoderOf(windowingStrategy);
            this.windowingStrategy = windowingStrategy;
            this.options = options;
            this.itrWvCoder = IterableCoder.of(wvCoder);
            this.logPrefix = logPrefix;
            this.wvKvIterCoder = SparkGroupAlsoByWindowViaWindowSet.windowedValueKeyValueCoderOf(keyCoder, wvCoder.getValueCoder(), (Coder<? extends BoundedWindow>)wvCoder.getWindowCoder());
        }

        public Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> apply(Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> input) {
            long closedWindowDropped;
            SystemReduceFn reduceFn = SystemReduceFn.buffering((Coder)this.wvCoder.getValueCoder());
            MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider");
            CounterCell droppedDueToClosedWindow = cellProvider.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, (String)"DroppedDueToClosedWindow"));
            CounterCell droppedDueToLateness = cellProvider.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, (String)"DroppedDueToLateness"));
            long lateDropped = droppedDueToLateness.getCumulative();
            if (lateDropped > 0L) {
                LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
                droppedDueToLateness.inc(-droppedDueToLateness.getCumulative().longValue());
            }
            if ((closedWindowDropped = droppedDueToClosedWindow.getCumulative().longValue()) > 0L) {
                LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
                droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative().longValue());
            }
            return JavaConversions.asScalaIterator((java.util.Iterator)((Object)new UpdateStateByKeyOutputIterator(input, reduceFn, droppedDueToLateness)));
        }

        private class UpdateStateByKeyOutputIterator
        extends AbstractIterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> {
            private final Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> input;
            private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
            private final CounterCell droppedDueToLateness;

            private SparkStateInternals<K> processPreviousState(Option<Tuple2<StateAndTimers, List<byte[]>>> prevStateAndTimersOpt, K key, SparkTimerInternals timerInternals) {
                SparkStateInternals stateInternals;
                if (prevStateAndTimersOpt.isEmpty()) {
                    stateInternals = SparkStateInternals.forKey(key);
                } else {
                    StateAndTimers prevStateAndTimers = (StateAndTimers)((Tuple2)prevStateAndTimersOpt.get())._1();
                    stateInternals = SparkStateInternals.forKeyAndState(key, prevStateAndTimers.getState());
                    timerInternals.addTimers(SparkTimerInternals.deserializeTimers(prevStateAndTimers.getTimers(), UpdateStateByKeyFunction.this.timerDataCoder));
                }
                return stateInternals;
            }

            UpdateStateByKeyOutputIterator(Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> input, SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn, CounterCell droppedDueToLateness) {
                this.input = input;
                this.reduceFn = reduceFn;
                this.droppedDueToLateness = droppedDueToLateness;
            }

            private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(Collection<TimerInternals.TimerData> timers, Instant inputWatermark) {
                Predicate eligibleForProcessing = timer -> !timer.getDomain().equals((Object)TimeDomain.EVENT_TIME) || inputWatermark.isAfter((ReadableInstant)timer.getTimestamp());
                return FluentIterable.from(timers).filter(eligibleForProcessing).toSet();
            }

            protected Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> computeNext() {
                while (this.input.hasNext()) {
                    Tuple3 next = (Tuple3)this.input.next();
                    ByteArray encodedKey = (ByteArray)next._1();
                    Seq encodedKeyedElements = (Seq)next._2();
                    Option prevStateAndTimersOpt = (Option)next._3();
                    Object key = CoderHelpers.fromByteArray(encodedKey.getValue(), UpdateStateByKeyFunction.this.keyCoder);
                    Map<Integer, GlobalWatermarkHolder.SparkWatermarks> watermarks = GlobalWatermarkHolder.get(SparkGroupAlsoByWindowViaWindowSet.getBatchDuration(UpdateStateByKeyFunction.this.options));
                    SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(UpdateStateByKeyFunction.this.sourceIds, watermarks);
                    SparkStateInternals stateInternals = this.processPreviousState((Option<Tuple2<StateAndTimers, List<byte[]>>>)prevStateAndTimersOpt, key, timerInternals);
                    ExecutableTriggerStateMachine triggerStateMachine = ExecutableTriggerStateMachine.create((TriggerStateMachine)TriggerStateMachines.stateMachineForTrigger((RunnerApi.Trigger)TriggerTranslation.toProto((Trigger)UpdateStateByKeyFunction.this.windowingStrategy.getTrigger())));
                    OutputWindowedValueHolder outputHolder = new OutputWindowedValueHolder();
                    ReduceFnRunner reduceFnRunner = new ReduceFnRunner(key, UpdateStateByKeyFunction.this.windowingStrategy, triggerStateMachine, stateInternals, (TimerInternals)timerInternals, outputHolder, (SideInputReader)new UnsupportedSideInputReader("GroupAlsoByWindow"), this.reduceFn, UpdateStateByKeyFunction.this.options.get());
                    if (!encodedKeyedElements.isEmpty()) {
                        try {
                            byte[] headBytes = (byte[])encodedKeyedElements.head();
                            KV keyedElements = (KV)CoderHelpers.fromByteArray(headBytes, KvCoder.of((Coder)VarLongCoder.of(), (Coder)UpdateStateByKeyFunction.this.itrWvCoder));
                            Long rddTimestamp = (Long)keyedElements.getKey();
                            LOG.debug(UpdateStateByKeyFunction.this.logPrefix + ": processing RDD with timestamp: {}, watermarks: {}", (Object)rddTimestamp, watermarks);
                            Iterable elements = (Iterable)keyedElements.getValue();
                            LOG.trace(UpdateStateByKeyFunction.this.logPrefix + ": input elements: {}", (Object)elements);
                            ArrayList nonExpiredElements = Lists.newArrayList((Iterable)LateDataUtils.dropExpiredWindows(key, (Iterable)elements, (TimerInternals)timerInternals, (WindowingStrategy)UpdateStateByKeyFunction.this.windowingStrategy, (CounterCell)this.droppedDueToLateness));
                            LOG.trace(UpdateStateByKeyFunction.this.logPrefix + ": non expired input elements: {}", (Object)nonExpiredElements);
                            reduceFnRunner.processElements((Iterable)nonExpiredElements);
                        }
                        catch (Exception e) {
                            throw new RuntimeException("Failed to process element with ReduceFnRunner", e);
                        }
                    } else if (stateInternals.getState().isEmpty()) continue;
                    try {
                        LOG.debug(UpdateStateByKeyFunction.this.logPrefix + ": timerInternals before advance are {}", (Object)timerInternals.toString());
                        timerInternals.advanceWatermark();
                        Collection<TimerInternals.TimerData> timersEligibleForProcessing = this.filterTimersEligibleForProcessing(timerInternals.getTimers(), timerInternals.currentInputWatermarkTime());
                        LOG.debug(UpdateStateByKeyFunction.this.logPrefix + ": timers eligible for processing are {}", timersEligibleForProcessing);
                        reduceFnRunner.onTimers(timersEligibleForProcessing);
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e);
                    }
                    reduceFnRunner.persist();
                    List outputs = outputHolder.getWindowedValues();
                    if (outputs.isEmpty() && stateInternals.getState().isEmpty()) continue;
                    StateAndTimers updated = new StateAndTimers(stateInternals.getState(), SparkTimerInternals.serializeTimers(timerInternals.getTimers(), UpdateStateByKeyFunction.this.timerDataCoder));
                    LOG.trace(UpdateStateByKeyFunction.this.logPrefix + ": output elements are {}", (Object)Joiner.on((String)", ").join((Iterable)outputs));
                    List<byte[]> serOutput = CoderHelpers.toByteArrays(outputs, UpdateStateByKeyFunction.this.wvKvIterCoder);
                    return new Tuple2((Object)encodedKey, (Object)new Tuple2((Object)updated, serOutput));
                }
                return (Tuple2)this.endOfData();
            }
        }
    }

    private static class OutputWindowedValueHolder<K, V>
    implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final List<WindowedValue<KV<K, Iterable<V>>>> windowedValues = new ArrayList<WindowedValue<KV<K, Iterable<V>>>>();

        private OutputWindowedValueHolder() {
        }

        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.windowedValues.add(WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
        }

        private List<WindowedValue<KV<K, Iterable<V>>>> getWindowedValues() {
            return this.windowedValues;
        }

        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            throw new UnsupportedOperationException("Tagged outputs are not allowed in GroupAlsoByWindow.");
        }
    }

    public static class StateAndTimers
    implements Serializable {
        private final Table<String, String, byte[]> state;
        private final Collection<byte[]> serTimers;

        private StateAndTimers(Table<String, String, byte[]> state, Collection<byte[]> timers) {
            this.state = state;
            this.serTimers = timers;
        }

        Table<String, String, byte[]> getState() {
            return this.state;
        }

        Collection<byte[]> getTimers() {
            return this.serTimers;
        }
    }
}

