/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ResettableSingletonTraverser;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.TimestampedItem;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.impl.memory.AccumulationLimitExceededException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Processor that keeps one state object per key and applies a stateful
 * flat-mapping function to every incoming event.
 * <p>
 * State is held in an access-ordered {@link LinkedHashMap}, so the entries
 * touched least recently come first in iteration order. On every watermark
 * the map is scanned from the front and entries whose timestamp is older than
 * {@code watermark - ttl} are removed; if {@code onEvictFn} is set, its output
 * is emitted for each removed entry.
 *
 * @param <T> type of the input events
 * @param <K> type of the grouping key
 * @param <S> type of the per-key state object
 * @param <R> type of the emitted results
 */
public class TransformStatefulP<T, K, S, R>
extends AbstractProcessor {
    private static final int HASH_MAP_INITIAL_CAPACITY = 16;
    private static final float HASH_MAP_LOAD_FACTOR = 0.75f;
    /** Sentinel watermark used by {@link #complete()}; compared by identity in the evicting traverser. */
    private static final Watermark FLUSHING_WATERMARK = new Watermark(Long.MAX_VALUE);

    @Probe(name = "lateEventsDropped")
    private final Counter lateEventsDropped = SwCounter.newSwCounter();
    /** Time-to-live of a state entry; {@code Long.MAX_VALUE} when the constructor received a non-positive value. */
    private final long ttl;
    private final Function<? super T, ? extends K> keyFn;
    private final ToLongFunction<? super T> timestampFn;
    private final Function<K, TimestampedItem<S>> createIfAbsentFn;
    private final TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> statefulFlatMapFn;
    @Nullable
    private final TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn;
    // accessOrder = true: iteration goes from the least to the most recently accessed entry
    private final Map<K, TimestampedItem<S>> keyToState =
            new LinkedHashMap<>(HASH_MAP_INITIAL_CAPACITY, HASH_MAP_LOAD_FACTOR, true);
    private final AbstractProcessor.FlatMapper<T, R> flatMapper = flatMapper(this::flatMapEvent);
    private final AbstractProcessor.FlatMapper<Watermark, Object> wmFlatMapper = flatMapper(this::flatMapWm);
    private final EvictingTraverser evictingTraverser = new EvictingTraverser();
    private final Traverser<?> evictingTraverserFlattened = evictingTraverser.flatMap(x -> x);
    private long currentWm = Long.MIN_VALUE;
    /** Non-null only while a snapshot is being emitted; cleared when the traverser is exhausted. */
    private Traverser<? extends Map.Entry<?, ?>> snapshotTraverser;
    private boolean inComplete;
    private long maxEntries;

    /**
     * @param ttl               time-to-live of a state entry; a value {@code <= 0} is
     *                          replaced with {@code Long.MAX_VALUE}
     * @param keyFn             extracts the grouping key from an event
     * @param timestampFn       extracts the timestamp from an event
     * @param createFn          creates the state object for a key seen for the first time
     * @param statefulFlatMapFn receives {@code (state, key, event)} and returns the
     *                          traverser of results to emit
     * @param onEvictFn         optional; receives {@code (state, key, currentWatermark)}
     *                          for every evicted entry and returns the traverser to emit
     */
    public TransformStatefulP(long ttl, @Nonnull Function<? super T, ? extends K> keyFn, @Nonnull ToLongFunction<? super T> timestampFn, @Nonnull Supplier<? extends S> createFn, @Nonnull TriFunction<? super S, ? super K, ? super T, ? extends Traverser<R>> statefulFlatMapFn, @Nullable TriFunction<? super S, ? super K, ? super Long, ? extends Traverser<R>> onEvictFn) {
        this.ttl = ttl > 0L ? ttl : Long.MAX_VALUE;
        this.keyFn = keyFn;
        this.timestampFn = timestampFn;
        // New entries start at Long.MIN_VALUE; flatMapEvent raises the timestamp right after creation.
        this.createIfAbsentFn = k -> new TimestampedItem<>(Long.MIN_VALUE, createFn.get());
        this.statefulFlatMapFn = statefulFlatMapFn;
        this.onEvictFn = onEvictFn;
    }

    /** Reads the limit on the number of state entries from the processor context. */
    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        maxEntries = context.maxProcessorAccumulatedRecords();
    }

    /** Delegates the item to the event flat-mapper; the ordinal is ignored. */
    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        return flatMapper.tryProcess((T) item);
    }

    /**
     * Looks up (or creates) the state for the event's key, updates its
     * timestamp and applies {@code statefulFlatMapFn}.
     *
     * @return the traverser returned by {@code statefulFlatMapFn}, or an empty
     *         traverser if the event was dropped as late
     * @throws AccumulationLimitExceededException if a new key would have to be
     *         added while the map already holds {@code maxEntries} entries
     */
    @Nonnull
    private Traverser<R> flatMapEvent(T event) {
        long timestamp = timestampFn.applyAsLong(event);
        // Late events are dropped only when a finite TTL is in effect.
        if (timestamp < currentWm && ttl != Long.MAX_VALUE) {
            com.hazelcast.jet.impl.util.Util.logLateEvent(getLogger(), (byte) 0, currentWm, event);
            lateEventsDropped.inc();
            return Traversers.empty();
        }
        K key = keyFn.apply(event);
        TimestampedItem<S> tsAndState = keyToState.computeIfAbsent(key, k -> {
            if (keyToState.size() == maxEntries) {
                throw new AccumulationLimitExceededException();
            }
            return createIfAbsentFn.apply(k);
        });
        // The stored timestamp never moves backwards.
        tsAndState.setTimestamp(Math.max(tsAndState.timestamp(), timestamp));
        S state = tsAndState.item();
        return statefulFlatMapFn.apply(state, key, event);
    }

    /** Runs the keyed-watermark check and then delegates to the watermark flat-mapper. */
    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        keyedWatermarkCheck(watermark);
        return wmFlatMapper.tryProcess(watermark);
    }

    /**
     * Records the watermark as the current one and restarts the evicting
     * traverser for it.
     */
    private Traverser<?> flatMapWm(Watermark wm) {
        currentWm = wm.timestamp();
        evictingTraverser.reset(wm);
        return evictingTraverserFlattened;
    }

    /** Marks the processor as completing and processes {@link #FLUSHING_WATERMARK}. */
    @Override
    public boolean complete() {
        inComplete = true;
        return tryProcessWatermark(FLUSHING_WATERMARK);
    }

    /**
     * Emits all state entries followed by the current watermark (under a
     * broadcast key) to the snapshot. Once {@link #complete()} has been
     * called, nothing is saved and the call continues the completion instead.
     */
    @Override
    public boolean saveToSnapshot() {
        if (inComplete) {
            return complete();
        }
        if (snapshotTraverser == null) {
            snapshotTraverser = Traversers.<Map.Entry<?, ?>>traverseIterable(keyToState.entrySet())
                    .append(Util.entry(BroadcastKey.broadcastKey(SnapshotKeys.WATERMARK), currentWm))
                    .onFirstNull(() -> snapshotTraverser = null);
        }
        return emitFromTraverserToSnapshot(snapshotTraverser);
    }

    /**
     * Restores one snapshot entry: a broadcast key carries a watermark (the
     * minimum of all restored watermarks is kept), any other key carries the
     * state of that key.
     */
    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        if (key instanceof BroadcastKey) {
            assert ((BroadcastKey<?>) key).key() == SnapshotKeys.WATERMARK : "Unexpected " + key;
            long wm = (Long) value;
            currentWm = currentWm == Long.MIN_VALUE ? wm : Math.min(currentWm, wm);
        } else {
            @SuppressWarnings("unchecked")
            TimestampedItem<S> old = keyToState.put((K) key, (TimestampedItem<S>) value);
            assert old == null : "Duplicate key '" + key + '\'';
        }
    }

    /** Keys under which non-keyed data is stored in the snapshot. */
    private enum SnapshotKeys {
        WATERMARK
    }

    /**
     * Traverser of traversers driving eviction for one watermark: it yields
     * the {@code onEvictFn} output for each expired entry, then the
     * watermark traverser, then {@code null} until {@link #reset} is called again.
     */
    private class EvictingTraverser
    implements Traverser<Traverser<?>> {
        /** Non-null only while an eviction pass is in progress. */
        private Iterator<Map.Entry<K, TimestampedItem<S>>> keyToStateIterator;
        private final ResettableSingletonTraverser<Watermark> wmTraverser = new ResettableSingletonTraverser<>();

        /** Starts a new eviction pass over the state map for the given watermark. */
        void reset(Watermark wm) {
            keyToStateIterator = keyToState.entrySet().iterator();
            if (wm == FLUSHING_WATERMARK) {
                // the flushing watermark is internal: it is not handed to wmTraverser
                return;
            }
            wmTraverser.accept(wm);
        }

        @Override
        public Traverser<?> next() {
            if (keyToStateIterator == null) {
                return null;
            }
            long evictionThreshold = com.hazelcast.jet.impl.util.Util.subtractClamped(currentWm, ttl);
            while (keyToStateIterator.hasNext()) {
                Map.Entry<K, TimestampedItem<S>> entry = keyToStateIterator.next();
                long lastTouched = entry.getValue().timestamp();
                if (lastTouched >= evictionThreshold) {
                    // the scan stops at the first entry that has not expired
                    break;
                }
                keyToStateIterator.remove();
                if (onEvictFn != null) {
                    // The iterator is kept, so the next call resumes after this entry.
                    return onEvictFn.apply(entry.getValue().item(), entry.getKey(), currentWm);
                }
            }
            keyToStateIterator = null;
            return wmTraverser;
        }
    }
}

