/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncException;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceWithAsyncState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncKeyOrderedProcessing;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncKeyOrderedProcessingOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class AbstractAsyncKeyOrderedStreamOperator<OUT>
extends AbstractStreamOperator<OUT>
implements AsyncKeyOrderedProcessingOperator {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncKeyOrderedStreamOperator.class);
    protected AsyncExecutionController asyncExecutionController;
    protected RecordContext currentProcessingContext;
    protected Environment environment;
    protected DeclarationManager declarationManager;

    @Override
    public final void beforeInitializeStateHandler() {
        KeyedStateStore stateStore = this.stateHandler.getKeyedStateStore().orElse(null);
        if (stateStore instanceof DefaultKeyedStateStore) {
            ((DefaultKeyedStateStore)stateStore).setSupportKeyedStateApiSetV2();
        }
        StreamTask containingTask = (StreamTask)Preconditions.checkNotNull(this.getContainingTask());
        this.environment = containingTask.getEnvironment();
        this.declarationManager = new DeclarationManager();
        if (this.isAsyncKeyOrderedProcessingEnabled()) {
            this.asyncExecutionController = this.createAsyncExecutionController();
        }
    }

    protected abstract KeySelector getKeySelectorForAsyncKeyedContext(int var1);

    protected abstract AsyncExecutionController createAsyncExecutionController();

    protected void handleAsyncException(String message, Throwable exception) {
        this.environment.failExternally(new AsyncException(message, exception));
    }

    @Override
    public boolean isAsyncKeyOrderedProcessingEnabled() {
        return true;
    }

    @Override
    public ElementOrder getElementOrder() {
        return ElementOrder.RECORD_ORDER;
    }

    @Override
    public final <T> void setAsyncKeyedContextElement(StreamRecord<T> record, KeySelector<T, ?> keySelector) throws Exception {
        this.currentProcessingContext = this.asyncExecutionController.buildContext(record.getValue(), keySelector.getKey(record.getValue()));
        this.currentProcessingContext.retain();
        this.asyncExecutionController.setCurrentContext(this.currentProcessingContext);
        this.newKeySelected(this.currentProcessingContext.getKey());
    }

    public void newKeySelected(Object newKey) {
    }

    @Override
    public final void postProcessElement() {
        this.currentProcessingContext.release();
    }

    @Override
    public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> processing) {
        this.asyncExecutionController.syncPointRequestWithCallback(processing, false);
    }

    @Override
    public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
        RecordContext oldContext = this.asyncExecutionController.getCurrentContext();
        RecordContext<K> newContext = this.asyncExecutionController.buildContext(null, key, true);
        newContext.retain();
        this.asyncExecutionController.setCurrentContext(newContext);
        this.asyncExecutionController.syncPointRequestWithCallback(processing, true);
        newContext.release();
        this.asyncExecutionController.setCurrentContext(oldContext);
    }

    @Override
    public final DeclarationManager getDeclarationManager() {
        return this.declarationManager;
    }

    @Override
    public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(int inputId) {
        if (this instanceof TwoInputStreamOperator) {
            switch (inputId) {
                case 1: {
                    return AsyncKeyOrderedProcessing.makeRecordProcessor(this, this.getKeySelectorForAsyncKeyedContext(1), ((TwoInputStreamOperator)((Object)this))::processElement1);
                }
                case 2: {
                    return AsyncKeyOrderedProcessing.makeRecordProcessor(this, this.getKeySelectorForAsyncKeyedContext(2), ((TwoInputStreamOperator)((Object)this))::processElement2);
                }
            }
        } else if (this instanceof Input && inputId == 1) {
            return AsyncKeyOrderedProcessing.makeRecordProcessor(this, this.getKeySelectorForAsyncKeyedContext(1), ((Input)((Object)this))::processElement);
        }
        throw new IllegalArgumentException(String.format("Unsupported operator type %s with input id %d", this.getClass().getName(), inputId));
    }

    protected void processNonRecord(@Nullable ThrowingRunnable<? extends Exception> triggerAction, @Nullable ThrowingRunnable<? extends Exception> finalAction) {
        this.asyncExecutionController.processNonRecord(triggerAction, finalAction);
    }

    public <N, S extends State, T> S getOrCreateKeyedState(@Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<T> stateDescriptor) throws Exception {
        return this.stateHandler.getOrCreateKeyedState(defaultNamespace, namespaceSerializer, stateDescriptor);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (this.isAsyncKeyOrderedProcessingEnabled()) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
    }

    @Override
    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            return super.getInternalTimerService(name, namespaceSerializer, triggerable);
        }
        InternalTimeServiceManager keyedTimeServiceHandler = this.timeServiceManager;
        TypeSerializer keySerializer = this.stateHandler.getKeySerializer();
        Preconditions.checkState((keySerializer != null ? 1 : 0) != 0, (Object)"Timers can only be used on keyed operators.");
        InternalTimerService<N> service = keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);
        if (service instanceof InternalTimerServiceAsyncImpl) {
            ((InternalTimerServiceAsyncImpl)service).setup(this.asyncExecutionController);
        } else if (service instanceof BatchExecutionInternalTimeServiceWithAsyncState) {
            ((BatchExecutionInternalTimeServiceWithAsyncState)service).setup(this.asyncExecutionController);
        }
        return service;
    }

    @Override
    public void setKeyContextElement1(StreamRecord record) throws Exception {
        super.setKeyContextElement1(record);
        if (this.stateKeySelector1 != null) {
            this.newKeySelected(this.getCurrentKey());
        }
    }

    @Override
    public void setKeyContextElement2(StreamRecord record) throws Exception {
        super.setKeyContextElement2(record);
        if (this.stateKeySelector2 != null) {
            this.newKeySelected(this.getCurrentKey());
        }
    }

    @Override
    public Object getCurrentKey() {
        if (this.isAsyncKeyOrderedProcessingEnabled()) {
            RecordContext currentContext = this.asyncExecutionController.getCurrentContext();
            if (currentContext == null) {
                throw new UnsupportedOperationException("Have not set the current key yet, this may because the operator has not started to run, or you are invoking this under a non-keyed context.");
            }
            return currentContext.getKey();
        }
        return super.getCurrentKey();
    }

    @Override
    protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            super.reportOrForwardLatencyMarker(marker);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> super.reportOrForwardLatencyMarker(marker)));
    }

    public Watermark preProcessWatermark(Watermark watermark) throws Exception {
        return watermark;
    }

    public Watermark postProcessWatermark(Watermark watermark) throws Exception {
        return watermark;
    }

    @Override
    public final void processWatermark(Watermark mark) throws Exception {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            Watermark watermark = this.preProcessWatermark(mark);
            if (watermark != null) {
                super.processWatermark(watermark);
                this.postProcessWatermark(watermark);
            }
            return;
        }
        AtomicReference<Object> watermarkRef = new AtomicReference<Object>(null);
        this.processNonRecord((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            watermarkRef.set(this.preProcessWatermark(mark));
            if (this.timeServiceManager != null && watermarkRef.get() != null) {
                this.timeServiceManager.advanceWatermark((Watermark)watermarkRef.get());
            }
        }), (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            Watermark postProcessWatermark;
            if (watermarkRef.get() != null && (postProcessWatermark = this.postProcessWatermark((Watermark)watermarkRef.get())) != null) {
                this.output.emitWatermark(postProcessWatermark);
            }
        }));
    }

    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            super.processWatermarkStatus(watermarkStatus);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> super.processWatermarkStatus(watermarkStatus)));
    }

    @Override
    protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index) throws Exception {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            super.processWatermarkStatus(watermarkStatus, index);
            return;
        }
        AtomicBoolean wasIdle = new AtomicBoolean(false);
        AtomicReference<Object> watermarkRef = new AtomicReference<Object>(null);
        this.processNonRecord((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            wasIdle.set(this.combinedWatermark.isIdle());
            if (this.combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
                watermarkRef.set(this.preProcessWatermark(new Watermark(this.combinedWatermark.getCombinedWatermark())));
                if (this.timeServiceManager != null && watermarkRef.get() != null) {
                    this.timeServiceManager.advanceWatermark((Watermark)watermarkRef.get());
                }
            }
        }), (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            if (watermarkRef.get() != null) {
                this.output.emitWatermark((Watermark)watermarkRef.get());
                this.postProcessWatermark((Watermark)watermarkRef.get());
            }
            if (wasIdle.get() != this.combinedWatermark.isIdle()) {
                this.output.emitWatermarkStatus(watermarkStatus);
            }
        }));
    }

    @Override
    @Experimental
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            super.processRecordAttributes(recordAttributes);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> super.processRecordAttributes(recordAttributes)));
    }

    @Override
    @Experimental
    public void processRecordAttributes1(RecordAttributes recordAttributes) {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            super.processRecordAttributes1(recordAttributes);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> super.processRecordAttributes1(recordAttributes)));
    }

    @Override
    @Experimental
    public void processRecordAttributes2(RecordAttributes recordAttributes) {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            super.processRecordAttributes2(recordAttributes);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> super.processRecordAttributes2(recordAttributes)));
    }

    public void processWatermarkInternal(WatermarkEvent watermark) throws Exception {
        super.processWatermark(watermark);
    }

    public void processWatermark1Internal(WatermarkEvent watermark) throws Exception {
        super.processWatermark1(watermark);
    }

    public void processWatermark2Internal(WatermarkEvent watermark) throws Exception {
        super.processWatermark2(watermark);
    }

    @Override
    public void processWatermark(WatermarkEvent watermark) throws Exception {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            this.processWatermarkInternal(watermark);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> this.processWatermarkInternal(watermark)));
    }

    @Override
    public void processWatermark1(WatermarkEvent watermark) throws Exception {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            this.processWatermark1Internal(watermark);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> this.processWatermark1Internal(watermark)));
    }

    @Override
    public void processWatermark2(WatermarkEvent watermark) throws Exception {
        if (!this.isAsyncKeyOrderedProcessingEnabled()) {
            this.processWatermark2Internal(watermark);
            return;
        }
        this.processNonRecord(null, (ThrowingRunnable<Exception>)((ThrowingRunnable)() -> this.processWatermark2Internal(watermark)));
    }

    @VisibleForTesting
    AsyncExecutionController getAsyncExecutionController() {
        return this.asyncExecutionController;
    }

    @VisibleForTesting
    RecordContext getCurrentProcessingContext() {
        return this.currentProcessingContext;
    }

    public <K> AsyncKeyedStateBackend<K> getAsyncKeyedStateBackend() {
        return this.stateHandler.getAsyncKeyedStateBackend();
    }

    public void drainStateRequests() {
        if (this.isAsyncKeyOrderedProcessingEnabled()) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
    }

    @Override
    public void finish() throws Exception {
        super.finish();
        this.closeIfNeeded();
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.closeIfNeeded();
    }

    private void closeIfNeeded() {
        if (this.isAsyncKeyOrderedProcessingEnabled() && !this.getContainingTask().isFailing() && !this.getContainingTask().isCanceled()) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
    }
}

