/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.PeekingIterator;
import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.UnsignedBytes;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.joda.time.Instant;
import scala.Tuple2;

public class GroupNonMergingWindowsFunctions {
    /**
     * Groups the values of {@code rdd} by key and window without merging windows.
     *
     * <p>Every input element is exploded into one element per window. The key, the window and the
     * value are serialized with the supplied coders, so that the resulting pairs can be
     * repartitioned and sorted within each partition by {@link WindowedKey}. Each sorted partition
     * is then wrapped in a {@link GroupByKeyIterator}, which emits one element per run of equal
     * keys.
     *
     * @param rdd input elements
     * @param keyCoder coder used to serialize keys
     * @param valueCoder coder used to serialize values
     * @param windowingStrategy supplies the window coder; also handed to the grouping iterator
     * @param partitioner partitioner for the shuffle; when {@code null} a {@link HashPartitioner}
     *     with as many partitions as {@code rdd} is used
     * @return one element per distinct (key, window) pair found in a partition
     */
    static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKeyAndWindow(JavaRDD<WindowedValue<KV<K, V>>> rdd, Coder<K> keyCoder, Coder<V> valueCoder, WindowingStrategy<?, W> windowingStrategy, Partitioner partitioner) {
        final Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
        final WindowedValue.FullWindowedValueCoder<byte[]> encodedValueCoder = WindowedValue.getFullCoder(ByteArrayCoder.of(), windowCoder);
        return rdd
            .<WindowedKey, byte[]>flatMapToPair(element -> {
                // Key and value are encoded once per element and shared by all of its windows.
                final KV<K, V> keyValue = element.getValue();
                final byte[] encodedKey = CoderHelpers.toByteArray(keyValue.getKey(), keyCoder);
                final byte[] encodedValue = CoderHelpers.toByteArray(keyValue.getValue(), valueCoder);
                return Iterators.transform(element.explodeWindows().iterator(), exploded -> {
                    Objects.requireNonNull(exploded, "Exploded window can not be null.");
                    @SuppressWarnings("unchecked")
                    final W window = (W)Iterables.getOnlyElement(exploded.getWindows());
                    final byte[] encodedWindow = CoderHelpers.toByteArray(window, windowCoder);
                    // Re-wrap the encoded value so timestamp, window and pane travel with it.
                    final WindowedValue<byte[]> rewrapped = WindowedValue.of(encodedValue, exploded.getTimestamp(), window, exploded.getPane());
                    final byte[] encodedWindowedValue = CoderHelpers.toByteArray(rewrapped, encodedValueCoder);
                    return new Tuple2<>(new WindowedKey(encodedKey, encodedWindow), encodedWindowedValue);
                });
            })
            .repartitionAndSortWithinPartitions(GroupNonMergingWindowsFunctions.getPartitioner(partitioner, rdd))
            .mapPartitions(sortedPartition -> new GroupByKeyIterator<K, V, W>(sortedPartition, keyCoder, valueCoder, windowingStrategy, encodedValueCoder))
            // Drop any null element the grouping iterator may have emitted.
            .filter(Objects::nonNull);
    }

    /**
     * Chooses the partitioner used for the shuffle.
     *
     * @param partitioner caller-supplied partitioner, may be {@code null}
     * @param rdd RDD whose partition count sizes the fallback partitioner
     * @return {@code partitioner} when it is non-null, otherwise a new {@link HashPartitioner}
     *     with {@code rdd.getNumPartitions()} partitions
     */
    private static <K, V> Partitioner getPartitioner(Partitioner partitioner, JavaRDD<WindowedValue<KV<K, V>>> rdd) {
        if (partitioner != null) {
            return partitioner;
        }
        // The partition count is only queried when no partitioner was supplied.
        return new HashPartitioner(rdd.getNumPartitions());
    }

    /**
     * Composite of an encoded key and an encoded window.
     *
     * <p>Equality and hashing are based on the contents of both byte arrays. Ordering compares the
     * key bytes first and the window bytes second, each as unsigned bytes in lexicographical
     * order. The arrays are stored and returned without copying.
     */
    public static class WindowedKey
    implements Comparable<WindowedKey>,
    Serializable {
        private final byte[] key;
        private final byte[] window;

        WindowedKey(byte[] key, byte[] window) {
            this.key = key;
            this.window = window;
        }

        /** Two keys are equal when they have the same class and equal key and window bytes. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (o.getClass() != this.getClass()) {
                return false;
            }
            WindowedKey other = (WindowedKey)o;
            if (!Arrays.equals(this.key, other.key)) {
                return false;
            }
            return Arrays.equals(this.window, other.window);
        }

        /** Content-based hash combining the key bytes and the window bytes. */
        @Override
        public int hashCode() {
            return 31 * Arrays.hashCode(this.key) + Arrays.hashCode(this.window);
        }

        /** Returns the encoded key bytes (the internal array, not a copy). */
        byte[] getKey() {
            return this.key;
        }

        /** Returns the encoded window bytes (the internal array, not a copy). */
        byte[] getWindow() {
            return this.window;
        }

        /** Orders by key bytes, breaking ties by window bytes. */
        @Override
        public int compareTo(WindowedKey o) {
            int byKey = UnsignedBytes.lexicographicalComparator().compare(this.getKey(), o.getKey());
            if (byKey != 0) {
                return byKey;
            }
            return UnsignedBytes.lexicographicalComparator().compare(this.getWindow(), o.getWindow());
        }
    }

    static class GroupByKeyIterator<K, V, W extends BoundedWindow>
    implements Iterator<WindowedValue<KV<K, Iterable<V>>>> {
        private final PeekingIterator<Tuple2<WindowedKey, byte[]>> inner;
        private final Coder<K> keyCoder;
        private final Coder<V> valueCoder;
        private final WindowingStrategy<?, W> windowingStrategy;
        private final WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder;
        private boolean hasNext = true;
        private WindowedKey currentKey = null;

        GroupByKeyIterator(Iterator<Tuple2<WindowedKey, byte[]>> inner, Coder<K> keyCoder, Coder<V> valueCoder, WindowingStrategy<?, W> windowingStrategy, WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder) {
            this.inner = Iterators.peekingIterator(inner);
            this.keyCoder = keyCoder;
            this.valueCoder = valueCoder;
            this.windowingStrategy = windowingStrategy;
            this.windowedValueCoder = windowedValueCoder;
        }

        @Override
        public boolean hasNext() {
            return this.hasNext;
        }

        @Override
        public WindowedValue<KV<K, Iterable<V>>> next() {
            while (this.inner.hasNext()) {
                WindowedKey nextKey = (WindowedKey)((Tuple2)this.inner.peek())._1;
                if (nextKey.equals(this.currentKey)) {
                    this.inner.next();
                    continue;
                }
                this.currentKey = nextKey;
                WindowedValue<KV<K, V>> decodedItem = this.decodeItem((Tuple2<WindowedKey, byte[]>)((Tuple2)this.inner.peek()));
                return decodedItem.withValue((Object)KV.of((Object)((KV)decodedItem.getValue()).getKey(), (Object)new ValueIterator(this.inner, this.currentKey)));
            }
            this.hasNext = false;
            return null;
        }

        private V decodeValue(byte[] windowedValueBytes) {
            WindowedValue windowedValue = (WindowedValue)CoderHelpers.fromByteArray(windowedValueBytes, this.windowedValueCoder);
            return CoderHelpers.fromByteArray((byte[])windowedValue.getValue(), this.valueCoder);
        }

        private WindowedValue<KV<K, V>> decodeItem(Tuple2<WindowedKey, byte[]> item) {
            K key = CoderHelpers.fromByteArray(((WindowedKey)item._1).getKey(), this.keyCoder);
            WindowedValue windowedValue = (WindowedValue)CoderHelpers.fromByteArray((byte[])item._2, this.windowedValueCoder);
            V value = CoderHelpers.fromByteArray((byte[])windowedValue.getValue(), this.valueCoder);
            BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement((Iterable)windowedValue.getWindows());
            Instant timestamp = this.windowingStrategy.getTimestampCombiner().assign(window, this.windowingStrategy.getWindowFn().getOutputTime(windowedValue.getTimestamp(), window));
            return WindowedValue.of((Object)KV.of(key, value), (Instant)timestamp, (BoundedWindow)window, (PaneInfo)PaneInfo.ON_TIME_AND_ONLY_FIRING);
        }

        class ValueIterator
        implements Iterable<V> {
            boolean usedAsIterable = false;
            private final PeekingIterator<Tuple2<WindowedKey, byte[]>> inner;
            private final WindowedKey currentKey;

            ValueIterator(PeekingIterator<Tuple2<WindowedKey, byte[]>> inner, WindowedKey currentKey) {
                this.inner = inner;
                this.currentKey = currentKey;
            }

            @Override
            public Iterator<V> iterator() {
                if (this.usedAsIterable) {
                    throw new IllegalStateException("ValueIterator can't be iterated more than once,otherwise there could be data lost");
                }
                this.usedAsIterable = true;
                return new AbstractIterator<V>(){

                    protected V computeNext() {
                        if (ValueIterator.this.inner.hasNext() && ValueIterator.this.currentKey.equals(((Tuple2)((ValueIterator)ValueIterator.this).inner.peek())._1)) {
                            return GroupByKeyIterator.this.decodeValue((byte[])((Tuple2)((ValueIterator)ValueIterator.this).inner.next())._2);
                        }
                        return this.endOfData();
                    }
                };
            }
        }
    }
}

