/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.translation.SparkAbstractCombineFn;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;

/**
 * Drives a {@link CombineWithContext.KeyedCombineFnWithContext} over windowed key/value pairs.
 *
 * <p>The package-private methods {@link #createCombiner}, {@link #mergeValue} and
 * {@link #mergeCombiners} build and merge per-window accumulators for a single key, and
 * {@link #extractOutput} turns those accumulators into output values. Their names mirror the
 * three functions of Spark's {@code combineByKey}; NOTE(review): presumably that is where the
 * caller plugs them in - confirm against the translator that uses this class.
 *
 * <p>When the windowing strategy's window fn is a merging one, windows are cast to
 * {@link IntervalWindow} before being tested for intersection and merged, so a merging window fn
 * producing any other window type will fail with a {@link ClassCastException}.
 *
 * @param <K> key type
 * @param <InputT> type of the values being combined
 * @param <AccumT> accumulator type of the wrapped combine function
 * @param <OutputT> output type of the wrapped combine function
 */
public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT>
        extends SparkAbstractCombineFn {
    private final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;

    /**
     * @param combineFn the keyed combine function to delegate to
     * @param runtimeContext forwarded unchanged to the superclass
     * @param sideInputs forwarded unchanged to the superclass
     * @param windowingStrategy forwarded unchanged to the superclass
     */
    public SparkKeyedCombineFn(
            CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
            SparkRuntimeContext runtimeContext,
            Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
            WindowingStrategy<?, ?> windowingStrategy) {
        super(runtimeContext, sideInputs, windowingStrategy);
        this.combineFn = combineFn;
    }

    /**
     * Applies the combine function in one shot to a key and all of its already-grouped values.
     *
     * @param windowedKv the key together with the values to combine
     * @return the result of the wrapped function's {@code apply} for that key and those values
     */
    public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
        KV<K, Iterable<InputT>> grouped = windowedKv.getValue();
        return combineFn.apply(grouped.getKey(), grouped.getValue(), ctxtForInput(windowedKv));
    }

    /**
     * Creates the initial accumulators for a single input element.
     *
     * <p>The element is exploded into one value per window and the exploded values are visited in
     * the order produced by {@code sortByWindows}. Consecutive values whose window equals the
     * current one - or, for a merging window fn, is reported as intersecting it - are folded into
     * the same accumulator; any other window closes the current accumulator and starts a new one.
     *
     * @param wkvi the windowed key/value input
     * @return one windowed accumulator per resulting window, each with {@link PaneInfo#NO_FIRING};
     *     empty if the input explodes into no values
     */
    Iterable<WindowedValue<KV<K, AccumT>>> createCombiner(WindowedValue<KV<K, InputT>> wkvi) {
        List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
        Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows());
        Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
        if (!iterator.hasNext()) {
            // Nothing to accumulate; avoid the NoSuchElementException from next() below.
            return output;
        }
        OutputTimeFn<? super BoundedWindow> outputTimeFn = outputTimeFn();
        boolean merging = !windowingStrategy.getWindowFn().isNonMerging();

        // Seed the first accumulator from the first exploded value.
        WindowedValue<KV<K, InputT>> currentInput = iterator.next();
        BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
        K key = currentInput.getValue().getKey();
        AccumT accumulator = combineFn.createAccumulator(key, ctxtForInput(currentInput));
        accumulator = combineFn.addInput(
                key, accumulator, currentInput.getValue().getValue(), ctxtForInput(currentInput));
        Instant windowTimestamp =
                outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow);

        while (iterator.hasNext()) {
            WindowedValue<KV<K, InputT>> nextValue = iterator.next();
            BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
            // The IntervalWindow casts are only evaluated when the window fn is a merging one.
            boolean mergingAndIntersecting = merging
                    && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
            if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
                if (mergingAndIntersecting) {
                    currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
                }
                // Same (possibly just-merged) window: keep accumulating.
                accumulator = combineFn.addInput(
                        key, accumulator, nextValue.getValue().getValue(), ctxtForInput(nextValue));
                windowTimestamp = outputTimeFn.combine(
                        windowTimestamp,
                        outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
            } else {
                // Different window: emit what was accumulated and start over for the new window.
                output.add(WindowedValue.of(
                        KV.of(key, accumulator), windowTimestamp, currentWindow, PaneInfo.NO_FIRING));
                accumulator = combineFn.createAccumulator(key, ctxtForInput(nextValue));
                accumulator = combineFn.addInput(
                        key, accumulator, nextValue.getValue().getValue(), ctxtForInput(nextValue));
                currentWindow = nextWindow;
                windowTimestamp =
                        outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
            }
        }
        // Emit the accumulator of the last window.
        output.add(WindowedValue.of(
                KV.of(key, accumulator), windowTimestamp, currentWindow, PaneInfo.NO_FIRING));
        return output;
    }

    /**
     * Adds one input element to existing accumulators by first turning the element into
     * accumulators with {@link #createCombiner} and then merging both sets with
     * {@link #mergeCombiners}.
     *
     * @param wkvi the windowed key/value input to add
     * @param wkvas the accumulators built so far
     * @return the merged windowed accumulators
     */
    Iterable<WindowedValue<KV<K, AccumT>>> mergeValue(
            WindowedValue<KV<K, InputT>> wkvi, Iterable<WindowedValue<KV<K, AccumT>>> wkvas) {
        return mergeCombiners(createCombiner(wkvi), wkvas);
    }

    /**
     * Merges two collections of windowed accumulators.
     *
     * <p>Both collections are concatenated and visited in the order produced by
     * {@code sortByWindows}. Accumulators whose window equals the current one - or, for a merging
     * window fn, is reported as intersecting it - are gathered and merged with the wrapped
     * function's {@code mergeAccumulators}; the key is taken from the first accumulator visited.
     *
     * @param a1 first collection of windowed accumulators
     * @param a2 second collection of windowed accumulators
     * @return one merged windowed accumulator per resulting window, each with
     *     {@link PaneInfo#NO_FIRING}; empty if both inputs are empty
     */
    Iterable<WindowedValue<KV<K, AccumT>>> mergeCombiners(
            Iterable<WindowedValue<KV<K, AccumT>>> a1, Iterable<WindowedValue<KV<K, AccumT>>> a2) {
        List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList();
        Iterable<WindowedValue<KV<K, AccumT>>> accumulators = Iterables.concat(a1, a2);
        Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators);
        Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
        if (!iterator.hasNext()) {
            // Nothing to merge; avoid the NoSuchElementException from next() below.
            return output;
        }
        OutputTimeFn<? super BoundedWindow> outputTimeFn = outputTimeFn();
        boolean merging = !windowingStrategy.getWindowFn().isNonMerging();

        // Seed the per-window buffers from the first accumulator.
        WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
        K key = currentValue.getValue().getKey();
        BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
        List<AccumT> currentWindowAccumulators = Lists.newArrayList();
        currentWindowAccumulators.add(currentValue.getValue().getValue());
        List<Instant> windowTimestamps = Lists.newArrayList();
        windowTimestamps.add(currentValue.getTimestamp());

        while (iterator.hasNext()) {
            WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
            BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
            // The IntervalWindow casts are only evaluated when the window fn is a merging one.
            boolean mergingAndIntersecting = merging
                    && isIntersecting((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
            if (mergingAndIntersecting || nextWindow.equals(currentWindow)) {
                if (mergingAndIntersecting) {
                    currentWindow = merge((IntervalWindow) currentWindow, (IntervalWindow) nextWindow);
                }
                // Same (possibly just-merged) window: buffer for a later merge.
                currentWindowAccumulators.add(nextValue.getValue().getValue());
                windowTimestamps.add(nextValue.getTimestamp());
            } else {
                // Different window: merge and emit the buffered accumulators, then reset the
                // buffers for the new window. The merge must complete before the buffers are
                // cleared because it reads them through a view.
                output.add(mergeWindowAccumulators(
                        key, currentWindow, currentWindowAccumulators, windowTimestamps, outputTimeFn));
                currentWindowAccumulators.clear();
                currentWindowAccumulators.add(nextValue.getValue().getValue());
                currentWindow = nextWindow;
                windowTimestamps.clear();
                windowTimestamps.add(nextValue.getTimestamp());
            }
        }
        // Merge and emit the buffers of the last window.
        output.add(mergeWindowAccumulators(
                key, currentWindow, currentWindowAccumulators, windowTimestamps, outputTimeFn));
        return output;
    }

    /**
     * Lazily maps each windowed accumulator to a windowed output via the wrapped function's
     * {@code extractOutput}. A {@code null} element is mapped to {@code null}.
     *
     * @param wkvas the windowed accumulators
     * @return a transformed view of {@code wkvas} holding the extracted outputs
     */
    Iterable<WindowedValue<OutputT>> extractOutput(Iterable<WindowedValue<KV<K, AccumT>>> wkvas) {
        return Iterables.transform(
                wkvas,
                new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>() {

                    @Nullable
                    @Override
                    public WindowedValue<OutputT> apply(@Nullable WindowedValue<KV<K, AccumT>> wkva) {
                        if (wkva == null) {
                            return null;
                        }
                        K key = wkva.getValue().getKey();
                        AccumT accumulator = wkva.getValue().getValue();
                        return wkva.withValue(
                                combineFn.extractOutput(key, accumulator, ctxtForInput(wkva)));
                    }
                });
    }

    /**
     * Merges the accumulators buffered for one window into a single windowed accumulator whose
     * timestamp is the output time fn's merge of the buffered timestamps.
     */
    private WindowedValue<KV<K, AccumT>> mergeWindowAccumulators(
            K key,
            BoundedWindow window,
            List<AccumT> accumulators,
            List<Instant> timestamps,
            OutputTimeFn<? super BoundedWindow> outputTimeFn) {
        Instant mergedTimestamp = outputTimeFn.merge(window, timestamps);
        Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(accumulators);
        // The pre-merge value exists only to obtain a context carrying the window and timestamp.
        WindowedValue<KV<K, Iterable<AccumT>>> preMerge = WindowedValue.of(
                KV.of(key, accumsToMerge), mergedTimestamp, window, PaneInfo.NO_FIRING);
        AccumT accumulated =
                combineFn.mergeAccumulators(key, accumsToMerge, ctxtForInput(preMerge));
        return preMerge.withValue(KV.of(key, accumulated));
    }

    /** Returns the windowing strategy's output time fn, viewed as accepting any BoundedWindow. */
    @SuppressWarnings("unchecked") // windows handled by this class are always BoundedWindows
    private OutputTimeFn<? super BoundedWindow> outputTimeFn() {
        return (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
    }
}

