/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers;

import java.io.EOFException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.asyncwindow.buffers.AsyncStateWindowBuffer;
import org.apache.flink.table.runtime.operators.window.async.tvf.combines.AsyncStateRecordsCombiner;
import org.apache.flink.table.runtime.operators.window.async.tvf.state.AsyncStateKeyContext;
import org.apache.flink.table.runtime.operators.window.async.tvf.state.WindowAsyncState;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
import org.apache.flink.table.runtime.util.AsyncStateUtils;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.BytesMap;
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
import org.apache.flink.util.function.ThrowingRunnable;

public final class AsyncStateRecordsWindowBuffer
implements AsyncStateWindowBuffer {
    private final AsyncStateRecordsCombiner combineFunction;
    private final WindowBytesMultiMap recordsBuffer;
    private final WindowKey reuseWindowKey;
    private final AbstractRowDataSerializer<RowData> recordSerializer;
    private final ZoneId shiftTimeZone;
    private final RecordEqualiser keyEqualiser;
    private final AsyncStateKeyContext keyContext;
    private long minSliceEnd = Long.MAX_VALUE;

    public AsyncStateRecordsWindowBuffer(Object operatorOwner, MemoryManager memoryManager, long memorySize, AsyncStateRecordsCombiner combineFunction, PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer, RecordEqualiser keyEqualiser, AsyncStateKeyContext keyContext, ZoneId shiftTimeZone) {
        this.combineFunction = combineFunction;
        this.recordsBuffer = new WindowBytesMultiMap(operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity());
        this.recordSerializer = inputSer;
        this.keyEqualiser = keyEqualiser;
        this.keyContext = keyContext;
        this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
        this.shiftTimeZone = shiftTimeZone;
    }

    @Override
    public StateFuture<Void> addElement(RowData dataKey, long sliceEnd, RowData element) throws Exception {
        StateFuture<Void> resultFuture = AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
        this.minSliceEnd = Math.min(sliceEnd, this.minSliceEnd);
        this.reuseWindowKey.replace(sliceEnd, dataKey);
        BytesMap.LookupInfo lookup = this.recordsBuffer.lookup(this.reuseWindowKey);
        try {
            this.recordsBuffer.append(lookup, this.recordSerializer.toBinaryRow(element));
        }
        catch (EOFException e) {
            resultFuture = this.flush(dataKey);
            this.addElement(dataKey, sliceEnd, element);
        }
        return resultFuture;
    }

    @Override
    public StateFuture<Void> advanceProgress(@Nullable RowData currentKey, long progress) throws Exception {
        if (TimeWindowUtil.isWindowFired(this.minSliceEnd, progress, this.shiftTimeZone)) {
            return this.flush(currentKey);
        }
        return AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
    }

    @Override
    public StateFuture<Void> flush(@Nullable RowData currentKey) throws Exception {
        StateFuture<Void> flushFuture = AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
        if (this.recordsBuffer.getNumKeys() > 0L) {
            KeyValueIterator entryIterator = this.recordsBuffer.getEntryIterator(true);
            while (entryIterator.advanceNext()) {
                WindowKey windowKey = (WindowKey)entryIterator.getKey();
                long window = windowKey.getWindow();
                List<RowData> allData = this.itertorToList(entryIterator.getValue());
                if (currentKey != null && this.keyEqualiser.equals(currentKey, windowKey.getKey())) {
                    flushFuture = this.combineFunction.asyncCombine(window, allData.iterator());
                    continue;
                }
                this.keyContext.asyncProcessWithKey(windowKey.getKey(), (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> this.combineFunction.asyncCombine(window, allData.iterator())));
            }
            this.recordsBuffer.reset();
            this.minSliceEnd = Long.MAX_VALUE;
        }
        return flushFuture;
    }

    private List<RowData> itertorToList(Iterator<RowData> records) {
        ArrayList<RowData> list = new ArrayList<RowData>();
        while (records.hasNext()) {
            list.add(records.next());
        }
        return list;
    }

    @Override
    public void close() throws Exception {
        this.recordsBuffer.free();
        this.combineFunction.close();
    }

    public static final class Factory
    implements AsyncStateWindowBuffer.Factory {
        private static final long serialVersionUID = 1L;
        private final PagedTypeSerializer<RowData> keySer;
        private final AbstractRowDataSerializer<RowData> inputSer;
        private final AsyncStateRecordsCombiner.Factory factory;
        private final GeneratedRecordEqualiser generatedKeyEqualiser;

        public Factory(PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer, AsyncStateRecordsCombiner.Factory combinerFactory, GeneratedRecordEqualiser generatedKeyEqualiser) {
            this.keySer = keySer;
            this.inputSer = inputSer;
            this.factory = combinerFactory;
            this.generatedKeyEqualiser = generatedKeyEqualiser;
        }

        @Override
        public AsyncStateWindowBuffer create(Object operatorOwner, MemoryManager memoryManager, long memorySize, RuntimeContext runtimeContext, WindowTimerService<Long> timerService, AsyncStateKeyContext keyContext, WindowAsyncState<Long> windowState, boolean isEventTime, ZoneId shiftTimeZone) throws Exception {
            AsyncStateRecordsCombiner combiner = this.factory.createRecordsCombiner(runtimeContext, timerService, windowState, isEventTime);
            RecordEqualiser keyEqualiser = (RecordEqualiser)this.generatedKeyEqualiser.newInstance(runtimeContext.getUserCodeClassLoader());
            return new AsyncStateRecordsWindowBuffer(operatorOwner, memoryManager, memorySize, combiner, this.keySer, this.inputSer, keyEqualiser, keyContext, shiftTimeZone);
        }
    }
}

