/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.AggregateOperation;
import com.hazelcast.jet.TimestampedEntry;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.WindowDefinition;
import com.hazelcast.jet.function.DistributedComparator;
import com.hazelcast.jet.function.DistributedToLongFunction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;

/**
 * Processor that groups incoming items by frame timestamp and key, keeps one
 * accumulator per (frame, key) pair and, on each watermark, emits a
 * {@link TimestampedEntry} for every key of every window that became due,
 * followed by the watermark itself.
 * <p>
 * Three strategies are used to compute a window (see {@link #computeWindow}):
 * a tumbling window is just the frame itself; when the aggregate operation
 * supplies a deduct function, a single running {@link #slidingWindow} map is
 * patched incrementally; otherwise the window is recomputed by combining all
 * the frames it spans.
 *
 * @param <T> type of the input item
 * @param <A> type of the accumulator
 * @param <R> type of the finished aggregation result
 */
public class SlidingWindowP<T, A, R> extends AbstractProcessor {
    // Both maps are deliberately not private (kept as in the original).
    /** Frame timestamp -> (key -> accumulator of that frame). */
    final Map<Long, Map<Object, A>> tsToKeyToAcc = new HashMap<>();
    /** Running window state; only maintained when a deduct function is available. */
    final Map<Object, A> slidingWindow = new HashMap<>();

    private final WindowDefinition wDef;
    private final DistributedToLongFunction<? super T> getFrameTimestampF;
    private final Function<? super T, ?> getKeyF;
    private final AggregateOperation<? super T, A, R> aggrOp;
    private final AbstractProcessor.FlatMapper<Watermark, Object> flatMapper;
    /** A freshly created accumulator, used to detect accumulators that are back to the empty state. */
    private final A emptyAcc;

    /** {@code Long.MIN_VALUE} serves as the "nothing emitted yet" sentinel. */
    private long nextFrameTsToEmit = Long.MIN_VALUE;
    /** Lazily created in {@link #complete()}; stays {@code null} until then. */
    private Traverser<Object> finalTraverser;

    /**
     * @param getKeyF            extracts the grouping key from an item
     * @param getFrameTimestampF extracts the frame timestamp from an item
     * @param winDef             definition of the window (frame and window length)
     * @param aggrOp             aggregate operation supplying the accumulator primitives
     */
    public SlidingWindowP(
            Function<? super T, ?> getKeyF,
            DistributedToLongFunction<? super T> getFrameTimestampF,
            WindowDefinition winDef,
            AggregateOperation<? super T, A, R> aggrOp
    ) {
        this.wDef = winDef;
        this.getFrameTimestampF = getFrameTimestampF;
        this.getKeyF = getKeyF;
        this.aggrOp = aggrOp;
        // Each watermark is expanded into the due window results, then the watermark itself.
        this.flatMapper = flatMapper(wm -> windowTraverserAndEvictor(wm.timestamp()).append(wm));
        this.emptyAcc = aggrOp.createAccumulatorF().get();
    }

    /**
     * Accumulates the item into the accumulator of its (frame timestamp, key)
     * pair, creating the frame map and the accumulator on first use.
     *
     * @return always {@code true}
     */
    @Override
    @SuppressWarnings("unchecked") // the processor API hands items over as Object; they are treated as T
    protected boolean tryProcess0(@Nonnull Object item) {
        T t = (T) item;
        long frameTimestamp = getFrameTimestampF.applyAsLong(t);
        assert frameTimestamp == wDef.floorFrameTs(frameTimestamp) : "timestamp not on the verge of a frame";
        Object key = getKeyF.apply(t);
        A acc = tsToKeyToAcc
                .computeIfAbsent(frameTimestamp, x -> new HashMap<>())
                .computeIfAbsent(key, k -> aggrOp.createAccumulatorF().get());
        aggrOp.accumulateItemF().accept(acc, t);
        return true;
    }

    /** Delegates the watermark to the flat-mapper set up in the constructor. */
    @Override
    protected boolean tryProcessWm0(@Nonnull Watermark wm) {
        return flatMapper.tryProcess(wm);
    }

    /**
     * Emits the windows that are still pending when the input is exhausted.
     * The traverser is created once, on the first call, and drained across
     * subsequent calls.
     *
     * @return {@code true} if there was nothing to emit, otherwise the result
     *         of {@code emitFromTraverser} on the final traverser
     */
    @Override
    public boolean complete() {
        if (finalTraverser == null) {
            if (tsToKeyToAcc.isEmpty()) {
                return true;
            }
            long topTs = tsToKeyToAcc.keySet().stream()
                    .max(DistributedComparator.naturalOrder())
                    .orElseThrow(() -> new AssertionError("Failed to find the max key in a non-empty map"));
            // NOTE(review): the flush bound is topTs + frameLength. Whether this covers every
            // window that still contains the top frame depends on windowLength and on the
            // semantics of higherFrameTs -- confirm against WindowDefinition.
            finalTraverser = windowTraverserAndEvictor(topTs + wDef.frameLength());
        }
        return emitFromTraverser(finalTraverser);
    }

    /**
     * Builds a traverser over the results of all windows from
     * {@code nextFrameTsToEmit} up to (but excluding)
     * {@code wDef.higherFrameTs(endTsExclusive)}, and advances
     * {@code nextFrameTsToEmit} to that bound. After the last entry of each
     * window has been traversed, {@link #completeWindow} is invoked for it.
     * <p>
     * On the first call with data on record, the start of the range is the
     * lower of the lowest recorded frame timestamp and
     * {@code wDef.floorFrameTs(endTsExclusive)}. If no data has been recorded
     * yet, an empty traverser is returned and no state is changed.
     */
    private Traverser<Object> windowTraverserAndEvictor(long endTsExclusive) {
        if (nextFrameTsToEmit == Long.MIN_VALUE) {
            if (tsToKeyToAcc.isEmpty()) {
                return Traversers.empty();
            }
            long bottomTs = tsToKeyToAcc.keySet().stream()
                    .min(DistributedComparator.naturalOrder())
                    .orElseThrow(() -> new AssertionError("Failed to find the min key in a non-empty map"));
            nextFrameTsToEmit = Math.min(bottomTs, wDef.floorFrameTs(endTsExclusive));
        }
        long rangeStart = nextFrameTsToEmit;
        nextFrameTsToEmit = wDef.higherFrameTs(endTsExclusive);
        return Traversers.traverseStream(range(rangeStart, nextFrameTsToEmit, wDef.frameLength()).boxed())
                .flatMap(frameTs -> Traversers.traverseIterable(computeWindow(frameTs).entrySet())
                        .map(e -> new TimestampedEntry<>(
                                frameTs, e.getKey(), aggrOp.finishAccumulationF().apply(e.getValue())))
                        .onFirstNull(() -> completeWindow(frameTs)));
    }

    /**
     * Returns the key -> accumulator map of the window ending at {@code frameTs}.
     * <ul>
     * <li>Tumbling window: the frame's own map (or an empty map).</li>
     * <li>Deduct function available: the frame at {@code frameTs} is combined
     *     into the shared {@link #slidingWindow}, which is then returned.</li>
     * <li>Otherwise: a fresh map combining every frame from
     *     {@code frameTs - windowLength + frameLength} through {@code frameTs}.</li>
     * </ul>
     */
    private Map<Object, A> computeWindow(long frameTs) {
        if (wDef.isTumbling()) {
            return tsToKeyToAcc.getOrDefault(frameTs, Collections.emptyMap());
        }
        if (aggrOp.deductAccumulatorF() != null) {
            // Add the leading frame; the trailing one is deducted later in completeWindow().
            patchSlidingWindow(aggrOp.combineAccumulatorsF(), tsToKeyToAcc.get(frameTs));
            return slidingWindow;
        }
        // No deduct function: recompute the window from all the frames it spans.
        Map<Object, A> window = new HashMap<>();
        BiConsumer<? super A, ? super A> combine = aggrOp.combineAccumulatorsF();
        long frameLength = wDef.frameLength();
        for (long ts = frameTs - wDef.windowLength() + frameLength; ts <= frameTs; ts += frameLength) {
            tsToKeyToAcc.getOrDefault(ts, Collections.<Object, A>emptyMap())
                    .forEach((key, currAcc) -> combine.accept(
                            window.computeIfAbsent(key, k -> aggrOp.createAccumulatorF().get()),
                            currAcc));
        }
        return window;
    }

    /**
     * Removes the oldest frame of the window ending at {@code frameTs} from
     * {@link #tsToKeyToAcc} and, when a deduct function is available, deducts
     * that frame from {@link #slidingWindow}.
     */
    private void completeWindow(long frameTs) {
        Map<Object, A> evictedFrame = tsToKeyToAcc.remove(frameTs - wDef.windowLength() + wDef.frameLength());
        if (aggrOp.deductAccumulatorF() != null) {
            patchSlidingWindow(aggrOp.deductAccumulatorF(), evictedFrame);
        }
    }

    /**
     * Applies {@code patchOp} to {@link #slidingWindow} for every entry of
     * {@code patchingFrame}. A missing accumulator is created first; an
     * accumulator that equals {@link #emptyAcc} after patching is removed from
     * the map.
     *
     * @param patchOp       operation applied as {@code patchOp.accept(windowAcc, frameAcc)}
     * @param patchingFrame the frame to apply; {@code null} is a no-op
     */
    private void patchSlidingWindow(BiConsumer<? super A, ? super A> patchOp, Map<Object, A> patchingFrame) {
        if (patchingFrame == null) {
            return;
        }
        for (Map.Entry<Object, A> e : patchingFrame.entrySet()) {
            slidingWindow.compute(e.getKey(), (k, acc) -> {
                A result = acc != null ? acc : aggrOp.createAccumulatorF().get();
                patchOp.accept(result, e.getValue());
                // Returning null from compute() removes the mapping.
                return result.equals(emptyAcc) ? null : result;
            });
        }
    }

    /**
     * Returns a stream of {@code long}s equivalent to
     * {@code for (long i = start; i < end; i += step) yield i;}.
     * The stream is empty when {@code start >= end}.
     */
    private static LongStream range(long start, long end, long step) {
        return start >= end
                ? LongStream.empty()
                : LongStream.iterate(start, n -> n + step).limit(1L + (end - start - 1L) / step);
    }
}

