/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.core.PartitionAware;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.function.ComparatorEx;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.collection.Long2ObjectHashMap;
import com.hazelcast.util.function.LongFunction;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class SlidingWindowP<K, A, R, OUT>
extends AbstractProcessor {
    final Long2ObjectHashMap<Map<K, A>> tsToKeyToAcc = new Long2ObjectHashMap();
    Map<K, A> slidingWindow;
    Map<K, A> slidingWindowBackup;
    long nextWinToEmit = Long.MIN_VALUE;
    @Nonnull
    private final SlidingWindowPolicy winPolicy;
    @Nonnull
    private final List<ToLongFunction<Object>> frameTimestampFns;
    @Nonnull
    private final List<Function<Object, ? extends K>> keyFns;
    @Nonnull
    private final AggregateOperation<A, ? extends R> aggrOp;
    @Nonnull
    private final A emptyAcc;
    @Nonnull
    private final KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn;
    @Nullable
    private final BiConsumer<? super A, ? super A> combineFn;
    private final boolean isLastStage;
    @Nonnull
    private final AbstractProcessor.FlatMapper<Watermark, ?> wmFlatMapper;
    private ProcessingGuarantee processingGuarantee;
    private final LongFunction<Map<K, A>> createMapPerTsFunction;
    private final Function<K, A> createAccFunction;
    @Probe
    private final AtomicLong lateEventsDropped = new AtomicLong();
    @Probe
    private final AtomicLong totalFrames = new AtomicLong();
    @Probe
    private final AtomicLong totalKeysInFrames = new AtomicLong();
    private final long earlyResultsPeriod;
    private long lastTimeEarlyResultsEmitted;
    private Traverser<? extends OUT> earlyWinTraverser;
    private Traverser<Object> flushTraverser;
    private Traverser<Map.Entry> snapshotTraverser;
    private long topTs = Long.MIN_VALUE;
    private long minRestoredNextWinToEmit = Long.MAX_VALUE;
    private long minRestoredFrameTs = Long.MAX_VALUE;
    private boolean badFrameRestored;

    public SlidingWindowP(@Nonnull List<? extends Function<?, ? extends K>> keyFns, @Nonnull List<? extends ToLongFunction<?>> frameTimestampFns, @Nonnull SlidingWindowPolicy winPolicy, long earlyResultsPeriod, @Nonnull AggregateOperation<A, ? extends R> aggrOp, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn, boolean isLastStage) {
        Preconditions.checkTrue((keyFns.size() == aggrOp.arity() ? 1 : 0) != 0, (String)(keyFns.size() + " key functions provided for " + aggrOp.arity() + "-arity aggregate operation"));
        if (!winPolicy.isTumbling()) {
            Objects.requireNonNull(aggrOp.combineFn(), "AggregateOperation.combineFn is required for sliding windows");
        }
        Preconditions.checkNotNegative((long)earlyResultsPeriod, (String)"earlyResultsPeriod must be zero or positive");
        this.winPolicy = winPolicy;
        this.frameTimestampFns = frameTimestampFns;
        this.keyFns = keyFns;
        this.earlyResultsPeriod = earlyResultsPeriod;
        this.aggrOp = aggrOp;
        this.combineFn = aggrOp.combineFn();
        this.mapToOutputFn = mapToOutputFn;
        this.isLastStage = isLastStage;
        this.wmFlatMapper = this.flatMapper(wm -> this.windowTraverserAndEvictor(wm.timestamp()).append(wm).onFirstNull(() -> {
            this.nextWinToEmit = winPolicy.higherFrameTs(wm.timestamp());
        }));
        this.emptyAcc = aggrOp.createFn().get();
        this.createMapPerTsFunction = x -> {
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalFrames);
            return new HashMap();
        };
        this.createAccFunction = k -> {
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalKeysInFrames);
            return aggrOp.createFn().get();
        };
    }

    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.processingGuarantee = context.processingGuarantee();
        this.lastTimeEarlyResultsEmitted = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    @Override
    public boolean tryProcess() {
        if (this.earlyResultsPeriod == 0L || this.topTs == Long.MIN_VALUE) {
            return true;
        }
        if (this.earlyWinTraverser != null) {
            return this.emitFromTraverser(this.earlyWinTraverser);
        }
        long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        if (now < this.lastTimeEarlyResultsEmitted + this.earlyResultsPeriod) {
            return true;
        }
        long rangeStart = this.startingWindowTs(Long.MAX_VALUE);
        if (rangeStart == Long.MIN_VALUE) {
            return true;
        }
        this.lastTimeEarlyResultsEmitted = now;
        this.slidingWindowBackup = this.slidingWindow;
        this.slidingWindow = null;
        Stream<Long> earlyWinRange = SlidingWindowP.range(rangeStart, this.topTs + this.winPolicy.windowSize() - this.winPolicy.frameSize(), this.winPolicy.frameSize()).boxed();
        this.earlyWinTraverser = Traversers.traverseStream(earlyWinRange).flatMap(winEnd -> Traversers.traverseIterable(this.computeWindow((long)winEnd).entrySet()).map(e -> this.mapToOutputFn.apply(winEnd - this.winPolicy.windowSize(), (long)winEnd, e.getKey(), this.aggrOp.exportFn().apply(e.getValue()), true)).onFirstNull(() -> this.completeEarlyWindow((long)winEnd))).onFirstNull(() -> {
            this.slidingWindow = this.slidingWindowBackup;
            this.slidingWindowBackup = null;
            this.earlyWinTraverser = null;
        });
        return this.emitFromTraverser(this.earlyWinTraverser);
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        long frameTs = this.frameTimestampFns.get(ordinal).applyAsLong(item);
        assert (frameTs == this.winPolicy.floorFrameTs(frameTs)) : "getFrameTsFn returned an invalid frame timestamp";
        if (frameTs < this.nextWinToEmit) {
            com.hazelcast.jet.impl.util.Util.logLateEvent(this.getLogger(), this.nextWinToEmit, item);
            com.hazelcast.jet.impl.util.Util.lazyIncrement(this.lateEventsDropped);
            return true;
        }
        K key = this.keyFns.get(ordinal).apply(item);
        A acc = ((Map)this.tsToKeyToAcc.computeIfAbsent(frameTs, this.createMapPerTsFunction)).computeIfAbsent(key, this.createAccFunction);
        this.aggrOp.accumulateFn(ordinal).accept(acc, item);
        this.topTs = Math.max(this.topTs, frameTs);
        return true;
    }

    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark wm) {
        return this.wmFlatMapper.tryProcess(wm);
    }

    @Override
    public boolean complete() {
        return this.flushBuffers();
    }

    @Override
    public boolean saveToSnapshot() {
        if (!this.isLastStage || this.flushTraverser != null) {
            return this.flushBuffers();
        }
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.tsToKeyToAcc.entrySet()).flatMap(e -> Traversers.traverseIterable(((Map)e.getValue()).entrySet()).map(e2 -> Util.entry(new SnapshotKey((Long)e.getKey(), e2.getKey()), e2.getValue()))).append(Util.entry(BroadcastKey.broadcastKey(Keys.NEXT_WIN_TO_EMIT), this.nextWinToEmit)).onFirstNull(() -> {
                LoggingUtil.logFine(this.getLogger(), "Saved nextWinToEmit: %s", this.nextWinToEmit);
                this.snapshotTraverser = null;
            });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (key instanceof BroadcastKey) {
            BroadcastKey bcastKey = (BroadcastKey)key;
            if (!Keys.NEXT_WIN_TO_EMIT.equals(bcastKey.key())) {
                throw new JetException("Unexpected broadcast key: " + bcastKey.key());
            }
            long newNextWinToEmit = (Long)value;
            assert (this.processingGuarantee != ProcessingGuarantee.EXACTLY_ONCE || this.minRestoredNextWinToEmit == Long.MAX_VALUE || this.minRestoredNextWinToEmit == newNextWinToEmit) : "different values for nextWinToEmit restored, before=" + this.minRestoredNextWinToEmit + ", new=" + newNextWinToEmit;
            this.minRestoredNextWinToEmit = Math.min(newNextWinToEmit, this.minRestoredNextWinToEmit);
            return;
        }
        SnapshotKey k = (SnapshotKey)key;
        long higherFrameTs = this.winPolicy.higherFrameTs(k.timestamp - 1L);
        if (higherFrameTs != k.timestamp && !this.badFrameRestored) {
            this.badFrameRestored = true;
            this.getLogger().warning("Frames in the state do not match the current frame size: they were likely saved for a different window slide step or a different offset. The window results will probably be incorrect until all restored frames are emitted.");
        }
        this.minRestoredFrameTs = Math.min(higherFrameTs, this.minRestoredFrameTs);
        ((Map)this.tsToKeyToAcc.computeIfAbsent(higherFrameTs, this.createMapPerTsFunction)).merge(k.key, value, (o, n) -> {
            if (!this.badFrameRestored) {
                throw new JetException("Duplicate key in snapshot: " + k);
            }
            if (this.combineFn == null) {
                throw new JetException("AggregateOperation.combineFn required for merging restored frames");
            }
            this.combineFn.accept(o, n);
            com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalKeysInFrames, -1L);
            return o;
        });
        com.hazelcast.jet.impl.util.Util.lazyIncrement(this.totalKeysInFrames);
        this.topTs = Math.max(this.topTs, higherFrameTs);
    }

    @Override
    public boolean finishSnapshotRestore() {
        if (this.isLastStage) {
            this.nextWinToEmit = this.minRestoredNextWinToEmit > Long.MIN_VALUE ? this.winPolicy.higherFrameTs(this.minRestoredNextWinToEmit - 1L) : this.minRestoredNextWinToEmit;
            LoggingUtil.logFine(this.getLogger(), "Restored nextWinToEmit from snapshot to: %s", this.nextWinToEmit);
            if (this.nextWinToEmit > Long.MIN_VALUE + this.winPolicy.windowSize()) {
                for (long ts = this.minRestoredFrameTs; ts <= this.nextWinToEmit - this.winPolicy.windowSize(); ts += this.winPolicy.frameSize()) {
                    Map removed = (Map)this.tsToKeyToAcc.remove(ts);
                    if (removed == null) continue;
                    com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalFrames, -1L);
                    com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalKeysInFrames, -removed.size());
                }
            }
        }
        return true;
    }

    private Traverser<Object> windowTraverserAndEvictor(long wm) {
        long rangeStart = this.startingWindowTs(wm);
        if (rangeStart == Long.MIN_VALUE) {
            return Traversers.empty();
        }
        return Traversers.traverseStream(SlidingWindowP.range(rangeStart, wm, this.winPolicy.frameSize()).boxed()).flatMap(winEnd -> Traversers.traverseIterable(this.computeWindow((long)winEnd).entrySet()).map(e -> this.mapToOutputFn.apply(winEnd - this.winPolicy.windowSize(), (long)winEnd, e.getKey(), this.aggrOp.finishFn().apply(e.getValue()), false)).onFirstNull(() -> this.completeWindow((long)winEnd)));
    }

    private long startingWindowTs(long wm) {
        if (this.nextWinToEmit != Long.MIN_VALUE) {
            return this.nextWinToEmit;
        }
        if (this.tsToKeyToAcc.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long bottomTs = (Long)this.tsToKeyToAcc.keySet().stream().min(ComparatorEx.naturalOrder()).orElseThrow(() -> new AssertionError((Object)"Failed to find the min key in a non-empty map"));
        return Math.min(bottomTs, this.winPolicy.floorFrameTs(wm));
    }

    private Map<K, A> computeWindow(long frameTs) {
        if (this.winPolicy.isTumbling()) {
            return (Map)this.tsToKeyToAcc.getOrDefault((Object)frameTs, Collections.emptyMap());
        }
        if (this.aggrOp.deductFn() == null) {
            return this.recomputeWindow(frameTs);
        }
        if (this.slidingWindow == null) {
            this.slidingWindow = this.recomputeWindow(frameTs);
        } else {
            this.patchSlidingWindow(this.aggrOp.combineFn(), (Map)this.tsToKeyToAcc.get(frameTs));
        }
        return this.slidingWindow;
    }

    private Map<K, A> recomputeWindow(long frameTs) {
        HashMap<Object, Object> window = new HashMap<Object, Object>();
        for (long ts = frameTs - this.winPolicy.windowSize() + this.winPolicy.frameSize(); ts <= frameTs; ts += this.winPolicy.frameSize()) {
            assert (this.combineFn != null) : "combineFn == null";
            for (Map.Entry entry : ((Map)this.tsToKeyToAcc.getOrDefault((Object)ts, Collections.emptyMap())).entrySet()) {
                this.combineFn.accept(window.computeIfAbsent(entry.getKey(), k -> this.aggrOp.createFn().get()), entry.getValue());
            }
        }
        return window;
    }

    private void patchSlidingWindow(BiConsumer<? super A, ? super A> patchOp, Map<K, A> patchingFrame) {
        if (patchingFrame == null) {
            return;
        }
        for (Map.Entry<K, A> e : patchingFrame.entrySet()) {
            this.slidingWindow.compute(e.getKey(), (k, acc) -> {
                Object result = acc != null ? acc : this.aggrOp.createFn().get();
                patchOp.accept((A)result, (A)e.getValue());
                return result.equals(this.emptyAcc) ? null : result;
            });
        }
    }

    private void completeWindow(long frameTs) {
        long tsOfFrameToEvict = frameTs - this.winPolicy.windowSize() + this.winPolicy.frameSize();
        Map evictedFrame = (Map)this.tsToKeyToAcc.remove(tsOfFrameToEvict);
        if (evictedFrame != null) {
            com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalKeysInFrames, -evictedFrame.size());
            com.hazelcast.jet.impl.util.Util.lazyAdd(this.totalFrames, -1L);
            if (!this.winPolicy.isTumbling() && this.aggrOp.deductFn() != null) {
                this.patchSlidingWindow(this.aggrOp.deductFn(), evictedFrame);
            }
        }
        assert ((long)this.tsToKeyToAcc.values().stream().mapToInt(Map::size).sum() == this.totalKeysInFrames.get()) : "totalKeysInFrames mismatch, expected=" + this.tsToKeyToAcc.values().stream().mapToInt(Map::size).sum() + ", actual=" + this.totalKeysInFrames.get();
    }

    private void completeEarlyWindow(long frameTs) {
        if (this.winPolicy.isTumbling() || this.aggrOp.deductFn() == null) {
            return;
        }
        Map frameToDeduct = (Map)this.tsToKeyToAcc.get(frameTs - this.winPolicy.windowSize() + this.winPolicy.frameSize());
        if (frameToDeduct != null) {
            this.patchSlidingWindow(this.aggrOp.deductFn(), frameToDeduct);
        }
    }

    private boolean flushBuffers() {
        if (this.flushTraverser == null) {
            if (this.tsToKeyToAcc.isEmpty()) {
                return true;
            }
            this.flushTraverser = this.windowTraverserAndEvictor(this.topTs + this.winPolicy.windowSize() - this.winPolicy.frameSize()).onFirstNull(() -> {
                this.flushTraverser = null;
            });
        }
        return this.emitFromTraverser(this.flushTraverser);
    }

    private static LongStream range(long start, long end, long step) {
        return start > end ? LongStream.empty() : LongStream.iterate(start, n -> n + step).limit(1L + (end - start) / step);
    }

    public static final class SnapshotKey
    implements PartitionAware<Object>,
    IdentifiedDataSerializable {
        long timestamp;
        Object key;

        public SnapshotKey() {
        }

        SnapshotKey(long timestamp, @Nonnull Object key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        public Object getPartitionKey() {
            return this.key;
        }

        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        public int getId() {
            return 15;
        }

        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeLong(this.timestamp);
            out.writeObject(this.key);
        }

        public void readData(ObjectDataInput in) throws IOException {
            this.timestamp = in.readLong();
            this.key = in.readObject();
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SnapshotKey)) return false;
            SnapshotKey that = (SnapshotKey)o;
            if (this.timestamp != that.timestamp) return false;
            if (!Objects.equals(this.key, that.key)) return false;
            return true;
        }

        public int hashCode() {
            int hc = (int)(this.timestamp ^ this.timestamp >>> 32);
            hc = 73 * hc + Objects.hashCode(this.key);
            return hc;
        }

        public String toString() {
            return "SnapshotKey{timestamp=" + this.timestamp + ", key=" + this.key + '}';
        }
    }

    static enum Keys {
        NEXT_WIN_TO_EMIT;

    }
}

