/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

@Internal
public abstract class AbstractAsyncStateStreamOperatorV2<OUT>
extends AbstractStreamOperatorV2<OUT>
implements AsyncStateProcessingOperator {
    private final MailboxExecutor mailboxExecutor;
    private AsyncExecutionController asyncExecutionController;
    private RecordContext currentProcessingContext;

    public AbstractAsyncStateStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
        super(parameters, numberOfInputs);
        Environment environment = parameters.getContainingTask().getEnvironment();
        this.mailboxExecutor = environment.getMainMailboxExecutor();
    }

    @Override
    public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
        super.initializeState(streamTaskStateManager);
        int inFlightRecordsLimit = this.getExecutionConfig().getAsyncInflightRecordsLimit();
        int asyncBufferSize = this.getExecutionConfig().getAsyncStateBufferSize();
        long asyncBufferTimeout = this.getExecutionConfig().getAsyncStateBufferTimeout();
        int maxParallelism = this.getExecutionConfig().getMaxParallelism();
        this.asyncExecutionController = new AsyncExecutionController(this.mailboxExecutor, null, maxParallelism, asyncBufferSize, asyncBufferTimeout, inFlightRecordsLimit);
    }

    @Override
    public boolean isAsyncStateProcessingEnabled() {
        return true;
    }

    @Override
    public ElementOrder getElementOrder() {
        return ElementOrder.RECORD_ORDER;
    }

    @Override
    public final <T> void setAsyncKeyedContextElement(StreamRecord<T> record, KeySelector<T, ?> keySelector) throws Exception {
        this.currentProcessingContext = this.asyncExecutionController.buildContext(record.getValue(), keySelector.getKey(record.getValue()));
        this.currentProcessingContext.retain();
        this.asyncExecutionController.setCurrentContext(this.currentProcessingContext);
    }

    @Override
    public final void postProcessElement() {
        this.currentProcessingContext.release();
    }

    @Override
    public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> processing) {
        this.asyncExecutionController.syncPointRequestWithCallback(processing);
    }

    @Override
    public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(int inputId) {
        throw new UnsupportedOperationException("Never getRecordProcessor from AbstractAsyncStateStreamOperatorV2, since this part is handled by the Input.");
    }

    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception {
        if (this.isAsyncStateProcessingEnabled()) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
        return super.snapshotState(checkpointId, timestamp, checkpointOptions, factory);
    }

    @Override
    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        if (!this.isAsyncStateProcessingEnabled()) {
            return super.getInternalTimerService(name, namespaceSerializer, triggerable);
        }
        InternalTimeServiceManager keyedTimeServiceHandler = this.timeServiceManager;
        KeyedStateBackend keyedStateBackend = this.getKeyedStateBackend();
        Preconditions.checkState((keyedStateBackend != null ? 1 : 0) != 0, (Object)"Timers can only be used on keyed operators.");
        return keyedTimeServiceHandler.getAsyncInternalTimerService(name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable, this.asyncExecutionController);
    }

    @VisibleForTesting
    AsyncExecutionController<?> getAsyncExecutionController() {
        return this.asyncExecutionController;
    }

    @VisibleForTesting
    RecordContext getCurrentProcessingContext() {
        return this.currentProcessingContext;
    }
}

