/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.eventtime;

import java.time.Duration;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.RelativeClock;

@Public
public class WatermarksWithIdleness<T>
implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> watermarks;
    private final IdlenessTimer idlenessTimer;
    private boolean isIdleNow = false;

    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, RelativeClock clock) {
        Preconditions.checkNotNull(idleTimeout, "idleTimeout");
        Preconditions.checkArgument(!idleTimeout.isZero() && !idleTimeout.isNegative(), "idleTimeout must be greater than zero");
        this.watermarks = Preconditions.checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.watermarks.onEvent(event, eventTimestamp, output);
        this.idlenessTimer.activity();
        this.isIdleNow = false;
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (this.idlenessTimer.checkIfIdle()) {
            if (!this.isIdleNow) {
                output.markIdle();
                this.isIdleNow = true;
            }
        } else {
            this.watermarks.onPeriodicEmit(output);
        }
    }

    @Internal
    @VisibleForTesting
    public static final class IdlenessTimer {
        private final RelativeClock clock;
        private long counter;
        private long lastCounter;
        private long startOfInactivityNanos;
        private final long maxIdleTimeNanos;

        public IdlenessTimer(RelativeClock clock, Duration idleTimeout) {
            long idleNanos;
            this.clock = clock;
            try {
                idleNanos = idleTimeout.toNanos();
            }
            catch (ArithmeticException ignored) {
                idleNanos = Long.MAX_VALUE;
            }
            this.maxIdleTimeNanos = idleNanos;
        }

        public void activity() {
            ++this.counter;
        }

        public boolean checkIfIdle() {
            if (this.counter != this.lastCounter) {
                this.lastCounter = this.counter;
                this.startOfInactivityNanos = 0L;
                return false;
            }
            if (this.startOfInactivityNanos == 0L) {
                this.startOfInactivityNanos = this.clock.relativeTimeNanos();
                return false;
            }
            return this.clock.relativeTimeNanos() - this.startOfInactivityNanos > this.maxIdleTimeNanos;
        }
    }
}

