package org.apache.flink.table.runtime.operators.window.tvf.operator;

import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.dataview.StateDataViewStore;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.class */
public class UnalignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase implements Triggerable<RowData, TimeWindow> {
    private static final long serialVersionUID = 1;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    private final Trigger<TimeWindow> trigger;
    private final TypeSerializer<RowData> inputSerializer;
    private final TypeSerializer<TimeWindow> windowSerializer;
    private transient InternalTimerService<TimeWindow> internalTimerService;
    private transient ValueState<Long> counterState;
    private transient InternalMapState<RowData, TimeWindow, Long, RowData> windowState;
    private transient TriggerContextImpl triggerContext;
    private transient MergingWindowProcessFunction<RowData, TimeWindow> windowFunction;
    private transient NamespaceAggsHandleFunctionBase<TimeWindow> windowAggregator;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator$DummyWindowAggregator.class */
    private static class DummyWindowAggregator implements NamespaceAggsHandleFunctionBase<TimeWindow> {
        private final IllegalStateException thrown = new IllegalStateException("The function should not be called in DummyWindowAggregator");

        private DummyWindowAggregator() {
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public void open(StateDataViewStore stateDataViewStore) throws Exception {
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public void setAccumulators(TimeWindow timeWindow, RowData rowData) throws Exception {
            throw this.thrown;
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public void accumulate(RowData rowData) throws Exception {
            throw this.thrown;
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public void retract(RowData rowData) throws Exception {
            throw this.thrown;
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public void merge(TimeWindow timeWindow, RowData rowData) throws Exception {
            throw this.thrown;
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public RowData createAccumulators() throws Exception {
            throw this.thrown;
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public RowData getAccumulators() throws Exception {
            throw this.thrown;
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public void cleanup(TimeWindow timeWindow) throws Exception {
            throw this.thrown;
        }

        @Override // org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase
        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator$MergingConsumer.class */
    private static class MergingConsumer implements BiConsumerWithException<TimeWindow, Collection<TimeWindow>, Throwable> {
        private final InternalMapState<RowData, TimeWindow, Long, RowData> windowState;

        public MergingConsumer(InternalMapState<RowData, TimeWindow, Long, RowData> internalMapState) {
            this.windowState = internalMapState;
        }

        public void accept(TimeWindow timeWindow, Collection<TimeWindow> collection) throws Throwable {
            Iterator<TimeWindow> it = collection.iterator();
            while (it.hasNext()) {
                this.windowState.setCurrentNamespace(it.next());
                Iterator it2 = this.windowState.iterator();
                this.windowState.setCurrentNamespace(timeWindow);
                while (it2.hasNext()) {
                    Map.Entry entry = (Map.Entry) it2.next();
                    this.windowState.put((Long) entry.getKey(), (RowData) entry.getValue());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator$TriggerContextImpl.class */
    public class TriggerContextImpl implements Trigger.OnMergeContext {
        private TimeWindow window;
        private Collection<TimeWindow> mergedWindows;

        private TriggerContextImpl() {
        }

        public void open() throws Exception {
            UnalignedWindowTableFunctionOperator.this.trigger.open(this);
        }

        public boolean onElement(RowData rowData, long j) throws Exception {
            return UnalignedWindowTableFunctionOperator.this.trigger.onElement(rowData, j, this.window);
        }

        public boolean onProcessingTime(long j) throws Exception {
            return UnalignedWindowTableFunctionOperator.this.trigger.onProcessingTime(j, this.window);
        }

        public boolean onEventTime(long j) throws Exception {
            return UnalignedWindowTableFunctionOperator.this.trigger.onEventTime(j, this.window);
        }

        public void onMerge() throws Exception {
            UnalignedWindowTableFunctionOperator.this.trigger.onMerge(this.window, this);
        }

        public void setWindow(TimeWindow timeWindow) {
            this.window = timeWindow;
        }

        public void setMergedWindows(Collection<TimeWindow> collection) {
            this.mergedWindows = collection;
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public long getCurrentProcessingTime() {
            return UnalignedWindowTableFunctionOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public long getCurrentWatermark() {
            return UnalignedWindowTableFunctionOperator.this.internalTimerService.currentWatermark();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public void registerProcessingTimeTimer(long j) {
            UnalignedWindowTableFunctionOperator.this.internalTimerService.registerProcessingTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public void registerEventTimeTimer(long j) {
            UnalignedWindowTableFunctionOperator.this.internalTimerService.registerEventTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public void deleteProcessingTimeTimer(long j) {
            UnalignedWindowTableFunctionOperator.this.internalTimerService.deleteProcessingTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public void deleteEventTimeTimer(long j) {
            UnalignedWindowTableFunctionOperator.this.internalTimerService.deleteEventTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public ZoneId getShiftTimeZone() {
            return UnalignedWindowTableFunctionOperator.this.shiftTimeZone;
        }

        public void clear() throws Exception {
            UnalignedWindowTableFunctionOperator.this.trigger.clear(this.window);
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.OnMergeContext
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows == null || this.mergedWindows.isEmpty()) {
                return;
            }
            try {
                InternalMergingState orCreateKeyedState = UnalignedWindowTableFunctionOperator.this.getOrCreateKeyedState(UnalignedWindowTableFunctionOperator.this.windowSerializer, stateDescriptor);
                if (!(orCreateKeyedState instanceof InternalMergingState)) {
                    throw new IllegalArgumentException("The given state descriptor does not refer to a mergeable state (MergingState)");
                }
                orCreateKeyedState.mergeNamespaces(this.window, this.mergedWindows);
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public MetricGroup getMetricGroup() {
            return UnalignedWindowTableFunctionOperator.this.getMetricGroup();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger.TriggerContext
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S) UnalignedWindowTableFunctionOperator.this.getPartitionedState(this.window, UnalignedWindowTableFunctionOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator$WindowContextImpl.class */
    private class WindowContextImpl implements MergingWindowProcessFunction.MergingContext<RowData, TimeWindow> {
        private WindowContextImpl() {
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public void deleteCleanupTimer(TimeWindow timeWindow) throws Exception {
            long cleanupTime = UnalignedWindowTableFunctionOperator.this.getCleanupTime(timeWindow);
            if (cleanupTime == Long.MAX_VALUE) {
                return;
            }
            if (UnalignedWindowTableFunctionOperator.this.windowAssigner.isEventTime()) {
                UnalignedWindowTableFunctionOperator.this.triggerContext.deleteEventTimeTimer(cleanupTime);
            } else {
                UnalignedWindowTableFunctionOperator.this.triggerContext.deleteProcessingTimeTimer(cleanupTime);
            }
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            Objects.requireNonNull(stateDescriptor, "The state properties must not be null");
            return (S) UnalignedWindowTableFunctionOperator.this.getPartitionedState(stateDescriptor);
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public RowData currentKey() {
            return (RowData) UnalignedWindowTableFunctionOperator.this.getCurrentKey();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public long currentProcessingTime() {
            return UnalignedWindowTableFunctionOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public long currentWatermark() {
            return UnalignedWindowTableFunctionOperator.this.internalTimerService.currentWatermark();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public ZoneId getShiftTimeZone() {
            return UnalignedWindowTableFunctionOperator.this.shiftTimeZone;
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public void clearWindowState(TimeWindow timeWindow) throws Exception {
            UnalignedWindowTableFunctionOperator.this.windowState.setCurrentNamespace(timeWindow);
            UnalignedWindowTableFunctionOperator.this.windowState.clear();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public void clearTrigger(TimeWindow timeWindow) throws Exception {
            UnalignedWindowTableFunctionOperator.this.triggerContext.setWindow(timeWindow);
            UnalignedWindowTableFunctionOperator.this.triggerContext.clear();
        }

        public void onMerge(TimeWindow timeWindow, Collection<TimeWindow> collection) throws Exception {
            UnalignedWindowTableFunctionOperator.this.triggerContext.setWindow(timeWindow);
            UnalignedWindowTableFunctionOperator.this.triggerContext.setMergedWindows(collection);
            UnalignedWindowTableFunctionOperator.this.triggerContext.onMerge();
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public void clearPreviousState(TimeWindow timeWindow) throws Exception {
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public RowData getWindowAccumulators(TimeWindow timeWindow) throws Exception {
            return null;
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public void setWindowAccumulators(TimeWindow timeWindow, RowData rowData) throws Exception {
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction.MergingContext
        public BiConsumerWithException<TimeWindow, Collection<TimeWindow>, Throwable> getWindowStateMergingConsumer() {
            return new MergingConsumer(UnalignedWindowTableFunctionOperator.this.windowState);
        }

        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction.Context
        public /* bridge */ /* synthetic */ void onMerge(Window window, Collection collection) throws Exception {
            onMerge((TimeWindow) window, (Collection<TimeWindow>) collection);
        }
    }

    public UnalignedWindowTableFunctionOperator(GroupWindowAssigner<TimeWindow> groupWindowAssigner, TypeSerializer<TimeWindow> typeSerializer, TypeSerializer<RowData> typeSerializer2, int i, ZoneId zoneId) {
        super(groupWindowAssigner, i, zoneId);
        this.trigger = createTrigger(groupWindowAssigner);
        this.windowSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.inputSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer2);
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorBase, org.apache.flink.table.runtime.operators.TableStreamOperator
    public void open() throws Exception {
        super.open();
        this.internalTimerService = getInternalTimerService("session-window-tvf-timers", this.windowSerializer, this);
        this.triggerContext = new TriggerContextImpl();
        this.triggerContext.open();
        this.counterState = getRuntimeContext().getState(new ValueStateDescriptor("session-window-tvf-counter", LongSerializer.INSTANCE));
        this.windowState = getOrCreateKeyedState(this.windowSerializer, new MapStateDescriptor("session-window-tvf-acc", LongSerializer.INSTANCE, this.inputSerializer));
        this.windowAggregator = new DummyWindowAggregator();
        this.windowAggregator.open(new PerWindowStateDataViewStore(getKeyedStateBackend(), this.windowSerializer, getRuntimeContext()));
        WindowContextImpl windowContextImpl = new WindowContextImpl();
        this.windowFunction = new MergingWindowProcessFunction<>((MergingWindowAssigner) this.windowAssigner, this.windowAggregator, this.windowSerializer, 0L);
        this.windowFunction.open(windowContextImpl);
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new MeterView(this.numLateRecordsDropped));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long currentWatermark = this.internalTimerService.currentWatermark();
            if (currentWatermark < 0) {
                return 0L;
            }
            return Long.valueOf(this.internalTimerService.currentProcessingTime() - currentWatermark);
        });
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorBase
    public void close() throws Exception {
        super.close();
        if (this.windowAggregator != null) {
            this.windowAggregator.close();
        }
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        long currentProcessingTime;
        RowData rowData = (RowData) streamRecord.getValue();
        if (!this.windowAssigner.isEventTime()) {
            currentProcessingTime = getProcessingTimeService().getCurrentProcessingTime();
        } else {
            if (rowData.isNullAt(this.rowtimeIndex)) {
                this.numNullRowTimeRecordsDropped.inc();
                return;
            }
            currentProcessingTime = rowData.getTimestamp(this.rowtimeIndex, 3).getMillisecond();
        }
        Long l = (Long) this.counterState.value();
        if (null == l) {
            l = 0L;
        }
        this.counterState.update(Long.valueOf(l.longValue() + serialVersionUID));
        long utcTimestampMills = TimeWindowUtil.toUtcTimestampMills(currentProcessingTime, this.shiftTimeZone);
        Collection<TimeWindow> assignStateNamespace = this.windowFunction.assignStateNamespace(rowData, utcTimestampMills);
        boolean z = true;
        Iterator<TimeWindow> it = assignStateNamespace.iterator();
        while (it.hasNext()) {
            z = false;
            this.windowState.setCurrentNamespace(it.next());
            this.windowState.put(l, rowData);
        }
        Collection<TimeWindow> assignActualWindows = this.windowFunction.assignActualWindows(rowData, utcTimestampMills);
        Preconditions.checkArgument((assignStateNamespace.isEmpty() && assignActualWindows.isEmpty()) || !(assignStateNamespace.isEmpty() || assignActualWindows.isEmpty()));
        for (TimeWindow timeWindow : assignActualWindows) {
            this.triggerContext.setWindow(timeWindow);
            if (this.triggerContext.onElement(rowData, utcTimestampMills)) {
                emitWindowResult(timeWindow);
            }
            registerCleanupTimer(timeWindow);
        }
        if (z) {
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    private void registerCleanupTimer(TimeWindow timeWindow) {
        long cleanupTime = getCleanupTime(timeWindow);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            this.triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

    private void emitWindowResult(TimeWindow timeWindow) throws Exception {
        this.windowState.setCurrentNamespace(this.windowFunction.getStateWindow(timeWindow));
        Iterator it = this.windowState.iterator();
        TreeMap treeMap = new TreeMap();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            treeMap.put((Long) entry.getKey(), (RowData) entry.getValue());
        }
        Iterator it2 = treeMap.entrySet().iterator();
        while (it2.hasNext()) {
            collect((RowData) ((Map.Entry) it2.next()).getValue(), Collections.singletonList(timeWindow));
        }
    }

    public void onEventTime(InternalTimer<RowData, TimeWindow> internalTimer) throws Exception {
        this.triggerContext.setWindow((TimeWindow) internalTimer.getNamespace());
        if (this.triggerContext.onEventTime(internalTimer.getTimestamp())) {
            emitWindowResult(this.triggerContext.window);
        }
        if (this.windowAssigner.isEventTime()) {
            this.windowFunction.cleanWindowIfNeeded(this.triggerContext.window, internalTimer.getTimestamp());
        }
    }

    public void onProcessingTime(InternalTimer<RowData, TimeWindow> internalTimer) throws Exception {
        this.triggerContext.setWindow((TimeWindow) internalTimer.getNamespace());
        if (this.triggerContext.onProcessingTime(internalTimer.getTimestamp())) {
            emitWindowResult(this.triggerContext.window);
        }
        if (this.windowAssigner.isEventTime()) {
            return;
        }
        this.windowFunction.cleanWindowIfNeeded(this.triggerContext.window, internalTimer.getTimestamp());
    }

    private long getCleanupTime(TimeWindow timeWindow) {
        long max = Math.max(0L, timeWindow.maxTimestamp());
        return TimeWindowUtil.toEpochMillsForTimer(max >= timeWindow.maxTimestamp() ? max : Long.MAX_VALUE, this.shiftTimeZone);
    }

    private static Trigger<TimeWindow> createTrigger(GroupWindowAssigner<TimeWindow> groupWindowAssigner) {
        return groupWindowAssigner.isEventTime() ? EventTimeTriggers.afterEndOfWindow() : ProcessingTimeTriggers.afterEndOfWindow();
    }

    @VisibleForTesting
    public Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }

    @VisibleForTesting
    public Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }
}
