/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet;

import com.hazelcast.jet.WatermarkPolicy;
import com.hazelcast.jet.function.DistributedLongSupplier;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.TimestampHistory;
import com.hazelcast.util.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/**
 * Static factories of {@link WatermarkPolicy} objects (mostly wrapped in a
 * {@link DistributedSupplier} so that each call to the supplier yields a
 * fresh, independent policy instance).
 * <p>
 * Every policy produced here keeps its watermark in
 * {@link WatermarkPolicyBase}, which starts at {@link Long#MIN_VALUE}.
 * The policies hold plain mutable state with no synchronization.
 */
public final class WatermarkPolicies {
    /**
     * Number of stored samples handed to {@link TimestampHistory} by the
     * public {@link #limitingLagAndDelay(long, long)} factory.
     */
    private static final int DEFAULT_NUM_STORED_SAMPLES = 16;

    /** Not instantiable: this class only hosts static factories. */
    private WatermarkPolicies() {
    }

    /**
     * Returns a supplier of policies whose watermark trails the reported
     * event timestamp by a fixed {@code lag}. The watermark never decreases:
     * an event proposing a lower value leaves it unchanged.
     *
     * @param lag amount subtracted from each reported timestamp; a negative
     *            value is rejected by {@link Preconditions#checkNotNegative}
     * @return supplier that creates a new policy on each invocation
     */
    @Nonnull
    public static DistributedSupplier<WatermarkPolicy> withFixedLag(final long lag) {
        Preconditions.checkNotNegative(lag, "lag must not be negative");
        return () -> new WatermarkPolicyBase() {

            @Override
            public long reportEvent(long timestamp) {
                return makeWmAtLeast(timestamp - lag);
            }
        };
    }

    /**
     * Returns a supplier of policies created by
     * {@link #limitingLagAndDelay(long, long, int, DistributedLongSupplier)}
     * with {@code maxDelayMs} converted to nanoseconds,
     * {@link #DEFAULT_NUM_STORED_SAMPLES} stored samples and
     * {@link System#nanoTime()} as the clock.
     * <p>
     * Arguments are validated eagerly, when this factory is called, in line
     * with the other public factories of this class.
     *
     * @param lag        amount subtracted from each reported timestamp; a
     *                   negative value is rejected by
     *                   {@link Preconditions#checkNotNegative}
     * @param maxDelayMs retention period in milliseconds passed (as nanoseconds)
     *                   to the timestamp history; a negative value is rejected
     *                   by {@link Preconditions#checkNotNegative}
     * @return supplier that creates a new policy on each invocation
     */
    @Nonnull
    public static DistributedSupplier<WatermarkPolicy> limitingLagAndDelay(long lag, long maxDelayMs) {
        Preconditions.checkNotNegative(lag, "lag must not be negative");
        Preconditions.checkNotNegative(maxDelayMs, "maxDelayMs must not be negative");
        return () -> WatermarkPolicies.limitingLagAndDelay(
                lag, TimeUnit.MILLISECONDS.toNanos(maxDelayMs), DEFAULT_NUM_STORED_SAMPLES, System::nanoTime);
    }

    /**
     * Creates a policy that tracks the highest timestamp reported so far and,
     * on every event and every watermark query, raises the watermark to the
     * larger of the lag-based proposal and the value returned by
     * {@link TimestampHistory#sample} for the current clock reading and the
     * top timestamp.
     * <p>
     * NOTE(review): {@code TimestampHistory} is not visible here; presumably
     * {@code sample} returns the top timestamp that was recorded
     * {@code maxRetainNanos} ago - confirm against that class.
     *
     * @param maxLag           amount subtracted from each reported timestamp
     * @param maxRetainNanos   first constructor argument of the timestamp history
     * @param numStoredSamples second constructor argument of the timestamp history
     * @param nanoClock        clock read on every event and watermark query
     * @return a new policy instance
     */
    @Nonnull
    static WatermarkPolicy limitingLagAndDelay(final long maxLag, final long maxRetainNanos,
                                               final int numStoredSamples,
                                               final DistributedLongSupplier nanoClock) {
        return new WatermarkPolicyBase() {
            // Highest event timestamp reported so far.
            private long topTs = Long.MIN_VALUE;
            private final TimestampHistory history = new TimestampHistory(maxRetainNanos, numStoredSamples);

            @Override
            public long reportEvent(long timestamp) {
                topTs = Math.max(timestamp, topTs);
                return applyMaxRetain(timestamp - maxLag);
            }

            @Override
            public long getCurrentWatermark() {
                return applyMaxRetain(super.getCurrentWatermark());
            }

            // Samples the history with the current clock value and top
            // timestamp, then raises the watermark to max(wm, sampled value).
            private long applyMaxRetain(long wm) {
                return makeWmAtLeast(Math.max(wm, history.sample(nanoClock.getAsLong(), topTs)));
            }
        };
    }

    /**
     * Same as
     * {@link #limitingTimestampAndWallClockLag(long, long, DistributedLongSupplier)}
     * using {@link System#currentTimeMillis()} as the wall clock.
     *
     * @param timestampLag amount subtracted from each reported timestamp
     * @param wallClockLag amount subtracted from the wall-clock reading
     * @return supplier that creates a new policy on each invocation
     */
    @Nonnull
    public static DistributedSupplier<WatermarkPolicy> limitingTimestampAndWallClockLag(long timestampLag, long wallClockLag) {
        return WatermarkPolicies.limitingTimestampAndWallClockLag(timestampLag, wallClockLag, System::currentTimeMillis);
    }

    /**
     * Returns a supplier of policies whose watermark is kept at least at
     * {@code wallClock - wallClockLag} (refreshed on every event and every
     * watermark query) and at least at {@code timestamp - timestampLag} for
     * every reported event. The watermark never decreases.
     *
     * @param timestampLag amount subtracted from each reported timestamp; a
     *                     negative value is rejected by
     *                     {@link Preconditions#checkNotNegative}
     * @param wallClockLag amount subtracted from the wall-clock reading; a
     *                     negative value is rejected by
     *                     {@link Preconditions#checkNotNegative}
     * @param wallClock    clock whose reading is compared with the watermark
     * @return supplier that creates a new policy on each invocation
     */
    @Nonnull
    static DistributedSupplier<WatermarkPolicy> limitingTimestampAndWallClockLag(final long timestampLag, final long wallClockLag, final DistributedLongSupplier wallClock) {
        Preconditions.checkNotNegative(timestampLag, "timestampLag must not be negative");
        Preconditions.checkNotNegative(wallClockLag, "wallClockLag must not be negative");
        return () -> new WatermarkPolicyBase() {

            @Override
            public long reportEvent(long timestamp) {
                // Apply the wall-clock bound first, then the event-based one;
                // both only ever raise the watermark.
                updateFromWallClock();
                return makeWmAtLeast(timestamp - timestampLag);
            }

            @Override
            public long getCurrentWatermark() {
                return updateFromWallClock();
            }

            private long updateFromWallClock() {
                return makeWmAtLeast(wallClock.getAsLong() - wallClockLag);
            }
        };
    }

    /**
     * Same as {@link #limitingLagAndLull(long, long, DistributedLongSupplier)}
     * using {@link System#nanoTime()} as the clock.
     *
     * @param lag       amount subtracted from each reported timestamp
     * @param maxLullMs lull duration in milliseconds after which the watermark
     *                  starts advancing without events
     * @return supplier that creates a new policy on each invocation
     */
    @Nonnull
    public static DistributedSupplier<WatermarkPolicy> limitingLagAndLull(long lag, long maxLullMs) {
        return WatermarkPolicies.limitingLagAndLull(lag, maxLullMs, System::nanoTime);
    }

    /**
     * Returns a supplier of policies whose watermark trails reported
     * timestamps by {@code lag} and, once more than {@code maxLullMs}
     * milliseconds of clock time pass without an event, is advanced on each
     * watermark query by the number of milliseconds elapsed beyond that
     * deadline.
     * <p>
     * If no event was ever reported, the deadline is initialized on the first
     * watermark query and the watermark later advances starting from
     * {@link Long#MIN_VALUE}.
     *
     * @param lag       amount subtracted from each reported timestamp; a
     *                  negative value is rejected by
     *                  {@link Preconditions#checkNotNegative}
     * @param maxLullMs lull duration in milliseconds; a negative value is
     *                  rejected by {@link Preconditions#checkNotNegative}
     * @param nanoClock clock returning nanoseconds; converted to milliseconds
     *                  before use
     * @return supplier that creates a new policy on each invocation
     */
    @Nonnull
    static DistributedSupplier<WatermarkPolicy> limitingLagAndLull(final long lag, final long maxLullMs, final DistributedLongSupplier nanoClock) {
        Preconditions.checkNotNegative(lag, "lag must not be negative");
        Preconditions.checkNotNegative(maxLullMs, "maxLullMs must not be negative");
        return () -> new WatermarkPolicyBase() {
            // Clock time (ms) at which the current lull allowance runs out;
            // Long.MIN_VALUE marks "not yet initialized".
            private long maxLullAt = Long.MIN_VALUE;

            @Override
            public long reportEvent(long timestamp) {
                // Every event restarts the lull allowance.
                maxLullAt = monotonicTimeMillis() + maxLullMs;
                return makeWmAtLeast(timestamp - lag);
            }

            @Override
            public long getCurrentWatermark() {
                long now = monotonicTimeMillis();
                ensureInitialized(now);
                long millisPastMaxLull = Math.max(0L, now - maxLullAt);
                // Move the deadline forward by the amount just consumed so the
                // same elapsed time is not counted again on the next query.
                maxLullAt += millisPastMaxLull;
                return advanceWmBy(millisPastMaxLull);
            }

            private void ensureInitialized(long now) {
                if (maxLullAt == Long.MIN_VALUE) {
                    maxLullAt = now + maxLullMs;
                }
            }

            private long monotonicTimeMillis() {
                return TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong());
            }
        };
    }

    /**
     * Common base of the policies created by this class: stores the current
     * watermark (initially {@link Long#MIN_VALUE}) and offers helpers to
     * update it.
     */
    private static abstract class WatermarkPolicyBase
    implements WatermarkPolicy {
        private long wm = Long.MIN_VALUE;

        private WatermarkPolicyBase() {
        }

        /**
         * Raises the watermark to {@code proposedWm} if that is higher than
         * the current value; otherwise leaves it unchanged.
         *
         * @param proposedWm candidate watermark value
         * @return the watermark after the update
         */
        long makeWmAtLeast(long proposedWm) {
            wm = Math.max(wm, proposedWm);
            return wm;
        }

        /**
         * Adds {@code amount} to the watermark (plain {@code long} addition,
         * no overflow check).
         *
         * @param amount value added to the current watermark
         * @return the watermark after the update
         */
        long advanceWmBy(long amount) {
            wm += amount;
            return wm;
        }

        /** Returns the stored watermark without modifying it. */
        @Override
        public long getCurrentWatermark() {
            return wm;
        }
    }
}

