/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.function.KeyedWindowResultFunction;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.QuickMath;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

public class SessionWindowP<K, A, R, OUT>
extends AbstractProcessor {
    private static final Watermark COMPLETING_WM = new Watermark(Long.MAX_VALUE);
    final Map<K, Windows<A>> keyToWindows = new HashMap<K, Windows<A>>();
    final SortedMap<Long, Set<K>> deadlineToKeys = new TreeMap<Long, Set<K>>();
    long currentWatermark = Long.MIN_VALUE;
    private final long sessionTimeout;
    @Nonnull
    private final List<ToLongFunction<Object>> timestampFns;
    @Nonnull
    private final List<Function<Object, K>> keyFns;
    @Nonnull
    private final AggregateOperation<A, R> aggrOp;
    @Nonnull
    private final BiConsumer<? super A, ? super A> combineFn;
    @Nonnull
    private final KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn;
    @Nonnull
    private final AbstractProcessor.FlatMapper<Watermark, OUT> closedWindowFlatmapper;
    private ProcessingGuarantee processingGuarantee;
    @Probe
    private AtomicLong lateEventsDropped = new AtomicLong();
    @Probe
    private AtomicLong totalKeys = new AtomicLong();
    @Probe
    private AtomicLong totalWindows = new AtomicLong();
    private Traverser snapshotTraverser;
    private long minRestoredCurrentWatermark = Long.MAX_VALUE;

    public SessionWindowP(long sessionTimeout, @Nonnull List<? extends ToLongFunction<?>> timestampFns, @Nonnull List<? extends Function<?, ? extends K>> keyFns, @Nonnull AggregateOperation<A, R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn) {
        Preconditions.checkTrue((keyFns.size() == aggrOp.arity() ? 1 : 0) != 0, (String)(keyFns.size() + " key functions provided for " + aggrOp.arity() + "-arity aggregate operation"));
        this.timestampFns = timestampFns;
        this.keyFns = keyFns;
        this.aggrOp = aggrOp;
        this.combineFn = Objects.requireNonNull(aggrOp.combineFn());
        this.mapToOutputFn = mapToOutputFn;
        this.sessionTimeout = sessionTimeout;
        this.closedWindowFlatmapper = this.flatMapper(this::traverseClosedWindows);
    }

    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.processingGuarantee = context.processingGuarantee();
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        long timestamp = this.timestampFns.get(ordinal).applyAsLong(item);
        if (timestamp < this.currentWatermark) {
            com.hazelcast.jet.impl.util.Util.logLateEvent(this.getLogger(), this.currentWatermark, item);
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.lateEventsDropped);
            return true;
        }
        K key = this.keyFns.get(ordinal).apply(item);
        this.addItem(ordinal, this.keyToWindows.computeIfAbsent(key, k -> {
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalKeys);
            return new Windows();
        }), key, timestamp, item);
        return true;
    }

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark wm) {
        this.currentWatermark = wm.timestamp();
        assert (this.totalWindows.get() == (long)this.deadlineToKeys.values().stream().mapToInt(Set::size).sum()) : "unexpected totalWindows. Expected=" + this.deadlineToKeys.values().stream().mapToInt(Set::size).sum() + ", actual=" + this.totalWindows.get();
        return this.closedWindowFlatmapper.tryProcess(wm);
    }

    @Override
    public boolean complete() {
        return this.closedWindowFlatmapper.tryProcess(COMPLETING_WM);
    }

    private Traverser<OUT> traverseClosedWindows(Watermark wm) {
        SortedMap<Long, Set<K>> windowsToClose = this.deadlineToKeys.headMap(wm.timestamp());
        com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalWindows, -windowsToClose.values().stream().mapToInt(Set::size).sum());
        List distinctKeys = windowsToClose.values().stream().flatMap(Collection::stream).distinct().collect(Collectors.toList());
        windowsToClose.clear();
        Stream closedWindows = distinctKeys.stream().map(key -> this.closeWindows(this.keyToWindows.get(key), key, wm.timestamp())).flatMap(Collection::stream);
        return Traversers.traverseStream(closedWindows);
    }

    private void addToDeadlines(K key, long deadline) {
        if (this.deadlineToKeys.computeIfAbsent(deadline, x -> new HashSet()).add(key)) {
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalWindows);
        }
    }

    private void removeFromDeadlines(K key, long deadline) {
        Set ks = (Set)this.deadlineToKeys.get(deadline);
        ks.remove(key);
        com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalWindows, -1L);
        if (ks.isEmpty()) {
            this.deadlineToKeys.remove(deadline);
        }
    }

    @Override
    public boolean saveToSnapshot() {
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.keyToWindows.entrySet()).append(Util.entry(BroadcastKey.broadcastKey(Keys.CURRENT_WATERMARK), this.currentWatermark)).onFirstNull(() -> {
                this.snapshotTraverser = null;
            });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (key instanceof BroadcastKey) {
            BroadcastKey bcastKey = (BroadcastKey)key;
            if (!Keys.CURRENT_WATERMARK.equals(bcastKey.key())) {
                throw new JetException("Unexpected broadcast key: " + bcastKey.key());
            }
            long newCurrentWatermark = (Long)value;
            assert (this.processingGuarantee != ProcessingGuarantee.EXACTLY_ONCE || this.minRestoredCurrentWatermark == Long.MAX_VALUE || this.minRestoredCurrentWatermark == newCurrentWatermark) : "different values for currentWatermark restored, before=" + this.minRestoredCurrentWatermark + ", new=" + newCurrentWatermark;
            this.minRestoredCurrentWatermark = Math.min(newCurrentWatermark, this.minRestoredCurrentWatermark);
            return;
        }
        if (this.keyToWindows.put(key, (Windows)value) != null) {
            throw new JetException("Duplicate key in snapshot: " + key);
        }
    }

    @Override
    public boolean finishSnapshotRestore() {
        assert (this.deadlineToKeys.isEmpty());
        for (Map.Entry<K, Windows<A>> entry : this.keyToWindows.entrySet()) {
            for (long end : ((Windows)entry.getValue()).ends) {
                this.addToDeadlines(entry.getKey(), end);
            }
        }
        this.currentWatermark = this.minRestoredCurrentWatermark;
        this.totalKeys.set(this.keyToWindows.size());
        LoggingUtil.logFine(this.getLogger(), "Restored currentWatermark from snapshot to: %s", this.currentWatermark);
        return true;
    }

    private void addItem(int ordinal, Windows<A> w, K key, long timestamp, Object item) {
        this.aggrOp.accumulateFn(ordinal).accept(this.resolveAcc(w, key, timestamp), item);
    }

    private List<OUT> closeWindows(Windows<A> w, K key, long wm) {
        int i;
        ArrayList<OUT> results = new ArrayList<OUT>();
        for (i = 0; i < ((Windows)w).size && ((Windows)w).ends[i] < wm; ++i) {
            OUT out = this.mapToOutputFn.apply(((Windows)w).starts[i], ((Windows)w).ends[i], key, this.aggrOp.finishFn().apply(((Windows)w).accs[i]));
            if (out == null) continue;
            results.add(out);
        }
        if (i != ((Windows)w).size) {
            ((Windows)w).removeHead(i);
        } else {
            this.keyToWindows.remove(key);
            this.totalKeys.set(this.keyToWindows.size());
        }
        return results;
    }

    private A resolveAcc(Windows<A> w, K key, long timestamp) {
        int i;
        long eventEnd = timestamp + this.sessionTimeout;
        for (i = 0; i < ((Windows)w).size && ((Windows)w).starts[i] <= eventEnd; ++i) {
            if (((Windows)w).ends[i] < timestamp) continue;
            if (((Windows)w).starts[i] <= timestamp && ((Windows)w).ends[i] >= eventEnd) {
                return (A)((Windows)w).accs[i];
            }
            if (i + 1 == ((Windows)w).size || ((Windows)w).starts[i + 1] > eventEnd) {
                ((Windows)w).starts[i] = Math.min(((Windows)w).starts[i], timestamp);
                if (((Windows)w).ends[i] < eventEnd) {
                    this.removeFromDeadlines(key, ((Windows)w).ends[i]);
                    ((Windows)w).ends[i] = eventEnd;
                    this.addToDeadlines(key, ((Windows)w).ends[i]);
                }
                return (A)((Windows)w).accs[i];
            }
            this.removeFromDeadlines(key, ((Windows)w).ends[i]);
            ((Windows)w).ends[i] = ((Windows)w).ends[i + 1];
            this.combineFn.accept(((Windows)w).accs[i], ((Windows)w).accs[i + 1]);
            ((Windows)w).removeWindow(i + 1);
            return (A)((Windows)w).accs[i];
        }
        this.addToDeadlines(key, eventEnd);
        return this.insertWindow(w, i, timestamp, eventEnd);
    }

    private A insertWindow(Windows<A> w, int idx, long windowStart, long windowEnd) {
        ((Windows)w).expandIfNeeded();
        ((Windows)w).copy(idx, idx + 1, ((Windows)w).size - idx);
        ((Windows)w).size++;
        ((Windows)w).starts[idx] = windowStart;
        ((Windows)w).ends[idx] = windowEnd;
        ((Windows)w).accs[idx] = this.aggrOp.createFn().get();
        return (A)((Windows)w).accs[idx];
    }

    static enum Keys {
        CURRENT_WATERMARK;

    }

    public static class Windows<A>
    implements IdentifiedDataSerializable {
        private int size;
        private long[] starts = new long[2];
        private long[] ends = new long[2];
        private A[] accs = new Object[2];

        private void removeWindow(int idx) {
            --this.size;
            this.copy(idx + 1, idx, this.size - idx);
        }

        private void removeHead(int count) {
            this.copy(count, 0, this.size - count);
            this.size -= count;
        }

        private void copy(int from, int to, int length) {
            System.arraycopy(this.starts, from, this.starts, to, length);
            System.arraycopy(this.ends, from, this.ends, to, length);
            System.arraycopy(this.accs, from, this.accs, to, length);
        }

        private void expandIfNeeded() {
            if (this.size == this.starts.length) {
                this.starts = Arrays.copyOf(this.starts, 2 * this.starts.length);
                this.ends = Arrays.copyOf(this.ends, 2 * this.ends.length);
                this.accs = Arrays.copyOf(this.accs, 2 * this.accs.length);
            }
        }

        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        public int getId() {
            return 12;
        }

        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(this.size);
            for (int i = 0; i < this.size; ++i) {
                out.writeLong(this.starts[i]);
                out.writeLong(this.ends[i]);
                out.writeObject(this.accs[i]);
            }
        }

        public void readData(ObjectDataInput in) throws IOException {
            this.size = in.readInt();
            if (this.size > this.starts.length) {
                int newSize = QuickMath.nextPowerOfTwo((int)this.size);
                this.starts = new long[newSize];
                this.ends = new long[newSize];
                this.accs = new Object[newSize];
            }
            for (int i = 0; i < this.size; ++i) {
                this.starts[i] = in.readLong();
                this.ends[i] = in.readLong();
                this.accs[i] = in.readObject();
            }
        }

        public String toString() {
            StringJoiner sj = new StringJoiner(", ", this.getClass().getSimpleName() + '{', "}");
            for (int i = 0; i < this.size; ++i) {
                sj.add("[s=" + com.hazelcast.jet.impl.util.Util.toLocalDateTime(this.starts[i]).toLocalTime() + ", e=" + com.hazelcast.jet.impl.util.Util.toLocalDateTime(this.ends[i]).toLocalTime() + ", a=" + this.accs[i] + ']');
            }
            return sj.toString();
        }
    }
}

