package org.apache.flink.table.runtime.operators.aggregate.window.buffers;

import java.io.EOFException;
import java.time.ZoneId;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.tvf.state.WindowState;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.class */
public final class RecordsWindowBuffer implements WindowBuffer {
    private final RecordsCombiner combineFunction;
    private final WindowBytesMultiMap recordsBuffer;
    private final WindowKey reuseWindowKey;
    private final AbstractRowDataSerializer<RowData> recordSerializer;
    private final ZoneId shiftTimeZone;
    private final boolean requiresCopy;
    private long minSliceEnd = Long.MAX_VALUE;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer$Factory.class */
    public static final class Factory implements WindowBuffer.Factory {
        private static final long serialVersionUID = 1;
        private final PagedTypeSerializer<RowData> keySer;
        private final AbstractRowDataSerializer<RowData> inputSer;
        private final RecordsCombiner.Factory factory;

        public Factory(PagedTypeSerializer<RowData> pagedTypeSerializer, AbstractRowDataSerializer<RowData> abstractRowDataSerializer, RecordsCombiner.Factory factory) {
            this.keySer = pagedTypeSerializer;
            this.inputSer = abstractRowDataSerializer;
            this.factory = factory;
        }

        @Override // org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer.Factory
        public WindowBuffer create(Object obj, MemoryManager memoryManager, long j, RuntimeContext runtimeContext, WindowTimerService<Long> windowTimerService, KeyedStateBackend<RowData> keyedStateBackend, WindowState<Long> windowState, boolean z, ZoneId zoneId) throws Exception {
            return new RecordsWindowBuffer(obj, memoryManager, j, this.factory.createRecordsCombiner(runtimeContext, windowTimerService, keyedStateBackend, windowState, z), this.keySer, this.inputSer, !keyedStateBackend.isSafeToReuseKVState(), zoneId);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer$LocalFactory.class */
    public static final class LocalFactory implements WindowBuffer.LocalFactory {
        private static final long serialVersionUID = 1;
        private final PagedTypeSerializer<RowData> keySer;
        private final AbstractRowDataSerializer<RowData> inputSer;
        private final RecordsCombiner.LocalFactory localFactory;

        public LocalFactory(PagedTypeSerializer<RowData> pagedTypeSerializer, AbstractRowDataSerializer<RowData> abstractRowDataSerializer, RecordsCombiner.LocalFactory localFactory) {
            this.keySer = pagedTypeSerializer;
            this.inputSer = abstractRowDataSerializer;
            this.localFactory = localFactory;
        }

        @Override // org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer.LocalFactory
        public WindowBuffer create(Object obj, MemoryManager memoryManager, long j, RuntimeContext runtimeContext, Collector<RowData> collector, ZoneId zoneId) throws Exception {
            return new RecordsWindowBuffer(obj, memoryManager, j, this.localFactory.createRecordsCombiner(runtimeContext, collector), this.keySer, this.inputSer, false, zoneId);
        }
    }

    public RecordsWindowBuffer(Object obj, MemoryManager memoryManager, long j, RecordsCombiner recordsCombiner, PagedTypeSerializer<RowData> pagedTypeSerializer, AbstractRowDataSerializer<RowData> abstractRowDataSerializer, boolean z, ZoneId zoneId) {
        this.combineFunction = recordsCombiner;
        this.recordsBuffer = new WindowBytesMultiMap(obj, memoryManager, j, pagedTypeSerializer, abstractRowDataSerializer.getArity());
        this.recordSerializer = abstractRowDataSerializer;
        this.reuseWindowKey = new WindowKeySerializer(pagedTypeSerializer).m198createInstance();
        this.requiresCopy = z;
        this.shiftTimeZone = zoneId;
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer
    public void addElement(RowData rowData, long j, RowData rowData2) throws Exception {
        this.minSliceEnd = Math.min(j, this.minSliceEnd);
        this.reuseWindowKey.replace(j, rowData);
        try {
            this.recordsBuffer.append(this.recordsBuffer.lookup(this.reuseWindowKey), this.recordSerializer.toBinaryRow(rowData2));
        } catch (EOFException e) {
            flush();
            addElement(rowData, j, rowData2);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer
    public void advanceProgress(long j) throws Exception {
        if (TimeWindowUtil.isWindowFired(this.minSliceEnd, j, this.shiftTimeZone)) {
            flush();
        }
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer
    public void flush() throws Exception {
        if (this.recordsBuffer.getNumKeys() > 0) {
            KeyValueIterator<WindowKey, Iterator<RowData>> entryIterator = this.recordsBuffer.getEntryIterator(this.requiresCopy);
            while (entryIterator.advanceNext()) {
                this.combineFunction.combine(entryIterator.getKey(), entryIterator.getValue());
            }
            this.recordsBuffer.reset();
            this.minSliceEnd = Long.MAX_VALUE;
        }
    }

    @Override // org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer
    public void close() throws Exception {
        this.recordsBuffer.free();
        this.combineFunction.close();
    }
}
