/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.function.ThrowingRunnable;

/**
 * Variant of {@link BatchExecutionInternalTimeService} that fires timers through an
 * {@link AsyncExecutionController}.
 *
 * <p>When the current key changes, every pending event-time and processing-time timer is
 * polled from the inherited queues and its callback is submitted to the controller under a
 * {@link RecordContext} built for that timer's key.
 *
 * <p>{@link #setup(AsyncExecutionController)} must have been called with a non-null controller
 * before any timer is fired, because the firing path dereferences the controller directly.
 *
 * @param <K> type of the timer key
 * @param <N> type of the timer namespace
 */
@Internal
public class BatchExecutionInternalTimeServiceWithAsyncState<K, N>
        extends BatchExecutionInternalTimeService<K, N> {

    /** Controller used to run timer callbacks; assigned in {@link #setup}. */
    private AsyncExecutionController<K> asyncExecutionController;

    /**
     * Creates the service by delegating both arguments to the superclass constructor.
     *
     * @param processingTimeService processing-time service handed to the superclass
     * @param triggerTarget receiver of the {@code onEventTime} / {@code onProcessingTime} calls
     */
    BatchExecutionInternalTimeServiceWithAsyncState(
            ProcessingTimeService processingTimeService, Triggerable<K, N> triggerTarget) {
        super(processingTimeService, triggerTarget);
    }

    /**
     * Installs the controller used when firing timers.
     *
     * <p>A {@code null} argument is ignored, so a previously installed controller is kept.
     *
     * @param asyncExecutionController controller to use; ignored when {@code null}
     */
    public void setup(AsyncExecutionController<K> asyncExecutionController) {
        if (asyncExecutionController != null) {
            this.asyncExecutionController = asyncExecutionController;
        }
    }

    /**
     * Switches to a new key, firing all pending timers first.
     *
     * <p>If {@code currentKey} is non-null and equal to the key already set, nothing happens.
     * Otherwise the watermark is raised to {@link Long#MAX_VALUE}, the event-time queue and
     * then the processing-time queue are drained (each polled timer is passed to the trigger
     * target), the watermark is reset to {@link Long#MIN_VALUE}, and the new key is stored.
     * A {@code null} key never takes the early-return path, so it always drains the queues.
     *
     * @param currentKey the key to switch to
     * @throws Exception declared by the overridden method
     */
    @Override
    @SuppressWarnings("unchecked")
    public void setCurrentKey(K currentKey) throws Exception {
        if (currentKey != null && currentKey.equals(this.currentKey)) {
            return;
        }

        this.currentWatermark = Long.MAX_VALUE;

        // NOTE(review): the casts below are carried over from the decompiled output because the
        // queues' element type is declared in the superclass (not visible here); drop them, and
        // the @SuppressWarnings above, if they turn out to be redundant.
        InternalTimer<K, N> timer;
        while ((timer = (InternalTimer<K, N>) this.eventTimeTimersQueue.poll()) != null) {
            // A fresh final local per iteration: a lambda may only capture variables that are
            // final or effectively final, which a reused loop variable is not.
            final InternalTimer<K, N> timerToTrigger = timer;
            this.maintainContextAndProcess(
                    timerToTrigger, () -> this.triggerTarget.onEventTime(timerToTrigger));
        }
        while ((timer = (InternalTimer<K, N>) this.processingTimeTimersQueue.poll()) != null) {
            final InternalTimer<K, N> timerToTrigger = timer;
            this.maintainContextAndProcess(
                    timerToTrigger, () -> this.triggerTarget.onProcessingTime(timerToTrigger));
        }

        this.currentWatermark = Long.MIN_VALUE;
        this.currentKey = currentKey;
    }

    /**
     * Runs {@code runnable} under a record context built for {@code timer}, then restores the
     * context that was current beforehand.
     *
     * <p>The new context is retained before it is made current and released after the callback
     * has been handed to the controller; the statement order is intentionally unchanged.
     *
     * @param timer timer whose key is used to build the temporary record context
     * @param runnable callback to submit to the controller
     */
    private void maintainContextAndProcess(
            InternalTimer<K, N> timer, ThrowingRunnable<Exception> runnable) {
        // Remember the caller's context so it can be reinstated once the timer has been handled.
        RecordContext<K> previousContext = this.asyncExecutionController.getCurrentContext();
        RecordContext<K> recordCtx =
                this.asyncExecutionController.buildContext(timer, timer.getKey());
        recordCtx.retain();
        this.asyncExecutionController.setCurrentContext(recordCtx);
        // NOTE(review): the meaning of the `true` flag is defined by AsyncExecutionController,
        // which is not visible here — confirm against its documentation.
        this.asyncExecutionController.syncPointRequestWithCallback(runnable, true);
        recordCtx.release();
        this.asyncExecutionController.setCurrentContext(previousContext);
    }
}

