/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.AggregateOperation;
import com.hazelcast.jet.Session;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

public class SessionWindowP<T, K, A, R>
extends AbstractProcessor {
    private static final Watermark COMPLETING_WM = new Watermark(Long.MAX_VALUE);
    final Map<K, Windows> keyToWindows = new HashMap<K, Windows>();
    final SortedMap<Long, Set<K>> deadlineToKeys = new TreeMap<Long, Set<K>>();
    private final long sessionTimeout;
    private final DistributedToLongFunction<? super T> getTimestampF;
    private final DistributedFunction<? super T, K> getKeyF;
    private final DistributedSupplier<A> newAccumulatorF;
    private final BiConsumer<? super A, ? super T> accumulateF;
    private final DistributedFunction<? super A, R> finishAccumulationF;
    private final DistributedBiConsumer<? super A, ? super A> combineAccF;
    private final AbstractProcessor.FlatMapper<Watermark, Session<K, R>> expiredSessionFlatmapper;

    public SessionWindowP(long sessionTimeout, DistributedToLongFunction<? super T> getTimestampF, DistributedFunction<? super T, K> getKeyF, AggregateOperation<? super T, A, R> aggrOp) {
        this.getTimestampF = getTimestampF;
        this.getKeyF = getKeyF;
        this.newAccumulatorF = aggrOp.createAccumulatorF();
        this.accumulateF = aggrOp.accumulateItemF();
        this.combineAccF = aggrOp.combineAccumulatorsF();
        this.finishAccumulationF = aggrOp.finishAccumulationF();
        this.sessionTimeout = sessionTimeout;
        this.expiredSessionFlatmapper = this.flatMapper(this::expiredSessionTraverser);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        Object event = item;
        long timestamp = this.getTimestampF.applyAsLong(event);
        K key = this.getKeyF.apply(event);
        this.keyToWindows.computeIfAbsent(key, k -> new Windows()).addEvent(key, timestamp, event);
        return true;
    }

    @Override
    protected boolean tryProcessWm0(@Nonnull Watermark wm) {
        return this.expiredSessionFlatmapper.tryProcess(wm);
    }

    @Override
    public boolean complete() {
        return this.expiredSessionFlatmapper.tryProcess(COMPLETING_WM);
    }

    private Traverser<Session<K, R>> expiredSessionTraverser(Watermark wm) {
        List distinctKeys = this.deadlineToKeys.headMap(wm.timestamp()).values().stream().flatMap(Collection::stream).distinct().collect(Collectors.toList());
        this.deadlineToKeys.headMap(wm.timestamp()).clear();
        Stream sessions = distinctKeys.stream().map(key -> this.keyToWindows.get(key).closeWindows(key, wm.timestamp())).flatMap(Collection::stream);
        return Traversers.traverseStream(sessions);
    }

    private void addToDeadlines(K key, long deadline) {
        this.deadlineToKeys.computeIfAbsent(deadline, x -> new HashSet()).add(key);
    }

    private void removeFromDeadlines(K key, long deadline) {
        Set ks = (Set)this.deadlineToKeys.get(deadline);
        ks.remove(key);
        if (ks.isEmpty()) {
            this.deadlineToKeys.remove(deadline);
        }
    }

    private class Windows {
        private int size;
        private long[] starts = new long[2];
        private long[] ends = new long[2];
        private A[] accs = new Object[2];

        private Windows() {
        }

        void addEvent(K key, long timestamp, T event) {
            SessionWindowP.this.accumulateF.accept(this.resolveAcc(key, timestamp), event);
        }

        List<Session<K, R>> closeWindows(K key, long wm) {
            int i;
            ArrayList sessions = new ArrayList();
            for (i = 0; i < this.size && this.ends[i] < wm; ++i) {
                sessions.add(new Session(key, this.starts[i], this.ends[i], SessionWindowP.this.finishAccumulationF.apply(this.accs[i])));
            }
            if (i != this.size) {
                this.removeHead(i);
            } else {
                SessionWindowP.this.keyToWindows.remove(key);
            }
            return sessions;
        }

        private A resolveAcc(K key, long timestamp) {
            int i;
            long eventEnd = timestamp + SessionWindowP.this.sessionTimeout;
            for (i = 0; i < this.size && this.starts[i] <= eventEnd; ++i) {
                if (this.ends[i] < timestamp) continue;
                if (this.starts[i] <= timestamp && this.ends[i] >= eventEnd) {
                    return this.accs[i];
                }
                if (i + 1 == this.size || this.starts[i + 1] > eventEnd) {
                    this.starts[i] = Math.min(this.starts[i], timestamp);
                    if (this.ends[i] < eventEnd) {
                        SessionWindowP.this.removeFromDeadlines(key, this.ends[i]);
                        this.ends[i] = eventEnd;
                        SessionWindowP.this.addToDeadlines(key, this.ends[i]);
                    }
                    return this.accs[i];
                }
                SessionWindowP.this.removeFromDeadlines(key, this.ends[i]);
                this.ends[i] = this.ends[i + 1];
                SessionWindowP.this.combineAccF.accept(this.accs[i], this.accs[i + 1]);
                this.removeWindow(i + 1);
                return this.accs[i];
            }
            SessionWindowP.this.addToDeadlines(key, eventEnd);
            return this.insertWindow(i, timestamp, eventEnd);
        }

        private A insertWindow(int idx, long windowStart, long windowEnd) {
            this.expandIfNeeded();
            this.copy(idx, idx + 1, this.size - idx);
            ++this.size;
            this.starts[idx] = windowStart;
            this.ends[idx] = windowEnd;
            this.accs[idx] = SessionWindowP.this.newAccumulatorF.get();
            return this.accs[idx];
        }

        private void removeWindow(int idx) {
            --this.size;
            this.copy(idx + 1, idx, this.size - idx);
        }

        private void removeHead(int count) {
            this.copy(count, 0, this.size - count);
            this.size -= count;
        }

        private void copy(int from, int to, int length) {
            System.arraycopy(this.starts, from, this.starts, to, length);
            System.arraycopy(this.ends, from, this.ends, to, length);
            System.arraycopy(this.accs, from, this.accs, to, length);
        }

        private void expandIfNeeded() {
            if (this.size == this.starts.length) {
                this.starts = Arrays.copyOf(this.starts, 2 * this.starts.length);
                this.ends = Arrays.copyOf(this.ends, 2 * this.ends.length);
                this.accs = Arrays.copyOf(this.accs, 2 * this.accs.length);
            }
        }
    }
}

