/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.CallbackRunnerWrapper;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.KeyAccountingUnit;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateFutureFactory;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestBuffer;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.v2.internal.InternalPartitionedState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncExecutionController<K>
implements StateRequestHandler,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
    private static final long DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL = 100L;
    private final int batchSize;
    private final CallbackRunnerWrapper callbackRunner;
    private final long bufferTimeout;
    private final long bufferTimeoutCheckInterval = 100L;
    private final int maxInFlightRecordNum;
    private final MailboxExecutor mailboxExecutor;
    private final StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler;
    final KeyAccountingUnit<K> keyAccountingUnit;
    private final StateFutureFactory<K> stateFutureFactory;
    private final StateExecutor stateExecutor;
    private final DeclarationManager declarationManager;
    RecordContext<K> currentContext;
    StateRequestBuffer<K> stateRequestsBuffer;
    final AtomicInteger inFlightRecordNum;
    private final int maxParallelism;
    final EpochManager epochManager;
    final SwitchContextListener<K> switchContextListener;
    final EpochManager.ParallelMode epochParallelMode = EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH;
    private final Object notifyLock = new Object();
    private volatile boolean waitingMail = false;
    private int drainDepth = 0;

    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler, StateExecutor stateExecutor, DeclarationManager declarationManager, int maxParallelism, int batchSize, long bufferTimeout, int maxInFlightRecords, @Nullable SwitchContextListener<K> switchContextListener, @Nullable MetricGroup metricGroup) {
        this.keyAccountingUnit = new KeyAccountingUnit(maxInFlightRecords);
        this.mailboxExecutor = mailboxExecutor;
        this.exceptionHandler = exceptionHandler;
        this.callbackRunner = new CallbackRunnerWrapper(mailboxExecutor, this::notifyNewMail);
        this.stateFutureFactory = new StateFutureFactory(this, this.callbackRunner, exceptionHandler);
        this.stateExecutor = stateExecutor;
        this.declarationManager = declarationManager;
        this.batchSize = batchSize;
        this.bufferTimeout = bufferTimeout;
        this.maxInFlightRecordNum = maxInFlightRecords;
        this.inFlightRecordNum = new AtomicInteger(0);
        this.maxParallelism = maxParallelism;
        this.stateRequestsBuffer = new StateRequestBuffer(bufferTimeout, 100L, scheduledSeq -> mailboxExecutor.execute(() -> {
            if (this.stateRequestsBuffer.checkCurrentSeq((long)scheduledSeq)) {
                this.triggerIfNeeded(true);
            }
        }, "AEC-buffer-timeout"));
        this.epochManager = new EpochManager(this);
        this.switchContextListener = switchContextListener;
        if (metricGroup != null) {
            metricGroup.gauge("numInFlightRecords", this::getInFlightRecordNum);
            metricGroup.gauge("activeBufferSize", () -> this.stateRequestsBuffer.activeQueueSize());
            metricGroup.gauge("blockingBufferSize", () -> this.stateRequestsBuffer.blockingQueueSize());
            metricGroup.gauge("numBlockingKeys", () -> this.stateRequestsBuffer.blockingKeyNum());
        }
        LOG.info("Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}", new Object[]{this.batchSize, this.bufferTimeout, this.maxInFlightRecordNum, this.epochParallelMode});
    }

    public RecordContext<K> buildContext(Object record, K key) {
        return this.buildContext(record, key, false);
    }

    public RecordContext<K> buildContext(Object record, K key, boolean inherit) {
        if (inherit && this.currentContext != null) {
            return new RecordContext<K>(record == null ? RecordContext.EMPTY_RECORD : record, key, this::disposeContext, KeyGroupRangeAssignment.assignToKeyGroup(key, this.maxParallelism), this.epochManager.onEpoch(this.currentContext.getEpoch()), this.currentContext.getVariablesReference(), this.currentContext.getPriority() + 1);
        }
        return new RecordContext<K>(record == null ? RecordContext.EMPTY_RECORD : record, key, this::disposeContext, KeyGroupRangeAssignment.assignToKeyGroup(key, this.maxParallelism), this.epochManager.onRecord(), this.declarationManager.variableCount());
    }

    public void setCurrentContext(RecordContext<K> switchingContext) {
        if (this.currentContext != switchingContext) {
            this.currentContext = switchingContext;
            this.declarationManager.setCurrentContext(switchingContext);
            if (this.switchContextListener != null) {
                this.switchContextListener.switchContext(switchingContext);
            }
        }
    }

    public RecordContext<K> getCurrentContext() {
        return this.currentContext;
    }

    void disposeContext(RecordContext<K> toDispose) {
        this.epochManager.completeOneRecord(toDispose.getEpoch());
        this.keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
        this.inFlightRecordNum.decrementAndGet();
        StateRequest<K, ?, ?, ?> nextRequest = this.stateRequestsBuffer.unblockOneByKey(toDispose.getKey());
        if (nextRequest != null) {
            Preconditions.checkState(this.tryOccupyKey(nextRequest.getRecordContext()));
            this.insertActiveBuffer(nextRequest);
        }
    }

    boolean tryOccupyKey(RecordContext<K> recordContext) {
        boolean occupied = recordContext.isKeyOccupied();
        if (!occupied && this.keyAccountingUnit.occupy(recordContext.getRecord(), recordContext.getKey())) {
            recordContext.setKeyOccupied();
            occupied = true;
        }
        return occupied;
    }

    @Override
    public <IN, OUT> InternalStateFuture<OUT> handleRequest(@Nullable State state, StateRequestType type, @Nullable IN payload) {
        return this.handleRequest(state, type, false, payload, false);
    }

    public <IN, OUT> InternalStateFuture<OUT> handleRequest(@Nullable State state, StateRequestType type, boolean sync, @Nullable IN payload, boolean allowOverdraft) {
        InternalStateFuture stateFuture = this.stateFutureFactory.create(this.currentContext);
        StateRequest request = new StateRequest(state, type, sync || type == StateRequestType.SYNC_POINT, payload, stateFuture, this.currentContext);
        this.seizeCapacity(allowOverdraft);
        if (this.tryOccupyKey(this.currentContext)) {
            this.insertActiveBuffer(request);
        } else {
            this.insertBlockingBuffer(request);
        }
        this.triggerIfNeeded(false);
        return stateFuture;
    }

    @Override
    public <IN, OUT> OUT handleRequestSync(State state, StateRequestType type, @Nullable IN payload) {
        InternalStateFuture<OUT> stateFuture = this.handleRequest(state, type, true, payload, false);
        if (!stateFuture.isDone()) {
            this.triggerIfNeeded(true);
            try {
                while (!stateFuture.isDone()) {
                    if (this.mailboxExecutor.tryYield()) continue;
                    if (!this.stateExecutor.fullyLoaded()) {
                        this.triggerIfNeeded(true);
                    }
                    this.waitForNewMails();
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        return stateFuture.get();
    }

    @Override
    public <N> void setCurrentNamespaceForState(@Nonnull InternalPartitionedState<N> state, N namespace) {
        this.currentContext.setNamespace(state, namespace);
    }

    <IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) {
        if (request.isSync()) {
            if (request.getRequestType() == StateRequestType.SYNC_POINT) {
                request.getFuture().complete(null);
            } else {
                this.stateExecutor.executeRequestSync(request);
            }
        } else {
            this.stateRequestsBuffer.enqueueToActive(request);
        }
    }

    <IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) {
        this.stateRequestsBuffer.enqueueToBlocking(request);
    }

    public boolean triggerIfNeeded(boolean force) {
        if (!force && this.stateRequestsBuffer.activeQueueSize() < this.batchSize) {
            return false;
        }
        Optional<StateRequestContainer> toRun = this.stateRequestsBuffer.popActive(this.batchSize, () -> this.stateExecutor.createStateRequestContainer());
        if (!toRun.isPresent() || toRun.get().isEmpty()) {
            return false;
        }
        this.stateExecutor.executeBatchRequests(toRun.get());
        this.stateRequestsBuffer.advanceSeq();
        return true;
    }

    private void seizeCapacity(boolean allowOverdraft) {
        if (this.currentContext.isKeyOccupied()) {
            return;
        }
        this.drainInflightRecords(this.maxInFlightRecordNum, !allowOverdraft);
        this.inFlightRecordNum.incrementAndGet();
    }

    public StateFuture<Void> syncPointRequestWithCallback(ThrowingRunnable<Exception> callback, boolean allowOverdraft) {
        return this.handleRequest(null, StateRequestType.SYNC_POINT, true, null, allowOverdraft).thenAccept(v -> callback.run());
    }

    public void drainInflightRecords(int targetNum) {
        this.drainInflightRecords(targetNum, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void drainInflightRecords(int targetNum, boolean forceToWait) {
        if (!forceToWait && this.drainDepth > 5) {
            return;
        }
        RecordContext<K> storedContext = this.currentContext;
        ++this.drainDepth;
        try {
            boolean shouldWait = true;
            while (shouldWait && this.inFlightRecordNum.get() > targetNum) {
                if (this.mailboxExecutor.tryYield()) continue;
                boolean triggered = false;
                if (targetNum == 0 || !this.stateExecutor.fullyLoaded()) {
                    triggered = this.triggerIfNeeded(true);
                }
                if (!(forceToWait || triggered || this.stateExecutor.fullyLoaded() || this.callbackRunner.isHasMail())) {
                    shouldWait = false;
                    continue;
                }
                this.waitForNewMails();
            }
        }
        catch (InterruptedException interruptedException) {
        }
        finally {
            --this.drainDepth;
            this.setCurrentContext(storedContext);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForNewMails() throws InterruptedException {
        if (!this.callbackRunner.isHasMail()) {
            Object object = this.notifyLock;
            synchronized (object) {
                if (!this.callbackRunner.isHasMail()) {
                    this.waitingMail = true;
                    this.notifyLock.wait(1L);
                    this.waitingMail = false;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyNewMail() {
        if (this.waitingMail) {
            Object object = this.notifyLock;
            synchronized (object) {
                if (this.waitingMail) {
                    this.notifyLock.notify();
                }
            }
        }
    }

    public void processNonRecord(@Nullable ThrowingRunnable<? extends Exception> triggerAction, @Nullable ThrowingRunnable<? extends Exception> finalAction) {
        this.epochManager.onNonRecord(triggerAction == null ? null : () -> {
            try {
                RecordContext<K> previousContext = this.currentContext;
                this.setCurrentContext(null);
                triggerAction.run();
                this.setCurrentContext(previousContext);
            }
            catch (Exception e) {
                this.exceptionHandler.handleException("Failed to process non-record.", e);
            }
        }, finalAction == null ? null : () -> {
            try {
                RecordContext<K> previousContext = this.currentContext;
                this.setCurrentContext(null);
                finalAction.run();
                this.setCurrentContext(previousContext);
            }
            catch (Exception e) {
                this.exceptionHandler.handleException("Failed to process non-record.", e);
            }
        }, this.epochParallelMode);
    }

    @VisibleForTesting
    public StateExecutor getStateExecutor() {
        return this.stateExecutor;
    }

    @VisibleForTesting
    public int getInFlightRecordNum() {
        return this.inFlightRecordNum.get();
    }

    @Override
    public void close() throws IOException {
        this.stateRequestsBuffer.close();
    }

    public static interface SwitchContextListener<K> {
        public void switchContext(@Nullable RecordContext<K> var1);
    }
}

