package org.apache.flink.table.runtime.operators.rank.window.combines;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.state.StateKeyContext;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowMapState;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowState;
import org.apache.flink.table.runtime.util.WindowKey;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner.class */
public final class TopNRecordsCombiner implements RecordsCombiner {
    private final WindowTimerService<Long> timerService;
    private final StateKeyContext keyContext;
    private final WindowMapState<Long, List<RowData>> dataState;
    private final Comparator<RowData> sortKeyComparator;
    private final KeySelector<RowData, RowData> sortKeySelector;
    private final long topN;
    private final TypeSerializer<RowData> recordSerializer;
    private final boolean isEventTime;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/window/combines/TopNRecordsCombiner$Factory.class */
    public static final class Factory implements RecordsCombiner.Factory {
        private static final long serialVersionUID = 1;
        private final GeneratedRecordComparator generatedSortKeyComparator;
        private final KeySelector<RowData, RowData> sortKeySelector;
        private final TypeSerializer<RowData> recordSerializer;
        private final long topN;

        public Factory(GeneratedRecordComparator generatedRecordComparator, RowDataKeySelector rowDataKeySelector, TypeSerializer<RowData> typeSerializer, long j) {
            this.generatedSortKeyComparator = generatedRecordComparator;
            this.sortKeySelector = rowDataKeySelector;
            this.recordSerializer = typeSerializer;
            this.topN = j;
        }

        @Override // org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner.Factory
        public RecordsCombiner createRecordsCombiner(RuntimeContext runtimeContext, WindowTimerService<Long> windowTimerService, KeyedStateBackend<RowData> keyedStateBackend, WindowState<Long> windowState, boolean z) throws Exception {
            RecordComparator newInstance = this.generatedSortKeyComparator.newInstance(runtimeContext.getUserCodeClassLoader());
            Objects.requireNonNull(keyedStateBackend);
            return new TopNRecordsCombiner(windowTimerService, (v1) -> {
                r3.setCurrentKey(v1);
            }, (WindowMapState) windowState, newInstance, this.sortKeySelector, this.topN, this.recordSerializer, z);
        }
    }

    public TopNRecordsCombiner(WindowTimerService<Long> windowTimerService, StateKeyContext stateKeyContext, WindowMapState<Long, List<RowData>> windowMapState, Comparator<RowData> comparator, KeySelector<RowData, RowData> keySelector, long j, TypeSerializer<RowData> typeSerializer, boolean z) {
        this.timerService = windowTimerService;
        this.keyContext = stateKeyContext;
        this.dataState = windowMapState;
        this.sortKeyComparator = comparator;
        this.sortKeySelector = keySelector;
        this.topN = j;
        this.recordSerializer = typeSerializer;
        this.isEventTime = z;
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner
    public void combine(WindowKey windowKey, Iterator<RowData> it) throws Exception {
        TopNBuffer topNBuffer = new TopNBuffer(this.sortKeyComparator, ArrayList::new);
        while (it.hasNext()) {
            RowData next = it.next();
            if (!RowDataUtil.isAccumulateMsg(next)) {
                throw new UnsupportedOperationException("Window rank does not support input RowKind: " + next.getRowKind().shortString());
            }
            RowData rowData = (RowData) this.sortKeySelector.getKey(next);
            if (topNBuffer.checkSortKeyInBufferRange(rowData, this.topN)) {
                topNBuffer.put(rowData, (RowData) this.recordSerializer.copy(next));
            }
        }
        this.keyContext.setCurrentKey(windowKey.getKey());
        Long valueOf = Long.valueOf(windowKey.getWindow());
        for (Map.Entry<RowData, Collection<RowData>> entry : topNBuffer.entrySet()) {
            RowData key = entry.getKey();
            List<RowData> list = this.dataState.get(valueOf, key);
            if (list == null) {
                list = new ArrayList();
            }
            list.addAll(entry.getValue());
            this.dataState.put(valueOf, key, list);
        }
        if (this.isEventTime) {
            this.timerService.registerEventTimeWindowTimer(valueOf);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner
    public void close() throws Exception {
    }
}
