/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.ReifyTimestampsAndWindowsFunction;
import org.apache.beam.runners.spark.translation.SparkGlobalCombineFn;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class GroupCombineFunctions {
    public static <K, V> JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupByKeyOnly(JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, WindowedValue.WindowedValueCoder<V> wvCoder, @Nullable Partitioner partitioner) {
        JavaPairRDD pairRDD = rdd.map(new ReifyTimestampsAndWindowsFunction()).map(WindowedValue::getValue).mapToPair(TranslationUtils.toPairFunction()).mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
        JavaPairRDD groupedRDD = partitioner != null ? pairRDD.groupByKey(partitioner) : pairRDD.groupByKey();
        return groupedRDD.mapPartitionsToPair(TranslationUtils.pairFunctionToPairFlatMapFunction(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder)), true).mapPartitions(TranslationUtils.fromPairFlatMapFunction(), true).mapPartitions(TranslationUtils.functionToFlatMapFunction(WindowedValue::valueInGlobalWindow), true);
    }

    public static <InputT, AccumT> Optional<Iterable<WindowedValue<AccumT>>> combineGlobally(JavaRDD<WindowedValue<InputT>> rdd, SparkGlobalCombineFn<InputT, AccumT, ?> sparkCombineFn, Coder<AccumT> aCoder, WindowingStrategy<?, ?> windowingStrategy) {
        WindowedValue.FullWindowedValueCoder wvaCoder = WindowedValue.FullWindowedValueCoder.of(aCoder, (Coder)windowingStrategy.getWindowFn().windowCoder());
        IterableCoder iterAccumCoder = IterableCoder.of((Coder)wvaCoder);
        SerializableAccumulator accumulatedResult = (SerializableAccumulator)rdd.aggregate(SerializableAccumulator.empty(iterAccumCoder), (Function2 & Serializable)(ab, ib) -> {
            Iterable merged = sparkCombineFn.seqOp(ab.getOrDecode(iterAccumCoder), (WindowedValue)ib);
            return SerializableAccumulator.of(merged, iterAccumCoder);
        }, (Function2 & Serializable)(a1b, a2b) -> {
            Iterable merged = sparkCombineFn.combOp(a1b.getOrDecode(iterAccumCoder), a2b.getOrDecode(iterAccumCoder));
            return SerializableAccumulator.of(merged, iterAccumCoder);
        });
        Iterable result = accumulatedResult.getOrDecode(iterAccumCoder);
        return Iterables.isEmpty(result) ? Optional.absent() : Optional.of(result);
    }

    public static <K, InputT, AccumT> JavaPairRDD<K, Iterable<WindowedValue<KV<K, AccumT>>>> combinePerKey(JavaRDD<WindowedValue<KV<K, InputT>>> rdd, SparkKeyedCombineFn<K, InputT, AccumT, ?> sparkCombineFn, Coder<K> keyCoder, Coder<AccumT> aCoder, WindowingStrategy<?, ?> windowingStrategy) {
        WindowedValue.FullWindowedValueCoder wkvaCoder = WindowedValue.FullWindowedValueCoder.of((Coder)KvCoder.of(keyCoder, aCoder), (Coder)windowingStrategy.getWindowFn().windowCoder());
        IterableCoder iterAccumCoder = IterableCoder.of((Coder)wkvaCoder);
        JavaPairRDD inRddDuplicatedKeyPair = rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue());
        JavaPairRDD accumulatedResult = inRddDuplicatedKeyPair.combineByKey((Function & Serializable)input -> SerializableAccumulator.of(sparkCombineFn.createCombiner((WindowedValue)input), iterAccumCoder), (Function2 & Serializable)(acc, input) -> SerializableAccumulator.of(sparkCombineFn.mergeValue((WindowedValue)input, acc.getOrDecode(iterAccumCoder)), iterAccumCoder), (Function2 & Serializable)(acc1, acc2) -> SerializableAccumulator.of(sparkCombineFn.mergeCombiners(acc1.getOrDecode(iterAccumCoder), acc2.getOrDecode(iterAccumCoder)), iterAccumCoder));
        return accumulatedResult.mapToPair((PairFunction & Serializable)i -> new Tuple2(i._1, ((SerializableAccumulator)i._2).getOrDecode(iterAccumCoder)));
    }

    public static <K, V> JavaRDD<WindowedValue<KV<K, V>>> reshuffle(JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, WindowedValue.WindowedValueCoder<V> wvCoder) {
        return rdd.map(new ReifyTimestampsAndWindowsFunction()).map(WindowedValue::getValue).mapToPair(TranslationUtils.toPairFunction()).mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder)).repartition(rdd.getNumPartitions()).mapToPair(CoderHelpers.fromByteFunction(keyCoder, wvCoder)).map(TranslationUtils.fromPairFunction()).map(TranslationUtils.toKVByWindowInValue());
    }

    public static class KryoAccumulatorSerializer<AccumT>
    extends Serializer<SerializableAccumulator<AccumT>> {
        public void write(Kryo kryo, Output output, SerializableAccumulator<AccumT> accumulator) {
            byte[] coded = accumulator.toBytes();
            output.writeInt(coded.length, true);
            output.write(coded);
        }

        public SerializableAccumulator<AccumT> read(Kryo kryo, Input input, Class<SerializableAccumulator<AccumT>> type) {
            int length = input.readInt(true);
            byte[] coded = input.readBytes(length);
            return SerializableAccumulator.ofBytes(coded);
        }
    }

    public static class SerializableAccumulator<AccumT>
    implements Serializable {
        private transient Iterable<WindowedValue<AccumT>> accumulated;
        private transient Coder<Iterable<WindowedValue<AccumT>>> coder;
        private byte[] serializedAcc;

        private SerializableAccumulator() {
        }

        private SerializableAccumulator(Iterable<WindowedValue<AccumT>> accumulated, Coder<Iterable<WindowedValue<AccumT>>> coder, byte[] serializedAcc) {
            this.accumulated = accumulated;
            this.coder = coder;
            this.serializedAcc = serializedAcc;
        }

        static <AccumT> SerializableAccumulator<AccumT> of(Iterable<WindowedValue<AccumT>> accumulated, Coder<Iterable<WindowedValue<AccumT>>> coder) {
            return new SerializableAccumulator<AccumT>(accumulated, coder, null);
        }

        static <AccumT> SerializableAccumulator<AccumT> ofBytes(byte[] serializedAcc) {
            Preconditions.checkNotNull((Object)serializedAcc);
            return new SerializableAccumulator<AccumT>(null, null, serializedAcc);
        }

        static <AccumT> SerializableAccumulator<AccumT> empty(Coder<Iterable<WindowedValue<AccumT>>> coder) {
            return new SerializableAccumulator<AccumT>(Lists.newArrayList(), coder, null);
        }

        Iterable<WindowedValue<AccumT>> getOrDecode(Coder<Iterable<WindowedValue<AccumT>>> coder) {
            if (this.accumulated == null) {
                this.accumulated = CoderHelpers.fromByteArray(this.serializedAcc, coder);
                this.serializedAcc = null;
            }
            if (this.coder == null) {
                this.coder = coder;
            }
            return this.accumulated;
        }

        byte[] toBytes() {
            byte[] coded;
            if (this.coder != null) {
                coded = CoderHelpers.toByteArray(this.accumulated, this.coder);
            } else if (this.serializedAcc != null) {
                coded = this.serializedAcc;
            } else {
                throw new IllegalStateException(String.format("Given '%s' cannot be serialized since it do not contain coder or already serialized data.", SerializableAccumulator.class.getSimpleName()));
            }
            return coded;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            byte[] coded = this.toBytes();
            out.writeInt(coded.length);
            out.write(coded);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            int length = in.readInt();
            byte[] coded = new byte[length];
            in.readFully(coded);
            this.serializedAcc = coded;
        }
    }
}

