package org.apache.flink.table.runtime.operators.rank.window.processors;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowMapState;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.class */
/**
 * A {@link SlicingWindowProcessor} that emits the ranked rows of a window when the window fires.
 *
 * <p>Incoming rows are handed to a {@link WindowBuffer} which is backed by a per-window map state
 * (state name {@code "window_rank"}) mapping a sort key to the list of rows sharing that key.
 * When a window fires, the state entries of that window are loaded into a {@link TopNBuffer}
 * ordered by the generated sort-key comparator, and every row whose 1-based position lies in
 * {@code [rankStart, rankEnd]} is emitted, optionally joined with its rank number.
 */
public final class WindowRankProcessor implements SlicingWindowProcessor<Long> {
    private static final long serialVersionUID = 1L;

    private final GeneratedRecordComparator generatedSortKeyComparator;
    /** Instantiated from {@link #generatedSortKeyComparator} in {@link #open}. */
    private Comparator<RowData> sortKeyComparator;
    private final TypeSerializer<RowData> sortKeySerializer;
    private final WindowBuffer.Factory bufferFactory;
    private final TypeSerializer<RowData> inputSerializer;
    private final long rankStart;
    private final long rankEnd;
    private final boolean outputRankNumber;
    private final int windowEndIndex;
    private final ZoneId shiftTimeZone;

    /** Highest progress seen so far; reset to {@code Long.MIN_VALUE} in {@link #open}. */
    private transient long currentProgress;
    private transient WindowProcessor.Context<Long> ctx;
    private transient WindowTimerService<Long> windowTimerService;
    private transient WindowBuffer windowBuffer;
    private transient WindowMapState<Long, List<RowData>> windowState;
    /** Reused output row (input row joined with the rank row). */
    private transient JoinedRowData reuseOutput;
    /** Reused single-field row holding the rank number. */
    private transient GenericRowData reuseRankRow;

    /**
     * Creates the processor.
     *
     * @param inputSerializer serializer of the input rows, used for the list values in state
     * @param generatedSortKeyComparator generated comparator used to order sort keys
     * @param sortKeySerializer serializer of the sort key, used for the map keys in state
     * @param bufferFactory factory creating the {@link WindowBuffer} in {@link #open}
     * @param rankStart first rank (inclusive, 1-based) to emit
     * @param rankEnd last rank (inclusive) to emit
     * @param outputRankNumber whether the rank number is appended to each emitted row
     * @param windowEndIndex field index from which the window end is read in
     *     {@link #processElement}
     * @param shiftTimeZone time zone passed to the timer service, the buffer and the
     *     window-fired check
     */
    public WindowRankProcessor(
            TypeSerializer<RowData> inputSerializer,
            GeneratedRecordComparator generatedSortKeyComparator,
            TypeSerializer<RowData> sortKeySerializer,
            WindowBuffer.Factory bufferFactory,
            long rankStart,
            long rankEnd,
            boolean outputRankNumber,
            int windowEndIndex,
            ZoneId shiftTimeZone) {
        this.inputSerializer = inputSerializer;
        this.generatedSortKeyComparator = generatedSortKeyComparator;
        this.sortKeySerializer = sortKeySerializer;
        this.bufferFactory = bufferFactory;
        this.rankStart = rankStart;
        this.rankEnd = rankEnd;
        this.outputRankNumber = outputRankNumber;
        this.windowEndIndex = windowEndIndex;
        this.shiftTimeZone = shiftTimeZone;
    }

    /**
     * Instantiates the sort-key comparator, registers the {@code "window_rank"} map state and
     * creates the timer service, window buffer and reusable output rows.
     *
     * @param context runtime context supplying state backend, timers, memory and output
     * @throws Exception if state or buffer creation fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void open(WindowProcessor.Context<Long> context) throws Exception {
        this.ctx = context;
        this.sortKeyComparator =
                generatedSortKeyComparator.newInstance(
                        ctx.getRuntimeContext().getUserCodeClassLoader());

        MapStateDescriptor<RowData, List<RowData>> stateDescriptor =
                new MapStateDescriptor<>(
                        "window_rank", sortKeySerializer, new ListSerializer<>(inputSerializer));
        MapState<RowData, List<RowData>> state =
                ctx.getKeyedStateBackend()
                        .getOrCreateKeyedState(LongSerializer.INSTANCE, stateDescriptor);
        // WindowMapState needs the namespace-aware internal view of the state. The key and
        // namespace type arguments cannot be verified at runtime, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        InternalMapState<RowData, Long, RowData, List<RowData>> internalState =
                (InternalMapState<RowData, Long, RowData, List<RowData>>) state;

        this.windowTimerService =
                new SlicingWindowTimerServiceImpl(ctx.getTimerService(), shiftTimeZone);
        this.windowState = new WindowMapState<>(internalState);
        this.windowBuffer =
                bufferFactory.create(
                        ctx.getOperatorOwner(),
                        ctx.getMemoryManager(),
                        ctx.getMemorySize(),
                        ctx.getRuntimeContext(),
                        windowTimerService,
                        ctx.getKeyedStateBackend(),
                        windowState,
                        true,
                        shiftTimeZone);

        this.reuseOutput = new JoinedRowData();
        this.reuseRankRow = new GenericRowData(1);
        this.currentProgress = Long.MIN_VALUE;
    }

    /**
     * Sets the current progress to the given watermark.
     *
     * @param watermark the watermark to start from
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void initializeWatermark(long watermark) {
        this.currentProgress = watermark;
    }

    /**
     * Buffers an element unless its window has already fired.
     *
     * @param key first argument forwarded to the window buffer (presumably the current
     *     partition key - confirm against the {@code WindowProcessor} contract)
     * @param element the row to buffer; its window end is read as a {@code long} from field
     *     {@code windowEndIndex}
     * @return {@code true} if the element's window has already fired with respect to the
     *     current progress, in which case the element is not buffered; {@code false} otherwise
     * @throws Exception if adding to the buffer fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public boolean processElement(RowData key, RowData element) throws Exception {
        long windowEnd = element.getLong(windowEndIndex);
        if (TimeWindowUtil.isWindowFired(windowEnd, currentProgress, shiftTimeZone)) {
            return true;
        }
        windowBuffer.addElement(key, windowEnd, element);
        return false;
    }

    /**
     * Advances the current progress and notifies the window buffer. Values that are not
     * strictly greater than the current progress are ignored.
     *
     * @param progress the new progress
     * @throws Exception if the buffer fails to advance
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void advanceProgress(long progress) throws Exception {
        if (progress > currentProgress) {
            currentProgress = progress;
            windowBuffer.advanceProgress(currentProgress);
        }
    }

    /**
     * Flushes the window buffer before a checkpoint is taken.
     *
     * @throws Exception if flushing fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void prepareCheckpoint() throws Exception {
        windowBuffer.flush();
    }

    /**
     * Clears the state kept for the given window.
     *
     * @param timerTimestamp not used by this implementation
     * @param window the window whose state is cleared
     * @throws Exception if clearing the state fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void clearWindow(long timerTimestamp, Long window) throws Exception {
        windowState.clear(window);
    }

    /**
     * Closes the window buffer if it was created.
     *
     * @throws Exception if closing the buffer fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void close() throws Exception {
        // The buffer is only created in open(), so it may still be null here.
        if (windowBuffer != null) {
            windowBuffer.close();
        }
    }

    /**
     * Returns the serializer for the {@code Long} window type.
     *
     * @return {@link LongSerializer#INSTANCE}
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public TypeSerializer<Long> createWindowSerializer() {
        return LongSerializer.INSTANCE;
    }

    /**
     * Emits the rows of the given window whose rank lies in {@code [rankStart, rankEnd]}.
     *
     * <p>The window's state entries are first loaded into a {@link TopNBuffer}, skipping sort
     * keys the buffer reports as out of range for {@code rankEnd}. The buffer is then walked
     * in order while a running 1-based rank is assigned to every row.
     *
     * @param timerTimestamp not used by this implementation
     * @param window the window to fire
     * @throws Exception if reading the state fails
     */
    @Override // org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor
    public void fireWindow(long timerTimestamp, Long window) throws Exception {
        TopNBuffer buffer = new TopNBuffer(sortKeyComparator, ArrayList::new);
        Iterator<Map.Entry<RowData, List<RowData>>> stateEntries = windowState.iterator(window);
        while (stateEntries.hasNext()) {
            Map.Entry<RowData, List<RowData>> stateEntry = stateEntries.next();
            RowData sortKey = stateEntry.getKey();
            if (buffer.checkSortKeyInBufferRange(sortKey, rankEnd)) {
                buffer.putAll(sortKey, stateEntry.getValue());
            }
        }

        long currentRank = 1L;
        Iterator<Map.Entry<RowData, Collection<RowData>>> bufferEntries =
                buffer.entrySet().iterator();
        while (bufferEntries.hasNext() && currentRank <= rankEnd) {
            Iterator<RowData> rows = bufferEntries.next().getValue().iterator();
            while (rows.hasNext() && currentRank <= rankEnd) {
                RowData row = rows.next();
                // The upper bound is already enforced by the loop condition.
                if (currentRank >= rankStart) {
                    ctx.output(createOutputRow(row, currentRank));
                }
                currentRank++;
            }
        }
    }

    /**
     * Builds the row to emit for a ranked input row.
     *
     * @param inputRow the ranked input row
     * @param rank the rank assigned to the row
     * @return {@code inputRow} itself when no rank number is requested; otherwise the shared
     *     reusable joined row ({@code inputRow} followed by the rank) with kind
     *     {@link RowKind#INSERT}. The shared row is overwritten by the next call.
     */
    private RowData createOutputRow(RowData inputRow, long rank) {
        if (!outputRankNumber) {
            return inputRow;
        }
        reuseRankRow.setField(0, rank);
        reuseOutput.replace(inputRow, reuseRankRow);
        reuseOutput.setRowKind(RowKind.INSERT);
        return reuseOutput;
    }
}
