/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.function.DistributedComparator;
import com.hazelcast.jet.function.KeyedWindowResultFunction;
import com.hazelcast.jet.impl.processor.SnapshotKey;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.util.Preconditions;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Processor that accumulates incoming items into per-frame, per-key
 * accumulators and, when a watermark arrives, emits the results of every
 * window whose end timestamp lies between the next window still to be
 * emitted and the watermark.
 * <p>
 * Frames are held in {@link #tsToKeyToAcc}. For a tumbling policy a window is
 * exactly one frame. Otherwise a window is obtained either by combining all
 * of its frames from scratch, or — when the aggregate operation supplies a
 * {@code deductFn} — by incrementally patching {@link #slidingWindow}: the
 * leading frame is combined in and the trailing frame is deducted.
 *
 * @param <K>   type of the grouping key
 * @param <A>   type of the accumulator
 * @param <R>   type of the finished aggregation result
 * @param <OUT> type of the emitted item
 */
public class SlidingWindowP<K, A, R, OUT>
extends AbstractProcessor {
    /** Frame timestamp -> (key -> accumulator). */
    final Map<Long, Map<K, A>> tsToKeyToAcc = new HashMap<>();
    /**
     * Incrementally maintained window contents. Only assigned by
     * {@link #computeWindow} when the policy is not tumbling and the aggregate
     * operation has a {@code deductFn}; {@code null} until first computed.
     */
    Map<K, A> slidingWindow;
    /**
     * End timestamp of the next window to emit. Items whose frame timestamp is
     * below this value are dropped as late. {@code Long.MIN_VALUE} means that
     * no value has been established yet.
     */
    long nextWinToEmit = Long.MIN_VALUE;
    @Nonnull
    private final SlidingWindowPolicy winPolicy;
    /** One frame-timestamp extractor per input ordinal. */
    @Nonnull
    private final List<ToLongFunction<Object>> frameTimestampFns;
    /** One key extractor per input ordinal. */
    @Nonnull
    private final List<Function<Object, ? extends K>> keyFns;
    @Nonnull
    private final AggregateOperation<A, R> aggrOp;
    @Nonnull
    private final KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn;
    /** Cached {@code aggrOp.combineFn()}; the constructor requires it for non-tumbling policies. */
    @Nullable
    private final BiConsumer<? super A, ? super A> combineFn;
    private final boolean isLastStage;
    @Nonnull
    private final AbstractProcessor.FlatMapper<Watermark, ?> wmFlatMapper;
    @Probe
    private final AtomicLong lateEventsDropped = new AtomicLong();
    @Probe
    private final AtomicLong totalFrames = new AtomicLong();
    @Probe
    private final AtomicLong totalKeysInFrames = new AtomicLong();
    /** A freshly created accumulator, used to detect accumulators that became empty after patching. */
    @Nonnull
    private final A emptyAcc;
    /** Non-null while a flush started by {@link #flushBuffers} is still in progress. */
    private Traverser<Object> flushTraverser;
    /**
     * Non-null while a snapshot is being written. Declared with the raw entry
     * type because it mixes entries keyed by {@code SnapshotKey} with one
     * entry keyed by a {@code BroadcastKey}.
     */
    @SuppressWarnings("rawtypes")
    private Traverser<Map.Entry> snapshotTraverser;
    /** Highest frame timestamp seen so far, from processed items or restored state. */
    private long topTs = Long.MIN_VALUE;
    /** Smallest {@code nextWinToEmit} value seen during snapshot restore. */
    private long minRestoredNextWinToEmit = Long.MAX_VALUE;
    private ProcessingGuarantee processingGuarantee;

    /**
     * Creates the processor.
     *
     * @param keyFns            key extractors, one per input ordinal; the count must equal
     *                          {@code aggrOp.arity()}
     * @param frameTimestampFns frame-timestamp extractors, indexed by input ordinal
     * @param winPolicy         window policy supplying frame and window sizes
     * @param aggrOp            aggregate operation; its {@code combineFn} must be non-null
     *                          unless {@code winPolicy} is tumbling
     * @param mapToOutputFn     builds the emitted item from window start, window end, key and result
     * @param isLastStage       when {@code true}, state is written to and restored from snapshots;
     *                          when {@code false}, a snapshot request flushes the buffers instead
     * @throws NullPointerException if the policy is not tumbling and {@code aggrOp.combineFn()} is null
     */
    @SuppressWarnings("unchecked")
    public SlidingWindowP(@Nonnull List<? extends Function<?, ? extends K>> keyFns, @Nonnull List<? extends ToLongFunction<?>> frameTimestampFns, @Nonnull SlidingWindowPolicy winPolicy, @Nonnull AggregateOperation<A, R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, OUT> mapToOutputFn, boolean isLastStage) {
        Preconditions.checkTrue(keyFns.size() == aggrOp.arity(), keyFns.size() + " key functions provided for " + aggrOp.arity() + "-arity aggregate operation");
        if (!winPolicy.isTumbling()) {
            Objects.requireNonNull(aggrOp.combineFn(), "AggregateOperation.combineFn is required for sliding windows");
        }
        this.winPolicy = winPolicy;
        // Unchecked: the functions are applied to items typed as Object, so the
        // wildcard input types are widened here once instead of at every call site.
        this.frameTimestampFns = (List<ToLongFunction<Object>>) frameTimestampFns;
        this.keyFns = (List<Function<Object, ? extends K>>) keyFns;
        this.aggrOp = aggrOp;
        this.combineFn = aggrOp.combineFn();
        this.mapToOutputFn = mapToOutputFn;
        this.isLastStage = isLastStage;
        // Once all windows for a watermark have been emitted, advance nextWinToEmit
        // to the frame boundary above the watermark.
        this.wmFlatMapper = this.flatMapper(wm -> this.windowTraverserAndEvictor(wm.timestamp()).onFirstNull(() -> {
            this.nextWinToEmit = winPolicy.higherFrameTs(wm.timestamp());
        }));
        this.emptyAcc = aggrOp.createFn().get();
    }

    /** Remembers the job's processing guarantee; it is consulted by an assertion during snapshot restore. */
    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.processingGuarantee = context.processingGuarantee();
    }

    /**
     * Accumulates {@code item} into the accumulator for its frame and key,
     * creating the frame map and the accumulator on first use. Items whose
     * frame timestamp is below {@link #nextWinToEmit} are logged, counted and
     * dropped.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        long frameTs = this.frameTimestampFns.get(ordinal).applyAsLong(item);
        assert (frameTs == this.winPolicy.floorFrameTs(frameTs)) : "getFrameTsFn returned an invalid frame timestamp";
        if (frameTs < this.nextWinToEmit) {
            com.hazelcast.jet.impl.util.Util.logLateEvent(this.getLogger(), this.nextWinToEmit, item);
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.lateEventsDropped);
            return true;
        }
        K key = this.keyFns.get(ordinal).apply(item);
        Map<K, A> frame = this.tsToKeyToAcc.computeIfAbsent(frameTs, x -> {
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalFrames);
            return new HashMap<>();
        });
        A acc = frame.computeIfAbsent(key, k -> {
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalKeysInFrames);
            return this.aggrOp.createFn().get();
        });
        this.aggrOp.accumulateFn(ordinal).accept(acc, item);
        this.topTs = Math.max(this.topTs, frameTs);
        return true;
    }

    /** Emits the windows completed by {@code wm} through the watermark flat-mapper. */
    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark wm) {
        return this.wmFlatMapper.tryProcess(wm);
    }

    /** Emits all windows that can still be built from the buffered frames. */
    @Override
    public boolean complete() {
        return this.flushBuffers();
    }

    /**
     * If this is not the last stage, or a flush is already in progress,
     * flushes the buffers. Otherwise writes every
     * {@code (frame timestamp, key) -> accumulator} entry to the snapshot,
     * followed by {@link #nextWinToEmit} under a broadcast key.
     *
     * @return {@code true} once everything has been emitted
     */
    @Override
    @SuppressWarnings("rawtypes")
    public boolean saveToSnapshot() {
        if (!this.isLastStage || this.flushTraverser != null) {
            return this.flushBuffers();
        }
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.tsToKeyToAcc.entrySet())
                    .<Map.Entry>flatMap(frame -> Traversers.traverseIterable(frame.getValue().entrySet())
                            .map(keyAndAcc -> Util.entry(new SnapshotKey(frame.getKey(), keyAndAcc.getKey()), keyAndAcc.getValue())))
                    .append(Util.entry(BroadcastKey.broadcastKey(Keys.NEXT_WIN_TO_EMIT), this.nextWinToEmit))
                    .onFirstNull(() -> {
                        LoggingUtil.logFine(this.getLogger(), "Saved nextWinToEmit: %s", this.nextWinToEmit);
                        // Reset so that the next snapshot builds a fresh traverser.
                        this.snapshotTraverser = null;
                    });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /**
     * Restores one snapshot entry. A broadcast key must be
     * {@link Keys#NEXT_WIN_TO_EMIT}; the minimum of all values restored under
     * it is kept. Any other key is treated as a {@code SnapshotKey} whose
     * value is put back into {@link #tsToKeyToAcc}.
     *
     * @throws JetException on an unexpected broadcast key or a duplicate snapshot key
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (key instanceof BroadcastKey) {
            BroadcastKey<?> bcastKey = (BroadcastKey<?>) key;
            if (!Keys.NEXT_WIN_TO_EMIT.equals(bcastKey.key())) {
                throw new JetException("Unexpected broadcast key: " + bcastKey.key());
            }
            long newNextWinToEmit = (Long) value;
            assert (this.processingGuarantee != ProcessingGuarantee.EXACTLY_ONCE || this.minRestoredNextWinToEmit == Long.MAX_VALUE || this.minRestoredNextWinToEmit == newNextWinToEmit) : "different values for nextWinToEmit restored, before=" + this.minRestoredNextWinToEmit + ", new=" + newNextWinToEmit;
            this.minRestoredNextWinToEmit = Math.min(newNextWinToEmit, this.minRestoredNextWinToEmit);
            return;
        }
        SnapshotKey k = (SnapshotKey) key;
        Map<K, A> frame = this.tsToKeyToAcc.computeIfAbsent(k.timestamp, x -> {
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalFrames);
            return new HashMap<>();
        });
        // Unchecked: the snapshot stores keys and accumulators as plain objects.
        if (frame.put((K) k.key, (A) value) != null) {
            throw new JetException("Duplicate key in snapshot: " + k);
        }
        com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalKeysInFrames);
        this.topTs = Math.max(this.topTs, k.timestamp);
    }

    /**
     * In the last stage, adopts the smallest restored {@code nextWinToEmit}
     * value as the current one.
     *
     * @return always {@code true}
     */
    @Override
    public boolean finishSnapshotRestore() {
        if (this.isLastStage) {
            this.nextWinToEmit = this.minRestoredNextWinToEmit;
            LoggingUtil.logFine(this.getLogger(), "Restored nextWinToEmit from snapshot to: %s", this.nextWinToEmit);
        }
        return true;
    }

    /**
     * Builds a traverser over the output items of every window ending at a
     * frame boundary from the range start up to {@code wm}. After the last
     * item of each window has been taken, {@link #completeWindow} evicts that
     * window's trailing frame.
     * <p>
     * The range starts at {@link #nextWinToEmit} if it has been established.
     * Otherwise it starts at the lower of the lowest buffered frame timestamp
     * and the frame floor of {@code wm}; with nothing buffered the result is
     * empty.
     *
     * @param wm upper bound (inclusive) for window end timestamps
     */
    private Traverser<Object> windowTraverserAndEvictor(long wm) {
        long rangeStart;
        if (this.nextWinToEmit != Long.MIN_VALUE) {
            rangeStart = this.nextWinToEmit;
        } else {
            if (this.tsToKeyToAcc.isEmpty()) {
                return Traversers.empty();
            }
            long bottomTs = this.tsToKeyToAcc.keySet().stream()
                    .min(DistributedComparator.naturalOrder())
                    .orElseThrow(() -> new AssertionError("Failed to find the min key in a non-empty map"));
            rangeStart = Math.min(bottomTs, this.winPolicy.floorFrameTs(wm));
        }
        return Traversers.traverseStream(SlidingWindowP.range(rangeStart, wm, this.winPolicy.frameSize()).boxed())
                .flatMap(winEnd -> Traversers.traverseIterable(this.computeWindow(winEnd).entrySet())
                        .map(e -> this.mapToOutputFn.apply(winEnd - this.winPolicy.windowSize(), winEnd, e.getKey(), this.aggrOp.finishFn().apply(e.getValue())))
                        .onFirstNull(() -> this.completeWindow(winEnd)));
    }

    /**
     * Returns the key-to-accumulator map of the window ending at
     * {@code frameTs}: the single frame for a tumbling policy, a full
     * recomputation when there is no {@code deductFn}, and otherwise the
     * incrementally maintained {@link #slidingWindow} with the leading frame
     * combined in.
     */
    private Map<K, A> computeWindow(long frameTs) {
        if (this.winPolicy.isTumbling()) {
            return this.tsToKeyToAcc.getOrDefault(frameTs, Collections.emptyMap());
        }
        if (this.aggrOp.deductFn() == null) {
            return this.recomputeWindow(frameTs);
        }
        if (this.slidingWindow == null) {
            this.slidingWindow = this.recomputeWindow(frameTs);
        } else {
            // Add the leading-edge frame; the trailing one is deducted in completeWindow().
            this.patchSlidingWindow(this.aggrOp.combineFn(), this.tsToKeyToAcc.get(frameTs));
        }
        return this.slidingWindow;
    }

    /**
     * Builds the window ending at {@code frameTs} from scratch by combining
     * all of its buffered frames into new accumulators. Only reached for
     * non-tumbling policies, for which the constructor guarantees a non-null
     * {@code combineFn}.
     */
    private Map<K, A> recomputeWindow(long frameTs) {
        assert this.combineFn != null : "combineFn == null";
        Map<K, A> window = new HashMap<>();
        long frameSize = this.winPolicy.frameSize();
        for (long ts = frameTs - this.winPolicy.windowSize() + frameSize; ts <= frameTs; ts += frameSize) {
            this.tsToKeyToAcc.getOrDefault(ts, Collections.<K, A>emptyMap())
                    .forEach((key, currAcc) -> this.combineFn.accept(window.computeIfAbsent(key, k -> this.aggrOp.createFn().get()), currAcc));
        }
        return window;
    }

    /**
     * Applies {@code patchOp} (combine or deduct) to {@link #slidingWindow}
     * for every entry of {@code patchingFrame}. A key whose accumulator equals
     * the empty accumulator afterwards is removed from the window.
     *
     * @param patchOp       operation invoked as {@code (windowAcc, frameAcc)}
     * @param patchingFrame frame to apply; {@code null} is a no-op
     */
    private void patchSlidingWindow(BiConsumer<? super A, ? super A> patchOp, Map<K, A> patchingFrame) {
        if (patchingFrame == null) {
            return;
        }
        for (Map.Entry<K, A> e : patchingFrame.entrySet()) {
            this.slidingWindow.compute(e.getKey(), (k, acc) -> {
                A result = acc != null ? acc : this.aggrOp.createFn().get();
                patchOp.accept(result, e.getValue());
                // Returning null removes the mapping from the window.
                return result.equals(this.emptyAcc) ? null : result;
            });
        }
    }

    /**
     * Called once the window ending at {@code frameTs} has been fully emitted.
     * Evicts the window's oldest frame, updates the frame and key counters
     * and, when the sliding window is maintained incrementally, deducts the
     * evicted frame from it.
     */
    private void completeWindow(long frameTs) {
        long frameToEvict = frameTs - this.winPolicy.windowSize() + this.winPolicy.frameSize();
        Map<K, A> evictedFrame = this.tsToKeyToAcc.remove(frameToEvict);
        if (evictedFrame != null) {
            com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalKeysInFrames, -evictedFrame.size());
            com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalFrames, -1L);
            if (!this.winPolicy.isTumbling() && this.aggrOp.deductFn() != null) {
                // Deduct the trailing-edge frame.
                this.patchSlidingWindow(this.aggrOp.deductFn(), evictedFrame);
            }
        }
        assert ((long) this.tsToKeyToAcc.values().stream().mapToInt(Map::size).sum() == this.totalKeysInFrames.get()) : "totalKeysInFrames mismatch, expected=" + this.tsToKeyToAcc.values().stream().mapToInt(Map::size).sum() + ", actual=" + this.totalKeysInFrames.get();
    }

    /**
     * Emits every window that still overlaps a buffered frame, i.e. windows
     * ending up to {@code topTs + windowSize - frameSize}.
     *
     * @return {@code true} if there was nothing to flush or the flush has finished
     */
    private boolean flushBuffers() {
        if (this.flushTraverser == null) {
            if (this.tsToKeyToAcc.isEmpty()) {
                return true;
            }
            this.flushTraverser = this.windowTraverserAndEvictor(this.topTs + this.winPolicy.windowSize() - this.winPolicy.frameSize()).onFirstNull(() -> {
                this.flushTraverser = null;
            });
        }
        return this.emitFromTraverser(this.flushTraverser);
    }

    /**
     * Returns the stream {@code start, start + step, ...} of values not
     * exceeding {@code end}; empty when {@code start > end}.
     * NOTE(review): assumes {@code step > 0}; all callers pass the policy's frame size.
     */
    private static LongStream range(long start, long end, long step) {
        return start > end ? LongStream.empty() : LongStream.iterate(start, n -> n + step).limit(1L + (end - start) / step);
    }

    /** Keys under which broadcast state is stored in the snapshot. */
    static enum Keys {
        NEXT_WIN_TO_EMIT;

    }
}

