package org.apache.flink.table.runtime.operators.deduplicate.window.processors;

import java.time.ZoneId;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowValueState;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/deduplicate/window/processors/RowTimeWindowDeduplicateProcessor.class */
public final class RowTimeWindowDeduplicateProcessor implements SlicingWindowProcessor<Long> {
    private static final long serialVersionUID = 1;
    private final WindowBuffer.Factory bufferFactory;
    private final TypeSerializer<RowData> inputSerializer;
    private final int windowEndIndex;
    private final ZoneId shiftTimeZone;
    private transient long currentProgress;
    private transient WindowProcessor.Context<Long> ctx;
    private transient WindowTimerService<Long> windowTimerService;
    private transient WindowBuffer windowBuffer;
    private transient WindowValueState<Long> windowState;

    public RowTimeWindowDeduplicateProcessor(TypeSerializer<RowData> typeSerializer, WindowBuffer.Factory factory, int i, ZoneId zoneId) {
        this.inputSerializer = typeSerializer;
        this.bufferFactory = factory;
        this.windowEndIndex = i;
        this.shiftTimeZone = zoneId;
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void open(WindowProcessor.Context<Long> context) throws Exception {
        this.ctx = context;
        InternalValueState internalValueState = (ValueState) this.ctx.getKeyedStateBackend().getOrCreateKeyedState(LongSerializer.INSTANCE, new ValueStateDescriptor("window_deduplicate", this.inputSerializer));
        this.windowTimerService = new SlicingWindowTimerServiceImpl(this.ctx.getTimerService(), this.shiftTimeZone);
        this.windowState = new WindowValueState<>(internalValueState);
        this.windowBuffer = this.bufferFactory.create(this.ctx.getOperatorOwner(), this.ctx.getMemoryManager(), this.ctx.getMemorySize(), this.ctx.getRuntimeContext(), this.windowTimerService, this.ctx.getKeyedStateBackend(), this.windowState, true, this.shiftTimeZone);
        this.currentProgress = Long.MIN_VALUE;
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void initializeWatermark(long j) {
        this.currentProgress = j;
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public boolean processElement(RowData rowData, RowData rowData2) throws Exception {
        long j = rowData2.getLong(this.windowEndIndex);
        if (TimeWindowUtil.isWindowFired(j, this.currentProgress, this.shiftTimeZone)) {
            return true;
        }
        this.windowBuffer.addElement(rowData, j, rowData2);
        return false;
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void advanceProgress(long j) throws Exception {
        if (j > this.currentProgress) {
            this.currentProgress = j;
            this.windowBuffer.advanceProgress(this.currentProgress);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void prepareCheckpoint() throws Exception {
        this.windowBuffer.flush();
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void clearWindow(long j, Long l) throws Exception {
        this.windowState.clear(l);
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void close() throws Exception {
        if (this.windowBuffer != null) {
            this.windowBuffer.close();
        }
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public TypeSerializer<Long> createWindowSerializer() {
        return LongSerializer.INSTANCE;
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void fireWindow(long j, Long l) throws Exception {
        RowData value = this.windowState.value(l);
        if (value != null) {
            this.ctx.output(value);
        }
    }
}
