package org.apache.flink.table.runtime.operators.aggregate.window.processors;

import java.time.ZoneId;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.util.TimeWindowUtil;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceWindowAggProcessor.class */
public abstract class AbstractSliceWindowAggProcessor extends AbstractWindowAggProcessor<Long> implements SlicingWindowProcessor<Long> {
    protected final WindowBuffer.Factory windowBufferFactory;
    protected final SliceAssigner sliceAssigner;
    protected final long windowInterval;
    private transient long nextTriggerProgress;
    protected transient WindowBuffer windowBuffer;

    public AbstractSliceWindowAggProcessor(GeneratedNamespaceAggsHandleFunction<Long> generatedNamespaceAggsHandleFunction, WindowBuffer.Factory factory, SliceAssigner sliceAssigner, TypeSerializer<RowData> typeSerializer, int i, ZoneId zoneId) {
        super(generatedNamespaceAggsHandleFunction, sliceAssigner, typeSerializer, sliceAssigner.isEventTime(), i, zoneId);
        this.windowBufferFactory = factory;
        this.sliceAssigner = sliceAssigner;
        this.windowInterval = sliceAssigner.getSliceEndInterval();
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor, org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void open(WindowProcessor.Context<Long> context) throws Exception {
        super.open(context);
        this.windowBuffer = this.windowBufferFactory.create(this.ctx.getOperatorOwner(), this.ctx.getMemoryManager(), this.ctx.getMemorySize(), this.ctx.getRuntimeContext(), this.windowTimerService, this.ctx.getKeyedStateBackend(), this.windowState, this.isEventTime, this.shiftTimeZone);
        this.nextTriggerProgress = Long.MIN_VALUE;
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor
    protected WindowTimerService<Long> getWindowTimerService() {
        return new SlicingWindowTimerServiceImpl(this.ctx.getTimerService(), this.shiftTimeZone);
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public boolean processElement(RowData rowData, RowData rowData2) throws Exception {
        long assignSliceEnd = this.sliceAssigner.assignSliceEnd(rowData2, this.clockService);
        if (!this.isEventTime) {
            this.windowTimerService.registerProcessingTimeWindowTimer(Long.valueOf(assignSliceEnd));
        }
        if (!this.isEventTime || !TimeWindowUtil.isWindowFired(assignSliceEnd, this.currentProgress, this.shiftTimeZone)) {
            this.windowBuffer.addElement(rowData, assignSliceEnd, rowData2);
            return false;
        }
        if (TimeWindowUtil.isWindowFired(this.sliceAssigner.getLastWindowEnd(assignSliceEnd), this.currentProgress, this.shiftTimeZone)) {
            return true;
        }
        this.windowBuffer.addElement(rowData, sliceStateMergeTarget(assignSliceEnd), rowData2);
        long j = assignSliceEnd;
        while (true) {
            long j2 = j;
            if (!TimeWindowUtil.isWindowFired(j2, this.currentProgress, this.shiftTimeZone)) {
                this.windowTimerService.registerEventTimeWindowTimer(Long.valueOf(j2));
                return false;
            }
            j = j2 + this.windowInterval;
        }
    }

    protected abstract long sliceStateMergeTarget(long j) throws Exception;

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void advanceProgress(long j) throws Exception {
        if (j > this.currentProgress) {
            this.currentProgress = j;
            if (this.currentProgress >= this.nextTriggerProgress) {
                this.windowBuffer.advanceProgress(this.currentProgress);
                this.nextTriggerProgress = TimeWindowUtil.getNextTriggerWatermark(this.currentProgress, this.windowInterval, this.shiftTimeZone, this.useDayLightSaving);
            }
        }
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void prepareCheckpoint() throws Exception {
        this.windowBuffer.flush();
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void clearWindow(long j, Long l) throws Exception {
        for (Long l2 : this.sliceAssigner.expiredSlices(l.longValue())) {
            this.windowState.clear(l2);
            this.aggregator.cleanup(l2);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor, org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void close() throws Exception {
        super.close();
        if (this.windowBuffer != null) {
            this.windowBuffer.close();
        }
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public TypeSerializer<Long> createWindowSerializer() {
        return LongSerializer.INSTANCE;
    }
}
