/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.util.Collector;

public class RowTimeDeduplicateKeepFirstRowFunction
extends KeyedProcessFunction<RowData, RowData, RowData> {
    private static final long serialVersionUID = 1L;
    private final TypeInformation<RowData> typeInfo;
    private final long stateRetentionTime;
    private final int rowtimeIndex;
    protected ValueState<RowData> waitingToEmitOnTimerState;
    protected ValueState<Boolean> alreadyEmittedState;
    private transient Counter numLateRecordsDropped;

    public RowTimeDeduplicateKeepFirstRowFunction(InternalTypeInfo<RowData> typeInfo, long minRetentionTime, int rowtimeIndex) {
        this.typeInfo = typeInfo;
        this.stateRetentionTime = minRetentionTime;
        this.rowtimeIndex = rowtimeIndex;
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        ValueStateDescriptor timerStateDesc = new ValueStateDescriptor("waiting-to-emit-on-timer", this.typeInfo);
        this.waitingToEmitOnTimerState = this.getRuntimeContext().getState(timerStateDesc);
        ValueStateDescriptor stateDesc = new ValueStateDescriptor("already-emitted-state-boolean", Types.BOOLEAN);
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(this.stateRetentionTime);
        if (ttlConfig.isEnabled()) {
            stateDesc.enableTimeToLive(ttlConfig);
        }
        this.alreadyEmittedState = this.getRuntimeContext().getState(stateDesc);
        this.numLateRecordsDropped = this.getRuntimeContext().getMetricGroup().counter("numLateRecordsDropped");
    }

    public void processElement(RowData input, KeyedProcessFunction.Context ctx, Collector<RowData> out) throws Exception {
        DeduplicateFunctionHelper.checkInsertOnly(input);
        long rowtime = input.getLong(this.rowtimeIndex);
        if (rowtime < ctx.timerService().currentWatermark()) {
            this.numLateRecordsDropped.inc();
            return;
        }
        Boolean allreadyEmitted = (Boolean)this.alreadyEmittedState.value();
        if (allreadyEmitted != null && allreadyEmitted.booleanValue()) {
            return;
        }
        RowData preRow = (RowData)this.waitingToEmitOnTimerState.value();
        if (DeduplicateFunctionHelper.shouldKeepCurrentRow(preRow, input, this.rowtimeIndex, false)) {
            if (preRow != null) {
                ctx.timerService().deleteEventTimeTimer(preRow.getLong(this.rowtimeIndex));
            }
            ctx.timerService().registerEventTimeTimer(rowtime);
            this.waitingToEmitOnTimerState.update((Object)input);
        }
    }

    public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<RowData> out) throws Exception {
        out.collect((Object)((RowData)this.waitingToEmitOnTimerState.value()));
        this.waitingToEmitOnTimerState.clear();
        this.alreadyEmittedState.update((Object)true);
    }

    @VisibleForTesting
    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }
}

