/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.io.Serializable;
import java.util.Iterator;
import java.util.Objects;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.PeekingIterator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class GroupNonMergingWindowsFunctions {
    private static final Logger LOG = LoggerFactory.getLogger(GroupNonMergingWindowsFunctions.class);

    static boolean isEligibleForGroupByWindow(WindowingStrategy<?, ?> windowingStrategy) {
        return !windowingStrategy.needsMerge() && windowingStrategy.getTimestampCombiner() == TimestampCombiner.END_OF_WINDOW && windowingStrategy.getWindowFn().windowCoder().consistentWithEquals();
    }

    static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKeyAndWindow(JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, Coder<V> valueCoder, WindowingStrategy<?, W> windowingStrategy, Partitioner partitioner) {
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        WindowedValue.FullWindowedValueCoder windowedKvCoder = WindowedValue.FullWindowedValueCoder.of((Coder)KvCoder.of(keyCoder, valueCoder), (Coder)windowCoder);
        JavaPairRDD windowInKey = GroupNonMergingWindowsFunctions.bringWindowToKey(rdd, keyCoder, windowCoder, (SerializableFunction & Serializable)wv -> CoderHelpers.toByteArray(wv, windowedKvCoder));
        return windowInKey.repartitionAndSortWithinPartitions(GroupNonMergingWindowsFunctions.getPartitioner(partitioner, rdd)).mapPartitions((FlatMapFunction & Serializable)it -> new GroupByKeyIterator((Iterator<Tuple2<ByteArray, byte[]>>)it, keyCoder, windowingStrategy, windowedKvCoder)).filter(Objects::nonNull);
    }

    static <K, V, W extends BoundedWindow> JavaPairRDD<ByteArray, WindowedValue<KV<K, V>>> bringWindowToKey(JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, Coder<W> windowCoder) {
        return GroupNonMergingWindowsFunctions.bringWindowToKey(rdd, keyCoder, windowCoder, (SerializableFunction & Serializable)e -> e);
    }

    static <K, V, OutputT, W extends BoundedWindow> JavaPairRDD<ByteArray, OutputT> bringWindowToKey(JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, Coder<W> windowCoder, SerializableFunction<WindowedValue<KV<K, V>>, OutputT> mappingFn) {
        if (!GroupNonMergingWindowsFunctions.isKeyAndWindowCoderConsistentWithEquals(keyCoder, windowCoder)) {
            LOG.warn("Either coder {} or {} is not consistent with equals. That might cause issues on some runners.", keyCoder, windowCoder);
        }
        return rdd.flatMapToPair((PairFlatMapFunction & Serializable)windowedValue -> {
            byte[] keyBytes = CoderHelpers.toByteArray(((KV)windowedValue.getValue()).getKey(), keyCoder);
            return Iterators.transform(windowedValue.explodeWindows().iterator(), item -> {
                Objects.requireNonNull(item, "Exploded window can not be null.");
                BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement((Iterable)item.getWindows());
                byte[] windowBytes = CoderHelpers.toByteArray(window, windowCoder);
                WindowedValue valueOut = WindowedValue.of((Object)((KV)item.getValue()), (Instant)item.getTimestamp(), (BoundedWindow)window, (PaneInfo)item.getPane());
                ByteArray windowedKey = new ByteArray(Bytes.concat((byte[][])new byte[][]{keyBytes, windowBytes}));
                return new Tuple2((Object)windowedKey, mappingFn.apply((Object)valueOut));
            });
        });
    }

    private static boolean isKeyAndWindowCoderConsistentWithEquals(Coder<?> keyCoder, Coder<?> windowCoder) {
        try {
            keyCoder.verifyDeterministic();
            windowCoder.verifyDeterministic();
            return keyCoder.consistentWithEquals() && windowCoder.consistentWithEquals();
        }
        catch (Coder.NonDeterministicException ex) {
            throw new IllegalArgumentException("Coder for both key " + keyCoder + " and " + windowCoder + " must be deterministic", ex);
        }
    }

    private static <K, V> Partitioner getPartitioner(Partitioner partitioner, JavaRDD<WindowedValue<KV<K, V>>> rdd) {
        return partitioner == null ? new HashPartitioner(rdd.getNumPartitions()) : partitioner;
    }

    static class GroupByKeyIterator<K, V, W extends BoundedWindow>
    implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {
        private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
        private final Coder<K> keyCoder;
        private final WindowingStrategy<?, W> windowingStrategy;
        private final WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder;
        private boolean hasNext = true;
        private ByteArray currentKey = null;

        GroupByKeyIterator(Iterator<Tuple2<ByteArray, byte[]>> inner, Coder<K> keyCoder, WindowingStrategy<?, W> windowingStrategy, WindowedValue.FullWindowedValueCoder<KV<K, V>> windowedValueCoder) throws Coder.NonDeterministicException {
            this.inner = Iterators.peekingIterator(inner);
            this.keyCoder = keyCoder;
            this.windowingStrategy = windowingStrategy;
            this.windowedValueCoder = windowedValueCoder;
        }

        @Override
        public boolean hasNext() {
            return this.hasNext;
        }

        @Override
        public WindowedValue<KV<K, Iterable<V>>> next() {
            while (this.inner.hasNext()) {
                ByteArray nextKey = (ByteArray)((Tuple2)this.inner.peek())._1;
                if (nextKey.equals(this.currentKey)) {
                    this.inner.next();
                    continue;
                }
                this.currentKey = nextKey;
                WindowedValue<KV<K, V>> decodedItem = this.decodeItem((Tuple2<ByteArray, byte[]>)((Tuple2)this.inner.peek()));
                return decodedItem.withValue((Object)KV.of((Object)((KV)decodedItem.getValue()).getKey(), (Object)new ValueIterator(this.inner, this.currentKey)));
            }
            this.hasNext = false;
            return null;
        }

        private V decodeValue(byte[] windowedValueBytes) {
            WindowedValue windowedValue = (WindowedValue)CoderHelpers.fromByteArray(windowedValueBytes, this.windowedValueCoder);
            return (V)((KV)windowedValue.getValue()).getValue();
        }

        private WindowedValue<KV<K, V>> decodeItem(Tuple2<ByteArray, byte[]> item) {
            K key = CoderHelpers.fromByteArray(((ByteArray)item._1).getValue(), this.keyCoder);
            WindowedValue windowedValue = (WindowedValue)CoderHelpers.fromByteArray((byte[])item._2, this.windowedValueCoder);
            Object value = ((KV)windowedValue.getValue()).getValue();
            BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement((Iterable)windowedValue.getWindows());
            Instant timestamp = this.windowingStrategy.getTimestampCombiner().assign(window, windowedValue.getTimestamp());
            return WindowedValue.of((Object)KV.of(key, (Object)value), (Instant)timestamp, (BoundedWindow)window, (PaneInfo)PaneInfo.ON_TIME_AND_ONLY_FIRING);
        }

        class ValueIterator
        implements Iterable<V> {
            boolean consumed = false;
            private final PeekingIterator<Tuple2<ByteArray, byte[]>> inner;
            private final ByteArray currentKey;

            ValueIterator(PeekingIterator<Tuple2<ByteArray, byte[]>> inner, ByteArray currentKey) {
                this.inner = inner;
                this.currentKey = currentKey;
            }

            @Override
            public Iterator<V> iterator() {
                if (this.consumed) {
                    throw new IllegalStateException("ValueIterator can't be iterated more than once,otherwise there could be data lost");
                }
                this.consumed = true;
                return new AbstractIterator<V>(){

                    protected V computeNext() {
                        if (ValueIterator.this.inner.hasNext() && ValueIterator.this.currentKey.equals(((Tuple2)((ValueIterator)ValueIterator.this).inner.peek())._1)) {
                            return GroupByKeyIterator.this.decodeValue((byte[])((Tuple2)((ValueIterator)ValueIterator.this).inner.next())._2);
                        }
                        return this.endOfData();
                    }
                };
            }
        }
    }
}

