/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.Lists;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.translation.SparkAbstractCombineFn;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT>
extends SparkAbstractCombineFn {
    private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;

    public SparkKeyedCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn, SerializablePipelineOptions options, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) {
        super(options, sideInputs, windowingStrategy);
        this.combineFn = combineFn;
    }

    public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
        return (OutputT)this.combineFn.apply((Iterable)((KV)windowedKv.getValue()).getValue(), (CombineWithContext.Context)this.ctxtForInput(windowedKv));
    }

    Iterable<WindowedValue<KV<K, AccumT>>> createCombiner(WindowedValue<KV<K, InputT>> wkvi) {
        boolean merging;
        Iterable sortedInputs = SparkKeyedCombineFn.sortByWindows(wkvi.explodeWindows());
        TimestampCombiner timestampCombiner = this.windowingStrategy.getTimestampCombiner();
        WindowFn windowFn = this.windowingStrategy.getWindowFn();
        Iterator iterator = sortedInputs.iterator();
        WindowedValue currentInput = iterator.next();
        BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
        Object key = ((KV)currentInput.getValue()).getKey();
        Object accumulator = this.combineFn.createAccumulator((CombineWithContext.Context)this.ctxtForInput(currentInput));
        accumulator = this.combineFn.addInput(accumulator, ((KV)currentInput.getValue()).getValue(), (CombineWithContext.Context)this.ctxtForInput(currentInput));
        Instant windowTimestamp = timestampCombiner.assign(currentWindow, this.windowingStrategy.getWindowFn().getOutputTime(currentInput.getTimestamp(), currentWindow));
        ArrayList<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
        boolean bl = merging = !this.windowingStrategy.getWindowFn().isNonMerging();
        while (iterator.hasNext()) {
            boolean mergingAndIntersecting;
            WindowedValue nextValue = iterator.next();
            BoundedWindow nextWindow = (BoundedWindow)Iterables.getOnlyElement(nextValue.getWindows());
            boolean bl2 = mergingAndIntersecting = merging && SparkKeyedCombineFn.isIntersecting((IntervalWindow)currentWindow, (IntervalWindow)nextWindow);
            if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
                if (mergingAndIntersecting) {
                    currentWindow = SparkKeyedCombineFn.merge((IntervalWindow)currentWindow, (IntervalWindow)nextWindow);
                }
                accumulator = this.combineFn.addInput(accumulator, ((KV)nextValue.getValue()).getValue(), (CombineWithContext.Context)this.ctxtForInput(nextValue));
                windowTimestamp = timestampCombiner.combine(new Instant[]{windowTimestamp, timestampCombiner.assign(currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow))});
                continue;
            }
            output.add(WindowedValue.of((Object)KV.of((Object)key, (Object)accumulator), (Instant)windowTimestamp, (BoundedWindow)currentWindow, (PaneInfo)PaneInfo.NO_FIRING));
            accumulator = this.combineFn.createAccumulator((CombineWithContext.Context)this.ctxtForInput(nextValue));
            accumulator = this.combineFn.addInput(accumulator, ((KV)nextValue.getValue()).getValue(), (CombineWithContext.Context)this.ctxtForInput(nextValue));
            currentWindow = nextWindow;
            windowTimestamp = timestampCombiner.assign(currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow));
        }
        output.add(WindowedValue.of((Object)KV.of((Object)key, (Object)accumulator), (Instant)windowTimestamp, (BoundedWindow)currentWindow, (PaneInfo)PaneInfo.NO_FIRING));
        return output;
    }

    Iterable<WindowedValue<KV<K, AccumT>>> mergeValue(WindowedValue<KV<K, InputT>> wkvi, Iterable<WindowedValue<KV<K, AccumT>>> wkvas) {
        return this.mergeCombiners(this.createCombiner(wkvi), wkvas);
    }

    Iterable<WindowedValue<KV<K, AccumT>>> mergeCombiners(Iterable<WindowedValue<KV<K, AccumT>>> a1, Iterable<WindowedValue<KV<K, AccumT>>> a2) {
        boolean merging;
        Iterable accumulators = Iterables.concat(a1, a2);
        Iterable sortedAccumulators = SparkKeyedCombineFn.sortByWindows(accumulators);
        TimestampCombiner timestampCombiner = this.windowingStrategy.getTimestampCombiner();
        Iterator iterator = sortedAccumulators.iterator();
        WindowedValue currentValue = iterator.next();
        Object key = ((KV)currentValue.getValue()).getKey();
        BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
        ArrayList<Object> currentWindowAccumulators = Lists.newArrayList();
        currentWindowAccumulators.add(((KV)currentValue.getValue()).getValue());
        ArrayList<Instant> windowTimestamps = Lists.newArrayList();
        windowTimestamps.add(currentValue.getTimestamp());
        ArrayList<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
        boolean bl = merging = !this.windowingStrategy.getWindowFn().isNonMerging();
        while (iterator.hasNext()) {
            boolean mergingAndIntersecting;
            WindowedValue nextValue = iterator.next();
            BoundedWindow nextWindow = (BoundedWindow)Iterables.getOnlyElement(nextValue.getWindows());
            boolean bl2 = mergingAndIntersecting = merging && SparkKeyedCombineFn.isIntersecting((IntervalWindow)currentWindow, (IntervalWindow)nextWindow);
            if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
                if (mergingAndIntersecting) {
                    currentWindow = SparkKeyedCombineFn.merge((IntervalWindow)currentWindow, (IntervalWindow)nextWindow);
                }
                currentWindowAccumulators.add(((KV)nextValue.getValue()).getValue());
                windowTimestamps.add(nextValue.getTimestamp());
                continue;
            }
            Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
            Iterable accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
            WindowedValue preMergeWindowedValue = WindowedValue.of((Object)KV.of((Object)key, accumsToMerge), (Instant)mergedTimestamp, (BoundedWindow)currentWindow, (PaneInfo)PaneInfo.NO_FIRING);
            Object accumulated = this.combineFn.mergeAccumulators(accumsToMerge, (CombineWithContext.Context)this.ctxtForInput(preMergeWindowedValue));
            WindowedValue postMergeWindowedValue = preMergeWindowedValue.withValue((Object)KV.of((Object)key, (Object)accumulated));
            output.add(postMergeWindowedValue);
            currentWindowAccumulators.clear();
            currentWindowAccumulators.add(((KV)nextValue.getValue()).getValue());
            currentWindow = nextWindow;
            windowTimestamps.clear();
            windowTimestamps.add(nextValue.getTimestamp());
        }
        Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps);
        Iterable accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators);
        WindowedValue preMergeWindowedValue = WindowedValue.of((Object)KV.of((Object)key, accumsToMerge), (Instant)mergedTimestamp, (BoundedWindow)currentWindow, (PaneInfo)PaneInfo.NO_FIRING);
        Object accumulated = this.combineFn.mergeAccumulators(accumsToMerge, (CombineWithContext.Context)this.ctxtForInput(preMergeWindowedValue));
        WindowedValue postMergeWindowedValue = preMergeWindowedValue.withValue((Object)KV.of((Object)key, (Object)accumulated));
        output.add(postMergeWindowedValue);
        return output;
    }

    Iterable<WindowedValue<OutputT>> extractOutput(Iterable<WindowedValue<KV<K, AccumT>>> wkvas) {
        return StreamSupport.stream(wkvas.spliterator(), false).map(wkva -> {
            if (wkva == null) {
                return null;
            }
            Object accumulator = ((KV)wkva.getValue()).getValue();
            return wkva.withValue(this.combineFn.extractOutput(accumulator, (CombineWithContext.Context)this.ctxtForInput((WindowedValue<?>)wkva)));
        }).collect(Collectors.toList());
    }
}

