/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.util;

import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.function.DistributedLongSupplier;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.TimestampHistory;
import com.hazelcast.util.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/**
 * Static factories for {@link WatermarkPolicy} implementations, together with
 * the {@link WatermarkPolicyBase} skeleton they are built on.
 * <p>
 * Every subtraction of a lag from a timestamp, every addition of a timeout to
 * a clock reading and every advancement of the watermark in this class
 * saturates at {@code Long.MIN_VALUE} / {@code Long.MAX_VALUE} instead of
 * silently wrapping around. Without that, an extreme input (for example an
 * event timestamp of {@code Long.MIN_VALUE}) would wrap to a huge positive
 * value and, because the watermark never decreases, stay there permanently.
 */
public final class WatermarkPolicyUtil {

    /** Not instantiable: this class only hosts static factories. */
    private WatermarkPolicyUtil() {
    }

    /**
     * Returns a supplier of policies whose watermark is the highest value seen
     * so far of {@code eventTimestamp - timestampLag} and
     * {@code wallClock - wallClockLag}.
     * <p>
     * The wall clock is consulted on every {@code reportEvent} and every
     * {@code getCurrentWatermark} call. Its reading is fed into the same
     * watermark as the event timestamps, so both must use the same unit.
     *
     * @param timestampLag amount subtracted from each reported event timestamp;
     *                     must not be negative
     * @param wallClockLag amount subtracted from each wall-clock reading;
     *                     must not be negative
     * @param wallClock    source of the current wall-clock time
     * @return a supplier that creates a fresh, independent policy on each call
     */
    @Nonnull
    public static DistributedSupplier<WatermarkPolicy> limitingTimestampAndWallClockLag(
            final long timestampLag, final long wallClockLag, final DistributedLongSupplier wallClock) {
        Preconditions.checkNotNegative(timestampLag, "timestampLag must not be negative");
        Preconditions.checkNotNegative(wallClockLag, "wallClockLag must not be negative");
        return () -> new WatermarkPolicyBase() {

            @Override
            public long reportEvent(long timestamp) {
                // Let the wall clock push the watermark first; the result is
                // the maximum of both sources either way.
                updateFromWallClock();
                return makeWmAtLeast(saturatingSubtract(timestamp, timestampLag));
            }

            @Override
            public long getCurrentWatermark() {
                return updateFromWallClock();
            }

            /** Raises the watermark to {@code wallClock - wallClockLag} if that is higher. */
            private long updateFromWallClock() {
                return makeWmAtLeast(saturatingSubtract(wallClock.getAsLong(), wallClockLag));
            }
        };
    }

    /**
     * Returns a supplier of policies whose watermark trails the reported event
     * timestamps by {@code lag} and which, once no event has been reported for
     * {@code maxLullMs} milliseconds, starts advancing the watermark by the
     * number of milliseconds elapsed beyond that deadline.
     * <p>
     * The lull-driven advancement only happens after the watermark has left
     * its initial {@code Long.MIN_VALUE} value.
     *
     * @param lag       amount subtracted from each reported event timestamp;
     *                  must not be negative
     * @param maxLullMs length of the tolerated lull, in milliseconds; must not
     *                  be negative
     * @param nanoClock monotonic clock returning nanoseconds (converted to
     *                  milliseconds internally)
     * @return a supplier that creates a fresh, independent policy on each call
     */
    @Nonnull
    public static DistributedSupplier<WatermarkPolicy> limitingLagAndLull(
            final long lag, final long maxLullMs, final DistributedLongSupplier nanoClock) {
        Preconditions.checkNotNegative(lag, "lag must not be negative");
        Preconditions.checkNotNegative(maxLullMs, "maxLullMs must not be negative");
        return () -> new WatermarkPolicyBase() {
            /** Clock time (ms) at which the current lull expires; MIN_VALUE = not yet set. */
            private long maxLullAt = Long.MIN_VALUE;

            @Override
            public long reportEvent(long timestamp) {
                // Saturate: a very large maxLullMs must mean "never expires",
                // not a wrapped-around deadline in the distant past.
                maxLullAt = saturatingAdd(monotonicTimeMillis(), maxLullMs);
                return makeWmAtLeast(saturatingSubtract(timestamp, lag));
            }

            @Override
            public long getCurrentWatermark() {
                long now = monotonicTimeMillis();
                ensureInitialized(now);
                // Compare instead of computing max(0, now - maxLullAt): the
                // deadline may be saturated at Long.MAX_VALUE while the clock
                // reading is negative, and that subtraction would wrap.
                long millisPastMaxLull = 0L;
                if (now > maxLullAt) {
                    millisPastMaxLull = now - maxLullAt;
                    // Move the deadline to "now" so the same elapsed time is
                    // not counted again on the next call.
                    maxLullAt = now;
                }
                long currentWm = super.getCurrentWatermark();
                return currentWm > Long.MIN_VALUE ? advanceWmBy(millisPastMaxLull) : currentWm;
            }

            /** Starts the first lull period if no event or query has done so yet. */
            private void ensureInitialized(long now) {
                if (maxLullAt == Long.MIN_VALUE) {
                    maxLullAt = saturatingAdd(now, maxLullMs);
                }
            }

            private long monotonicTimeMillis() {
                return TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong());
            }
        };
    }

    /**
     * Returns a policy whose watermark is the highest value seen so far of
     * {@code eventTimestamp - maxLag} and the value produced by a
     * {@link TimestampHistory} that is sampled with the current
     * {@code nanoClock} reading and the top event timestamp observed so far.
     * <p>
     * NOTE(review): the history presumably yields the top timestamp as it was
     * {@code maxRetainNanos} ago, which would bound how long an event can be
     * retained -- confirm against {@code TimestampHistory}.
     * <p>
     * Unlike the other factories, this one returns the policy itself rather
     * than a supplier and performs no argument validation.
     *
     * @param maxLag           amount subtracted from each reported event timestamp
     * @param maxRetainNanos   passed to the {@link TimestampHistory} constructor
     * @param numStoredSamples passed to the {@link TimestampHistory} constructor
     * @param nanoClock        clock whose reading is passed to the history on
     *                         every sample
     * @return a new policy instance
     */
    @Nonnull
    public static WatermarkPolicy limitingLagAndDelay(
            final long maxLag, final long maxRetainNanos, final int numStoredSamples,
            final DistributedLongSupplier nanoClock) {
        return new WatermarkPolicyBase() {
            /** Highest event timestamp reported so far. */
            private long topTs = Long.MIN_VALUE;
            private final TimestampHistory history = new TimestampHistory(maxRetainNanos, numStoredSamples);

            @Override
            public long reportEvent(long timestamp) {
                topTs = Math.max(timestamp, topTs);
                return applyMaxRetain(saturatingSubtract(timestamp, maxLag));
            }

            @Override
            public long getCurrentWatermark() {
                return applyMaxRetain(super.getCurrentWatermark());
            }

            /** Raises the watermark to the larger of {@code wm} and the history sample. */
            private long applyMaxRetain(long wm) {
                long sampled = history.sample(nanoClock.getAsLong(), topTs);
                return makeWmAtLeast(Math.max(wm, sampled));
            }
        };
    }

    /**
     * Returns {@code a + b}, clamped to {@code Long.MIN_VALUE} or
     * {@code Long.MAX_VALUE} when the exact sum does not fit in a {@code long}.
     */
    private static long saturatingAdd(long a, long b) {
        long sum = a + b;
        // Overflow iff both operands have the same sign and the result's sign differs.
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    /**
     * Returns {@code a - b}, clamped to {@code Long.MIN_VALUE} or
     * {@code Long.MAX_VALUE} when the exact difference does not fit in a
     * {@code long}.
     */
    private static long saturatingSubtract(long a, long b) {
        long diff = a - b;
        // Overflow iff the operands have different signs and the result's
        // sign differs from that of the minuend.
        if (((a ^ b) & (a ^ diff)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return diff;
    }

    /**
     * Skeleton {@link WatermarkPolicy} that stores the current watermark,
     * initially {@code Long.MIN_VALUE}, and offers helpers to move it forward.
     * Subclasses supply {@code reportEvent}.
     */
    public abstract static class WatermarkPolicyBase implements WatermarkPolicy {
        private long wm = Long.MIN_VALUE;

        /**
         * Raises the watermark to {@code proposedWm} if that is higher than
         * the current value; never lowers it.
         *
         * @return the watermark after the update
         */
        protected long makeWmAtLeast(long proposedWm) {
            wm = Math.max(wm, proposedWm);
            return wm;
        }

        /**
         * Adds {@code amount} to the watermark, saturating at the limits of
         * {@code long} rather than wrapping around.
         *
         * @return the watermark after the update
         */
        protected long advanceWmBy(long amount) {
            wm = saturatingAdd(wm, amount);
            return wm;
        }

        /** Returns the stored watermark without modifying it. */
        @Override
        public long getCurrentWatermark() {
            return wm;
        }
    }
}

