/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.watermark.extension.eventtime;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.watermark.BoolWatermark;
import org.apache.flink.api.common.watermark.LongWatermark;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Preconditions;

/**
 * Combines the event-time and idle-status watermarks received on one or two inputs and forwards
 * the combined result to an {@link Output}.
 *
 * <p>For every input the handler remembers the largest event time seen so far and the most recent
 * idle flag. The combined event time is the minimum over all inputs that are not idle; it is
 * emitted only when it is greater than the previously emitted value and every input has delivered
 * at least one event-time or idle-status watermark. The combined idle status is {@code true} only
 * while all inputs are idle, and it is emitted whenever it differs from the last emitted status.
 */
public class EventTimeWatermarkHandler {
    /** Number of inputs tracked by this handler; the constructor only accepts 1 or 2. */
    private final int numOfInput;

    /** Destination of the combined event-time and idle-status watermarks. */
    private final Output<?> output;

    /** Per-input state, indexed by input index. */
    private final List<EventTimeWithIdleStatus> eventTimePerInput;

    /** Optional time service manager that is advanced together with each emitted event time. */
    @Nullable
    private final InternalTimeServiceManager<?> timeServiceManager;

    /** Last event time handed to {@link #output}; {@code Long.MIN_VALUE} until the first emit. */
    private long lastEmitWatermark = Long.MIN_VALUE;

    /** Last combined idle status handed to {@link #output}. */
    private boolean lastEmitIdleStatus = false;

    /** Bit {@code i} is set once input {@code i} has delivered any handled watermark. */
    private final BitSet hasReceiveWatermarks;

    /**
     * Creates a handler for the given number of inputs.
     *
     * @param numOfInput number of inputs; must be 1 or 2
     * @param output receiver of the combined watermarks
     * @param timeServiceManager advanced on every emitted event time when non-null
     */
    public EventTimeWatermarkHandler(int numOfInput, Output<?> output, @Nullable InternalTimeServiceManager<?> timeServiceManager) {
        Preconditions.checkArgument(numOfInput >= 1 && numOfInput <= 2, "numOfInput should between 1 and 2");
        this.numOfInput = numOfInput;
        this.output = output;
        this.timeServiceManager = timeServiceManager;
        this.hasReceiveWatermarks = new BitSet(numOfInput);

        // Every input starts as "not idle" with the smallest possible event time.
        List<EventTimeWithIdleStatus> perInput = new ArrayList<>(numOfInput);
        while (perInput.size() < numOfInput) {
            perInput.add(new EventTimeWithIdleStatus());
        }
        this.eventTimePerInput = perInput;
    }

    /**
     * Records an event time for one input, marks that input as active and tries to advance the
     * combined event time.
     */
    private EventTimeUpdateStatus processEventTime(long timestamp, int inputIndex) throws Exception {
        Preconditions.checkState(inputIndex < numOfInput);
        hasReceiveWatermarks.set(inputIndex);

        EventTimeWithIdleStatus inputState = eventTimePerInput.get(inputIndex);
        inputState.setEventTime(timestamp);
        // Receiving an event time implicitly ends the idleness of this input.
        inputState.setIdleStatus(false);

        return tryAdvanceEventTimeAndEmitWatermark();
    }

    /**
     * Emits the combined event time if it moved past the last emitted value and all inputs have
     * reported; otherwise returns {@link EventTimeUpdateStatus#NO_UPDATE}.
     */
    private EventTimeUpdateStatus tryAdvanceEventTimeAndEmitWatermark() throws Exception {
        long candidate = getCurrentEventTime();
        if (candidate <= lastEmitWatermark || hasReceiveWatermarks.cardinality() != numOfInput) {
            return EventTimeUpdateStatus.NO_UPDATE;
        }

        WatermarkEvent event = new WatermarkEvent(EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(candidate), false);
        output.emitWatermark(event);
        // Only remember the value after it was handed to the output.
        lastEmitWatermark = candidate;
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(new Watermark(candidate));
        }
        return EventTimeUpdateStatus.ofUpdatedWatermark(lastEmitWatermark);
    }

    /** Records the idle flag of one input and emits the combined idle status if it changed. */
    private void processEventTimeIdleStatus(boolean isIdle, int inputIndex) {
        Preconditions.checkState(inputIndex < numOfInput);
        // An idle-status watermark also counts as "this input has reported".
        hasReceiveWatermarks.set(inputIndex);
        eventTimePerInput.get(inputIndex).setIdleStatus(isIdle);
        tryEmitEventTimeIdleStatus();
    }

    /** Emits the combined idle status when it differs from the one emitted last. */
    private void tryEmitEventTimeIdleStatus() {
        boolean combinedIdle = isAllInputIdle();
        if (combinedIdle == lastEmitIdleStatus) {
            return;
        }
        WatermarkEvent event = new WatermarkEvent(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(combinedIdle), false);
        output.emitWatermark(event);
        lastEmitIdleStatus = combinedIdle;
    }

    /**
     * Returns the smallest event time among the inputs that are not idle, or {@code
     * Long.MAX_VALUE} when every input is idle.
     */
    private long getCurrentEventTime() {
        return eventTimePerInput.stream()
                .filter(state -> !state.isIdle())
                .mapToLong(EventTimeWithIdleStatus::getEventTime)
                .min()
                .orElse(Long.MAX_VALUE);
    }

    /** Returns {@code true} when every input is currently flagged idle. */
    private boolean isAllInputIdle() {
        return eventTimePerInput.stream().allMatch(EventTimeWithIdleStatus::isIdle);
    }

    /** Returns the event time emitted most recently, or {@code Long.MIN_VALUE} if none was. */
    public long getLastEmitWatermark() {
        return lastEmitWatermark;
    }

    /**
     * Handles a watermark received on the given input.
     *
     * <p>Event-time watermarks update the per-input event time and may advance the combined event
     * time. Idle-status watermarks update the per-input idle flag and may emit a new combined idle
     * status, but always yield {@link EventTimeUpdateStatus#NO_UPDATE}. Watermarks with any other
     * identifier are ignored.
     *
     * @param watermark the received watermark
     * @param inputIndex index of the input the watermark arrived on; must be below the number of
     *     inputs
     * @return the update status describing whether a new combined event time was emitted
     * @throws Exception if emitting the watermark or advancing the time service fails
     */
    public EventTimeUpdateStatus processWatermark(org.apache.flink.api.common.watermark.Watermark watermark, int inputIndex) throws Exception {
        String identifier = (String) watermark.getIdentifier();
        if (EventTimeExtension.isEventTimeWatermark(identifier)) {
            return processEventTime(((LongWatermark) watermark).getValue(), inputIndex);
        }
        if (EventTimeExtension.isIdleStatusWatermark(identifier)) {
            processEventTimeIdleStatus(((BoolWatermark) watermark).getValue(), inputIndex);
        }
        return EventTimeUpdateStatus.NO_UPDATE;
    }

    /** Mutable per-input state: the largest event time seen and the current idle flag. */
    static class EventTimeWithIdleStatus {
        private long eventTime = Long.MIN_VALUE;
        private boolean isIdle = false;

        EventTimeWithIdleStatus() {
        }

        public long getEventTime() {
            return eventTime;
        }

        /** Raises the stored event time to {@code eventTime}; smaller values are ignored. */
        public void setEventTime(long eventTime) {
            if (eventTime > this.eventTime) {
                this.eventTime = eventTime;
            }
        }

        public boolean isIdle() {
            return isIdle;
        }

        public void setIdleStatus(boolean idle) {
            isIdle = idle;
        }
    }

    /** Immutable result telling the caller whether a new combined event time was emitted. */
    public static class EventTimeUpdateStatus {
        /** Shared instance for "nothing was emitted"; its event time is the placeholder -1. */
        public static final EventTimeUpdateStatus NO_UPDATE = new EventTimeUpdateStatus(false, -1L);

        private final boolean isEventTimeUpdated;
        private final long newEventTime;

        private EventTimeUpdateStatus(boolean isEventTimeUpdated, long newEventTime) {
            this.isEventTimeUpdated = isEventTimeUpdated;
            this.newEventTime = newEventTime;
        }

        /** Creates a status that reports {@code newEventTime} as the newly emitted event time. */
        public static EventTimeUpdateStatus ofUpdatedWatermark(long newEventTime) {
            return new EventTimeUpdateStatus(true, newEventTime);
        }

        public boolean isEventTimeUpdated() {
            return isEventTimeUpdated;
        }

        public long getNewEventTime() {
            return newEventTime;
        }
    }
}

