/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.core;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.WatermarkEmissionPolicy;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.function.ObjLongBiFunction;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class WatermarkSourceUtil<T> {
    private static final WatermarkPolicy[] EMPTY_WATERMARK_POLICIES = new WatermarkPolicy[0];
    private static final long[] EMPTY_LONGS = new long[0];
    private final long idleTimeoutNanos;
    private final ToLongFunction<? super T> timestampFn;
    private final Supplier<? extends WatermarkPolicy> newWmPolicyFn;
    private final ObjLongBiFunction<? super T, ?> wrapFn;
    private final WatermarkEmissionPolicy wmEmitPolicy;
    private final AppendableTraverser<Object> traverser = new AppendableTraverser(2);
    private WatermarkPolicy[] wmPolicies = EMPTY_WATERMARK_POLICIES;
    private long[] watermarks = EMPTY_LONGS;
    private long[] markIdleAt = EMPTY_LONGS;
    private long lastEmittedWm = Long.MIN_VALUE;
    private long topObservedWm = Long.MIN_VALUE;
    private boolean allAreIdle;

    public WatermarkSourceUtil(WatermarkGenerationParams<? super T> params) {
        this.idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(params.idleTimeoutMillis());
        this.timestampFn = params.timestampFn();
        this.wrapFn = params.wrapFn();
        this.newWmPolicyFn = params.newWmPolicyFn();
        this.wmEmitPolicy = params.wmEmitPolicy();
    }

    @Nonnull
    public Traverser<Object> handleEvent(T event, int partitionIndex) {
        return this.handleEvent(System.nanoTime(), event, partitionIndex);
    }

    @Nonnull
    public Traverser<Object> handleNoEvent() {
        return this.handleEvent(System.nanoTime(), null, -1);
    }

    Traverser<Object> handleEvent(long now, @Nullable T event, int partitionIndex) {
        assert (this.traverser.isEmpty()) : "the traverser returned previously not yet drained: remove all items from the traverser before you call this method again.";
        if (event != null) {
            long eventTime = this.timestampFn.applyAsLong(event);
            this.handleEventInt(now, partitionIndex, eventTime);
            this.traverser.append(this.wrapFn.apply(event, eventTime));
        } else {
            this.handleNoEventInt(now);
        }
        return this.traverser;
    }

    private void handleEventInt(long now, int partitionIndex, long eventTime) {
        this.wmPolicies[partitionIndex].reportEvent(eventTime);
        this.markIdleAt[partitionIndex] = now + this.idleTimeoutNanos;
        this.allAreIdle = false;
        this.handleNoEventInt(now);
    }

    private void handleNoEventInt(long now) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < this.watermarks.length; ++i) {
            if (this.idleTimeoutNanos > 0L && this.markIdleAt[i] <= now) continue;
            this.watermarks[i] = this.wmPolicies[i].getCurrentWatermark();
            this.topObservedWm = Math.max(this.topObservedWm, this.watermarks[i]);
            min = Math.min(min, this.watermarks[i]);
        }
        if (min == Long.MAX_VALUE) {
            if (this.allAreIdle) {
                return;
            }
            min = this.topObservedWm;
            this.allAreIdle = true;
        } else {
            this.allAreIdle = false;
        }
        long newWm = this.wmEmitPolicy.throttleWm(min, this.lastEmittedWm);
        if (newWm > this.lastEmittedWm) {
            this.traverser.append(new Watermark(newWm));
            this.lastEmittedWm = newWm;
        }
        if (this.allAreIdle) {
            this.traverser.append(WatermarkCoalescer.IDLE_MESSAGE);
        }
    }

    public void increasePartitionCount(int newPartitionCount) {
        this.increasePartitionCount(System.nanoTime(), newPartitionCount);
    }

    void increasePartitionCount(long now, int newPartitionCount) {
        int oldPartitionCount = this.wmPolicies.length;
        if (newPartitionCount < oldPartitionCount) {
            throw new IllegalArgumentException("partition count must increase. Old count=" + oldPartitionCount + ", new count=" + newPartitionCount);
        }
        this.wmPolicies = Arrays.copyOf(this.wmPolicies, newPartitionCount);
        this.watermarks = Arrays.copyOf(this.watermarks, newPartitionCount);
        this.markIdleAt = Arrays.copyOf(this.markIdleAt, newPartitionCount);
        for (int i = oldPartitionCount; i < newPartitionCount; ++i) {
            this.wmPolicies[i] = this.newWmPolicyFn.get();
            this.watermarks[i] = Long.MIN_VALUE;
            this.markIdleAt[i] = now + this.idleTimeoutNanos;
        }
    }

    public long getWatermark(int partitionIndex) {
        return this.watermarks[partitionIndex];
    }

    public void restoreWatermark(int partitionIndex, long wm) {
        this.watermarks[partitionIndex] = wm;
    }
}

