/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window;

import java.util.Collection;
import java.util.Objects;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.PanedWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.internal.GeneralWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.internal.MergingWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.internal.PanedWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

public class WindowOperator<K, W extends Window>
extends AbstractStreamOperator<BaseRow>
implements OneInputStreamOperator<BaseRow, BaseRow>,
Triggerable<K, W> {
    private static final long serialVersionUID = 1L;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    private final WindowAssigner<W> windowAssigner;
    private final Trigger<W> trigger;
    private final TypeSerializer<W> windowSerializer;
    private final LogicalType[] inputFieldTypes;
    private final LogicalType[] accumulatorTypes;
    private final LogicalType[] aggResultTypes;
    private final LogicalType[] windowPropertyTypes;
    private final boolean sendRetraction;
    private final int rowtimeIndex;
    private final long allowedLateness;
    private NamespaceAggsHandleFunction<W> windowAggregator;
    private GeneratedNamespaceAggsHandleFunction<W> generatedWindowAggregator;
    private RecordEqualiser equaliser;
    private GeneratedRecordEqualiser generatedEqualiser;
    private transient InternalWindowProcessFunction<K, W> windowFunction;
    private transient TimestampedCollector<BaseRow> collector;
    private transient boolean functionsClosed = false;
    private transient InternalTimerService<W> internalTimerService;
    private transient InternalValueState<K, W, BaseRow> windowState;
    private transient InternalValueState<K, W, BaseRow> previousState;
    private transient TriggerContext triggerContext;
    private transient JoinedRow reuseOutput;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;

    WindowOperator(NamespaceAggsHandleFunction<W> windowAggregator, RecordEqualiser equaliser, WindowAssigner<W> windowAssigner, Trigger<W> trigger, TypeSerializer<W> windowSerializer, LogicalType[] inputFieldTypes, LogicalType[] accumulatorTypes, LogicalType[] aggResultTypes, LogicalType[] windowPropertyTypes, int rowtimeIndex, boolean sendRetraction, long allowedLateness) {
        Preconditions.checkArgument((allowedLateness >= 0L ? 1 : 0) != 0);
        this.windowAggregator = (NamespaceAggsHandleFunction)Preconditions.checkNotNull(windowAggregator);
        this.equaliser = (RecordEqualiser)Preconditions.checkNotNull((Object)equaliser);
        this.windowAssigner = (WindowAssigner)Preconditions.checkNotNull(windowAssigner);
        this.trigger = (Trigger)Preconditions.checkNotNull(trigger);
        this.windowSerializer = (TypeSerializer)Preconditions.checkNotNull(windowSerializer);
        this.inputFieldTypes = (LogicalType[])Preconditions.checkNotNull((Object)inputFieldTypes);
        this.accumulatorTypes = (LogicalType[])Preconditions.checkNotNull((Object)accumulatorTypes);
        this.aggResultTypes = (LogicalType[])Preconditions.checkNotNull((Object)aggResultTypes);
        this.windowPropertyTypes = (LogicalType[])Preconditions.checkNotNull((Object)windowPropertyTypes);
        this.allowedLateness = allowedLateness;
        this.sendRetraction = sendRetraction;
        Preconditions.checkArgument((!windowAssigner.isEventTime() || rowtimeIndex >= 0 ? 1 : 0) != 0);
        this.rowtimeIndex = rowtimeIndex;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    WindowOperator(GeneratedNamespaceAggsHandleFunction<W> generatedWindowAggregator, GeneratedRecordEqualiser generatedEqualiser, WindowAssigner<W> windowAssigner, Trigger<W> trigger, TypeSerializer<W> windowSerializer, LogicalType[] inputFieldTypes, LogicalType[] accumulatorTypes, LogicalType[] aggResultTypes, LogicalType[] windowPropertyTypes, int rowtimeIndex, boolean sendRetraction, long allowedLateness) {
        Preconditions.checkArgument((allowedLateness >= 0L ? 1 : 0) != 0);
        this.generatedWindowAggregator = (GeneratedNamespaceAggsHandleFunction)Preconditions.checkNotNull(generatedWindowAggregator);
        this.generatedEqualiser = (GeneratedRecordEqualiser)Preconditions.checkNotNull((Object)generatedEqualiser);
        this.windowAssigner = (WindowAssigner)Preconditions.checkNotNull(windowAssigner);
        this.trigger = (Trigger)Preconditions.checkNotNull(trigger);
        this.windowSerializer = (TypeSerializer)Preconditions.checkNotNull(windowSerializer);
        this.inputFieldTypes = (LogicalType[])Preconditions.checkNotNull((Object)inputFieldTypes);
        this.accumulatorTypes = (LogicalType[])Preconditions.checkNotNull((Object)accumulatorTypes);
        this.aggResultTypes = (LogicalType[])Preconditions.checkNotNull((Object)aggResultTypes);
        this.windowPropertyTypes = (LogicalType[])Preconditions.checkNotNull((Object)windowPropertyTypes);
        this.allowedLateness = allowedLateness;
        this.sendRetraction = sendRetraction;
        Preconditions.checkArgument((!windowAssigner.isEventTime() || rowtimeIndex >= 0 ? 1 : 0) != 0);
        this.rowtimeIndex = rowtimeIndex;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector(this.output);
        this.collector.eraseTimestamp();
        this.internalTimerService = this.getInternalTimerService("window-timers", this.windowSerializer, this);
        this.triggerContext = new TriggerContext();
        this.triggerContext.open();
        ValueStateDescriptor windowStateDescriptor = new ValueStateDescriptor("window-aggs", (TypeSerializer)new BaseRowSerializer(this.getExecutionConfig(), this.accumulatorTypes));
        this.windowState = (InternalValueState)this.getOrCreateKeyedState(this.windowSerializer, (StateDescriptor)windowStateDescriptor);
        if (this.sendRetraction) {
            LogicalType[] valueTypes = (LogicalType[])ArrayUtils.addAll((Object[])this.aggResultTypes, (Object[])this.windowPropertyTypes);
            ValueStateDescriptor previousStateDescriptor = new ValueStateDescriptor("previous-aggs", (TypeSerializer)new BaseRowSerializer(this.getExecutionConfig(), valueTypes));
            this.previousState = (InternalValueState)this.getOrCreateKeyedState(this.windowSerializer, (StateDescriptor)previousStateDescriptor);
        }
        if (this.generatedWindowAggregator != null) {
            this.windowAggregator = (NamespaceAggsHandleFunction)this.generatedWindowAggregator.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        }
        if (this.generatedEqualiser != null) {
            this.equaliser = (RecordEqualiser)this.generatedEqualiser.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        }
        WindowContext windowContext = new WindowContext();
        this.windowAggregator.open(new PerWindowStateDataViewStore(this.getKeyedStateBackend(), this.windowSerializer, (RuntimeContext)this.getRuntimeContext()));
        this.windowFunction = this.windowAssigner instanceof MergingWindowAssigner ? new MergingWindowProcessFunction((MergingWindowAssigner)this.windowAssigner, this.windowAggregator, this.windowSerializer, this.allowedLateness) : (this.windowAssigner instanceof PanedWindowAssigner ? new PanedWindowProcessFunction((PanedWindowAssigner)this.windowAssigner, this.windowAggregator, this.allowedLateness) : new GeneralWindowProcessFunction(this.windowAssigner, this.windowAggregator, this.allowedLateness));
        this.windowFunction.open(windowContext);
        this.reuseOutput = new JoinedRow();
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, (Meter)new MeterView(this.numLateRecordsDropped, 60));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long watermark = this.internalTimerService.currentWatermark();
            if (watermark < 0L) {
                return 0L;
            }
            return this.internalTimerService.currentProcessingTime() - watermark;
        });
    }

    public void close() throws Exception {
        super.close();
        this.collector = null;
        this.triggerContext = null;
        this.functionsClosed = true;
        this.windowAggregator.close();
    }

    public void dispose() throws Exception {
        super.dispose();
        this.collector = null;
        this.triggerContext = null;
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            this.windowAggregator.close();
        }
    }

    public void processElement(StreamRecord<BaseRow> record) throws Exception {
        BaseRow inputRow = (BaseRow)record.getValue();
        long timestamp = this.windowAssigner.isEventTime() ? inputRow.getLong(this.rowtimeIndex) : this.internalTimerService.currentProcessingTime();
        Collection<W> affectedWindows = this.windowFunction.assignStateNamespace(inputRow, timestamp);
        boolean isElementDropped = true;
        for (Window window : affectedWindows) {
            isElementDropped = false;
            this.windowState.setCurrentNamespace((Object)window);
            BaseRow acc = (BaseRow)this.windowState.value();
            if (acc == null) {
                acc = this.windowAggregator.createAccumulators();
            }
            this.windowAggregator.setAccumulators(window, acc);
            if (BaseRowUtil.isAccumulateMsg(inputRow)) {
                this.windowAggregator.accumulate(inputRow);
            } else {
                this.windowAggregator.retract(inputRow);
            }
            acc = this.windowAggregator.getAccumulators();
            this.windowState.update((Object)acc);
        }
        Collection<W> actualWindows = this.windowFunction.assignActualWindows(inputRow, timestamp);
        for (Window window : actualWindows) {
            isElementDropped = false;
            this.triggerContext.window = window;
            boolean triggerResult = this.triggerContext.onElement(inputRow, timestamp);
            if (triggerResult) {
                this.emitWindowResult(window);
            }
            this.registerCleanupTimer(window);
        }
        if (isElementDropped) {
            this.lateRecordsDroppedRate.markEvent();
        }
    }

    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        this.setCurrentKey(timer.getKey());
        this.triggerContext.window = (Window)timer.getNamespace();
        if (this.triggerContext.onEventTime(timer.getTimestamp())) {
            this.emitWindowResult(this.triggerContext.window);
        }
        if (this.windowAssigner.isEventTime()) {
            this.windowFunction.cleanWindowIfNeeded(this.triggerContext.window, timer.getTimestamp());
        }
    }

    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        this.setCurrentKey(timer.getKey());
        this.triggerContext.window = (Window)timer.getNamespace();
        if (this.triggerContext.onProcessingTime(timer.getTimestamp())) {
            this.emitWindowResult(this.triggerContext.window);
        }
        if (!this.windowAssigner.isEventTime()) {
            this.windowFunction.cleanWindowIfNeeded(this.triggerContext.window, timer.getTimestamp());
        }
    }

    private void emitWindowResult(W window) throws Exception {
        BaseRow aggResult = this.windowFunction.getWindowAggregationResult(window);
        if (this.sendRetraction) {
            this.previousState.setCurrentNamespace(window);
            BaseRow previousAggResult = (BaseRow)this.previousState.value();
            if (previousAggResult != null) {
                if (!this.equaliser.equalsWithoutHeader(aggResult, previousAggResult)) {
                    this.reuseOutput.replace((BaseRow)this.getCurrentKey(), previousAggResult);
                    BaseRowUtil.setRetract(this.reuseOutput);
                    this.collector.collect((Object)this.reuseOutput);
                    this.reuseOutput.replace((BaseRow)this.getCurrentKey(), aggResult);
                    BaseRowUtil.setAccumulate(this.reuseOutput);
                    this.collector.collect((Object)this.reuseOutput);
                    this.previousState.update((Object)aggResult);
                }
            } else {
                this.reuseOutput.replace((BaseRow)this.getCurrentKey(), aggResult);
                BaseRowUtil.setAccumulate(this.reuseOutput);
                this.collector.collect((Object)this.reuseOutput);
                this.previousState.update((Object)aggResult);
            }
        } else {
            this.reuseOutput.replace((BaseRow)this.getCurrentKey(), aggResult);
            this.collector.collect((Object)this.reuseOutput);
        }
    }

    private void registerCleanupTimer(W window) {
        long cleanupTime = this.cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            this.triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

    private long cleanupTime(W window) {
        if (this.windowAssigner.isEventTime()) {
            long cleanupTime = Math.max(0L, ((Window)window).maxTimestamp() + this.allowedLateness);
            return cleanupTime >= ((Window)window).maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        }
        return Math.max(0L, ((Window)window).maxTimestamp());
    }

    private K currentKey() {
        return (K)this.getCurrentKey();
    }

    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }

    protected Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }

    private class TriggerContext
    implements Trigger.OnMergeContext {
        private W window;
        private Collection<W> mergedWindows;

        private TriggerContext() {
        }

        public void open() throws Exception {
            WindowOperator.this.trigger.open(this);
        }

        boolean onElement(BaseRow row, long timestamp) throws Exception {
            return WindowOperator.this.trigger.onElement(row, timestamp, this.window);
        }

        boolean onProcessingTime(long time) throws Exception {
            return WindowOperator.this.trigger.onProcessingTime(time, this.window);
        }

        boolean onEventTime(long time) throws Exception {
            return WindowOperator.this.trigger.onEventTime(time, this.window);
        }

        void onMerge() throws Exception {
            WindowOperator.this.trigger.onMerge(this.window, this);
        }

        @Override
        public long getCurrentProcessingTime() {
            return WindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return WindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override
        public MetricGroup getMetricGroup() {
            return WindowOperator.this.getMetricGroup();
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            WindowOperator.this.internalTimerService.registerProcessingTimeTimer(this.window, time);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            WindowOperator.this.internalTimerService.registerEventTimeTimer(this.window, time);
        }

        @Override
        public void deleteProcessingTimeTimer(long time) {
            WindowOperator.this.internalTimerService.deleteProcessingTimeTimer(this.window, time);
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            WindowOperator.this.internalTimerService.deleteEventTimeTimer(this.window, time);
        }

        public void clear() throws Exception {
            WindowOperator.this.trigger.clear(this.window);
        }

        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S)WindowOperator.this.getPartitionedState(this.window, WindowOperator.this.windowSerializer, stateDescriptor);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        @Override
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows != null && this.mergedWindows.size() > 0) {
                try {
                    State state = WindowOperator.this.getOrCreateKeyedState(WindowOperator.this.windowSerializer, stateDescriptor);
                    if (!(state instanceof InternalMergingState)) {
                        throw new IllegalArgumentException("The given state descriptor does not refer to a mergeable state (MergingState)");
                    }
                    ((InternalMergingState)state).mergeNamespaces(this.window, this.mergedWindows);
                }
                catch (Exception e) {
                    throw new RuntimeException("Error while merging state.", e);
                }
            }
        }
    }

    private class WindowContext
    implements InternalWindowProcessFunction.Context<K, W> {
        private WindowContext() {
        }

        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            Objects.requireNonNull(stateDescriptor, "The state properties must not be null");
            return (S)WindowOperator.this.getPartitionedState(stateDescriptor);
        }

        @Override
        public K currentKey() {
            return WindowOperator.this.currentKey();
        }

        @Override
        public long currentProcessingTime() {
            return WindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return WindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override
        public BaseRow getWindowAccumulators(W window) throws Exception {
            WindowOperator.this.windowState.setCurrentNamespace(window);
            return (BaseRow)WindowOperator.this.windowState.value();
        }

        @Override
        public void setWindowAccumulators(W window, BaseRow acc) throws Exception {
            WindowOperator.this.windowState.setCurrentNamespace(window);
            WindowOperator.this.windowState.update((Object)acc);
        }

        @Override
        public void clearWindowState(W window) throws Exception {
            WindowOperator.this.windowState.setCurrentNamespace(window);
            WindowOperator.this.windowState.clear();
        }

        @Override
        public void clearPreviousState(W window) throws Exception {
            if (WindowOperator.this.previousState != null) {
                WindowOperator.this.previousState.setCurrentNamespace(window);
                WindowOperator.this.previousState.clear();
            }
        }

        @Override
        public void clearTrigger(W window) throws Exception {
            WindowOperator.this.triggerContext.window = window;
            WindowOperator.this.triggerContext.clear();
        }

        @Override
        public void deleteCleanupTimer(W window) throws Exception {
            long cleanupTime = WindowOperator.this.cleanupTime(window);
            if (cleanupTime == Long.MAX_VALUE) {
                return;
            }
            if (WindowOperator.this.windowAssigner.isEventTime()) {
                WindowOperator.this.triggerContext.deleteEventTimeTimer(cleanupTime);
            } else {
                WindowOperator.this.triggerContext.deleteProcessingTimeTimer(cleanupTime);
            }
        }

        @Override
        public void onMerge(W newWindow, Collection<W> mergedWindows) throws Exception {
            WindowOperator.this.triggerContext.window = newWindow;
            WindowOperator.this.triggerContext.mergedWindows = mergedWindows;
            WindowOperator.this.triggerContext.onMerge();
        }
    }
}

