/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.configuration.TimerServiceOptions;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements StatefulTask,
AsyncExceptionHandler {
    /** Thread group handed to the thread factory that {@code invoke()} creates for the default timer service. */
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    /** Logger shared by the task and its nested checkpointing helpers. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    /**
     * Monitor returned by {@code getCheckpointLock()}. Opening and closing operators, performing
     * checkpoints and delivering checkpoint notifications all synchronize on it.
     */
    private final Object lock = new Object();
    /** Head operator of the chain; assigned in {@code invoke()} from the operator chain. */
    protected OP headOperator;
    /** Chain of operators executed by this task; created in {@code invoke()}. */
    protected OperatorChain<OUT, OP> operatorChain;
    /** Stream configuration built from the task configuration in {@code invoke()}. */
    private StreamConfig configuration;
    /** State backend resolved by {@code createStateBackend()} in {@code invoke()}. */
    private StateBackend stateBackend;
    /** Keyed state backend; {@code createKeyedStateBackend} refuses to create it more than once. */
    private AbstractKeyedStateBackend<?> keyedStateBackend;
    /** Timer service; either injected through {@code setProcessingTimeService} or created in {@code invoke()}. */
    private ProcessingTimeService timerService;
    /** User accumulator map obtained from the environment's accumulator registry. */
    private Map<String, Accumulator<?, ?>> accumulatorMap;
    /** State to restore from; set by {@code setInitialState} and cleared once the operators were initialized. */
    private TaskStateSnapshot taskStateSnapshot;
    /** Registry of closeables that are closed on {@code cancel()} and at the end of {@code invoke()}. */
    private final CloseableRegistry cancelables = new CloseableRegistry();
    /** Set to true right before {@code run()} is called; reset when operators are closed, on cancel and on exit. */
    private volatile boolean isRunning;
    /** Set by {@code cancel()}; checked by {@code invoke()} between its phases. */
    private volatile boolean canceled;
    /** Executor on which the asynchronous checkpoint runnables are submitted; created in {@code invoke()}. */
    private ExecutorService asyncOperationsThreadPool;

    /**
     * Task-specific initialization hook. {@code invoke()} calls it after the operator chain and
     * head operator have been set up and before operator state is initialized.
     *
     * @throws Exception if the initialization fails
     */
    protected abstract void init() throws Exception;

    /**
     * Task-specific main work. {@code invoke()} calls it after all operators were opened and the
     * running flag was set.
     *
     * @throws Exception if the task fails while running
     */
    protected abstract void run() throws Exception;

    /**
     * Task-specific cleanup hook. {@code invoke()} calls it from its {@code finally} block, so it
     * runs on both regular and exceptional exit; anything it throws is logged there.
     *
     * @throws Exception if the cleanup fails
     */
    protected abstract void cleanup() throws Exception;

    /**
     * Task-specific cancellation hook, called by {@code cancel()} after the running and canceled
     * flags have been updated.
     *
     * @throws Exception if the cancellation fails
     */
    protected abstract void cancelTask() throws Exception;

    /**
     * Installs the processing-time service this task should use instead of the default one
     * that {@code invoke()} would otherwise create.
     *
     * @param timeProvider the timer service to use; must not be {@code null}
     * @throws RuntimeException if {@code timeProvider} is {@code null}
     */
    @VisibleForTesting
    public void setProcessingTimeService(ProcessingTimeService timeProvider) {
        if (timeProvider != null) {
            this.timerService = timeProvider;
            return;
        }
        throw new RuntimeException("The timeProvider cannot be set to null.");
    }

    /**
     * Runs the whole life cycle of the task.
     *
     * <p>The phases are: set-up (async thread pool, configuration, state backend, accumulators,
     * timer service, operator chain, {@link #init()}), state initialization and opening of all
     * operators under the checkpoint lock, {@link #run()}, and an orderly shutdown (close
     * operators, quiesce the timer service, flush outputs, dispose operators). The
     * {@code finally} block releases timers, cancelables, async threads, task-specific resources,
     * operators and outputs regardless of how the method exits.
     *
     * @throws CancelTaskException if the canceled flag is observed between phases
     * @throws Exception if any phase fails
     */
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            LOG.debug("Initializing {}.", this.getName());
            this.asyncOperationsThreadPool = Executors.newCachedThreadPool();
            this.configuration = new StreamConfig(this.getTaskConfiguration());
            this.stateBackend = this.createStateBackend();
            this.accumulatorMap = this.getEnvironment().getAccumulatorRegistry().getUserMap();

            // Only create a default timer service when none was injected beforehand.
            if (this.timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + this.getName());
                this.timerService = new SystemProcessingTimeService(this, this.getCheckpointLock(), timerThreadFactory);
            }

            this.operatorChain = new OperatorChain<OUT, OP>(this);
            this.headOperator = this.operatorChain.getHeadOperator();

            // Task-specific initialization.
            this.init();

            // Skip state restore and opening if the task was canceled in the meantime.
            if (this.canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Invoking {}", this.getName());

            // State initialization and open() happen under the checkpoint lock.
            synchronized (this.lock) {
                this.initializeState();
                this.openAllOperators();
            }

            // Last chance to exit early before the main work starts.
            if (this.canceled) {
                throw new CancelTaskException();
            }

            this.isRunning = true;
            this.run();

            // run() may return normally although the task was canceled; do not attempt
            // the regular shutdown in that case.
            if (this.canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", this.getName());

            // Closing the operators and clearing the running flag happen under the checkpoint
            // lock; the flag is cleared only after all operators were closed.
            synchronized (this.lock) {
                this.closeAllOperators();
                this.timerService.quiesce();
                this.isRunning = false;
            }

            this.timerService.awaitPendingAfterQuiesce();
            LOG.debug("Closed operators for task {}", this.getName());

            this.operatorChain.flushOutputs();

            // Regular disposal: a failure here fails the task.
            this.tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            this.isRunning = false;

            // Stop the timer service; failures are logged so they do not replace an exception
            // that is already propagating out of the try block.
            if (this.timerService != null && !this.timerService.isTerminated()) {
                try {
                    long timeoutMs = this.getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
                    boolean timerShutdownComplete = this.timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
                    if (!timerShutdownComplete) {
                        LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", timeoutMs);
                    }
                }
                catch (Throwable t) {
                    LOG.error("Could not shut down timer service", t);
                }
            }

            // Close the cancelables and shut down the async thread pool independently, so that a
            // failure while closing the registry cannot leave the pool's threads running.
            try {
                this.cancelables.close();
            }
            catch (Throwable t) {
                LOG.error("Could not shut down async checkpoint threads", t);
            }
            try {
                this.shutdownAsyncThreads();
            }
            catch (Throwable t) {
                LOG.error("Could not shut down async checkpoint threads", t);
            }

            // Task-specific cleanup must always be attempted.
            try {
                this.cleanup();
            }
            catch (Throwable t) {
                LOG.error("Error during cleanup of stream task", t);
            }

            // Operators that were not disposed on the regular path get a best-effort disposal.
            if (!disposed) {
                this.disposeAllOperators();
            }

            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
        }
    }

    /**
     * Cancels the task: clears the running flag, sets the canceled flag, invokes the
     * task-specific {@link #cancelTask()} hook and then closes everything registered in
     * {@code cancelables}.
     *
     * @throws Exception if {@code cancelTask()} or closing the cancelables fails
     */
    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            this.cancelTask();
        }
        finally {
            // Runs even when cancelTask() throws.
            this.cancelables.close();
        }
    }

    /** Returns the current value of the volatile running flag. */
    public final boolean isRunning() {
        return this.isRunning;
    }

    /** Returns whether {@code cancel()} has been called on this task. */
    public final boolean isCanceled() {
        return this.canceled;
    }

    /**
     * Opens every non-null operator of the chain, in the order in which the chain's
     * operator array lists them.
     *
     * @throws Exception if an operator fails to open; remaining operators are not opened
     */
    private void openAllOperators() throws Exception {
        final StreamOperator<?>[] chained = this.operatorChain.getAllOperators();
        for (int idx = 0; idx < chained.length; idx++) {
            final StreamOperator<?> current = chained[idx];
            if (current != null) {
                current.open();
            }
        }
    }

    /**
     * Closes every non-null operator of the chain, walking the chain's operator array
     * from its last element to its first.
     *
     * @throws Exception if an operator fails to close; remaining operators are not closed
     */
    private void closeAllOperators() throws Exception {
        final StreamOperator<?>[] chained = this.operatorChain.getAllOperators();
        for (int remaining = chained.length; remaining > 0; remaining--) {
            final StreamOperator<?> current = chained[remaining - 1];
            if (current != null) {
                current.close();
            }
        }
    }

    /**
     * Disposes every non-null operator of the chain and lets the first failure propagate.
     * This is the disposal used on the regular shutdown path of {@code invoke()}.
     *
     * @throws Exception if an operator fails to dispose; remaining operators are not disposed
     */
    private void tryDisposeAllOperators() throws Exception {
        final StreamOperator<?>[] chained = this.operatorChain.getAllOperators();
        for (int idx = 0; idx < chained.length; idx++) {
            final StreamOperator<?> current = chained[idx];
            if (current != null) {
                current.dispose();
            }
        }
    }

    /**
     * Shuts down the executor used for asynchronous checkpoint work via {@code shutdownNow()},
     * unless it is already shut down.
     *
     * <p>The executor is created inside {@code invoke()}, so it can still be {@code null} when
     * {@code invoke()} failed before reaching that point; in that case there is nothing to stop
     * and the method returns quietly instead of failing with a {@code NullPointerException}.
     *
     * @throws Exception declared for callers; the current body throws only unchecked exceptions
     */
    private void shutdownAsyncThreads() throws Exception {
        final ExecutorService pool = this.asyncOperationsThreadPool;
        if (pool != null && !pool.isShutdown()) {
            pool.shutdownNow();
        }
    }

    /**
     * Best-effort disposal of all operators: every non-null operator is disposed, and a
     * failure of one operator is logged without preventing disposal of the others.
     * Does nothing when the operator chain was never created.
     */
    private void disposeAllOperators() {
        if (this.operatorChain == null) {
            return;
        }
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) {
                continue;
            }
            try {
                operator.dispose();
            }
            catch (Throwable t) {
                LOG.error("Error during disposal of stream operator.", t);
            }
        }
    }

    /**
     * Last-resort release of resources when the task object is garbage collected: shuts down a
     * timer service that has not terminated yet and closes the registry of cancelables.
     *
     * <p>The registry is closed in a {@code finally} block so that a failure while shutting down
     * the timer service cannot skip it.
     *
     * @throws Throwable if the superclass finalizer, the timer shutdown or closing the registry fails
     */
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        try {
            if (this.timerService != null && !this.timerService.isTerminated()) {
                LOG.info("Timer service is shutting down.");
                this.timerService.shutdownService();
            }
        }
        finally {
            this.cancelables.close();
        }
    }

    /**
     * Tells whether the configured time characteristic is {@code EventTime} or
     * {@code IngestionTime}.
     *
     * @return {@code true} for event time or ingestion time, {@code false} otherwise
     */
    boolean isSerializingTimestamps() {
        TimeCharacteristic tc = this.configuration.getTimeCharacteristic();
        // Logical (short-circuit) OR is the idiomatic operator for boolean conditions;
        // both operands are side-effect-free comparisons, so the result is unchanged.
        return tc == TimeCharacteristic.EventTime || tc == TimeCharacteristic.IngestionTime;
    }

    /** Returns the task name including subtask information, as reported by the environment's task info. */
    public String getName() {
        return this.getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    /**
     * Returns the monitor object on which this task synchronizes operator opening/closing,
     * checkpointing and checkpoint notifications.
     */
    public Object getCheckpointLock() {
        return this.lock;
    }

    /** Returns the stream configuration; it is {@code null} until {@code invoke()} has created it. */
    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    /** Returns the user accumulator map that {@code invoke()} obtained from the accumulator registry. */
    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    /** Returns the operator chain in its role as this task's {@code StreamStatusMaintainer}. */
    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return this.operatorChain;
    }

    /** Returns the entry point of the operator chain. */
    Output<StreamRecord<OUT>> getHeadOutput() {
        return this.operatorChain.getChainEntryPoint();
    }

    /** Returns the stream outputs of the operator chain. */
    RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    /**
     * Stores the state snapshot the operators should be restored from. The snapshot is read
     * while operators and the keyed state backend are initialized and is cleared afterwards.
     *
     * @param taskStateHandles the snapshot to restore from
     */
    public void setInitialState(TaskStateSnapshot taskStateHandles) {
        this.taskStateSnapshot = taskStateHandles;
    }

    /**
     * Performs a checkpoint that was triggered directly on this task, using fresh metrics with
     * zero alignment bytes and zero alignment duration.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions options of the checkpoint
     * @return {@code true} if the checkpoint was performed; {@code false} if the task was not
     *     running, including the case where a failure occurred while it was not running
     * @throws Exception wrapping the cause if the checkpoint fails while the task is running
     */
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        try {
            return this.performCheckpoint(
                    checkpointMetaData,
                    checkpointOptions,
                    new CheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L));
        }
        catch (Exception e) {
            // Failures are only propagated while the task is still running.
            if (!this.isRunning) {
                LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", checkpointMetaData.getCheckpointId(), this.getName(), e);
                return false;
            }
            throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.', e);
        }
    }

    /**
     * Performs a checkpoint that was triggered by a checkpoint barrier, using the metrics
     * supplied by the caller.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions options of the checkpoint
     * @param checkpointMetrics metrics collected so far for this checkpoint
     * @throws CancelTaskException rethrown unchanged if the task was cancelled meanwhile
     * @throws Exception wrapping the cause for every other failure
     */
    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        try {
            this.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        }
        catch (CancelTaskException cancelled) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", this.getName(), checkpointMetaData.getCheckpointId());
            throw cancelled;
        }
        catch (Exception failure) {
            final String description = "Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.';
            throw new Exception(description, failure);
        }
    }

    /**
     * Aborts a checkpoint after a cancel-barrier: declines the checkpoint at the environment
     * and then, under the checkpoint lock, broadcasts a checkpoint-cancel marker through the
     * operator chain.
     *
     * @param checkpointId id of the checkpoint to abort
     * @param cause reason reported when declining the checkpoint
     * @throws Exception if broadcasting the cancel marker fails
     */
    public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, this.getName());

        this.getEnvironment().declineCheckpoint(checkpointId, cause);

        synchronized (this.lock) {
            this.operatorChain.broadcastCheckpointCancelMarker(checkpointId);
        }
    }

    /**
     * Performs a checkpoint under the checkpoint lock.
     *
     * <p>While the task is running, the checkpoint barrier is broadcast through the operator
     * chain and the operator state is snapshotted. When the task is not running, a
     * {@code CancelCheckpointMarker} is written to all channels of every result partition writer
     * instead; failures of individual writers are collected and thrown after all writers were tried.
     *
     * @return {@code true} if the checkpoint was performed, {@code false} if it was cancelled
     *     because the task is not running
     * @throws Exception if the checkpoint fails or a cancel marker could not be sent
     */
    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        LOG.debug("Starting checkpoint ({}) {} on task {}", checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), this.getName());

        synchronized (this.lock) {
            if (!this.isRunning) {
                final CancelCheckpointMarker cancelMarker = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                Exception collected = null;
                for (ResultPartitionWriter writer : this.getEnvironment().getAllWriters()) {
                    try {
                        writer.writeBufferToAllChannels(EventSerializer.toBuffer(cancelMarker));
                    }
                    catch (Exception e) {
                        collected = ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), collected);
                    }
                }
                if (collected != null) {
                    throw collected;
                }
                return false;
            }

            // Barrier emission and state snapshot happen inside the same lock scope.
            this.operatorChain.broadcastCheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions);
            this.checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
    }

    /** Returns the executor used for asynchronous checkpoint work; {@code null} before {@code invoke()} created it. */
    public ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    /**
     * Forwards the completion of a checkpoint to every non-null operator of the chain. The
     * notification is delivered under the checkpoint lock and is ignored when the task is not
     * running.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if an operator fails while handling the notification
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        synchronized (this.lock) {
            if (!this.isRunning) {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", this.getName());
                return;
            }

            LOG.debug("Notification of complete checkpoint for task {}", this.getName());
            for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
                if (operator != null) {
                    operator.notifyOfCompletedCheckpoint(checkpointId);
                }
            }
        }
    }

    /**
     * Snapshots the state of all operators for the given checkpoint by creating a
     * {@code CheckpointingOperation} and executing it.
     *
     * @throws Exception if the checkpointing operation fails
     */
    private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        new CheckpointingOperation(this, checkpointMetaData, checkpointOptions, checkpointMetrics).executeCheckpointing();
    }

    /**
     * Initializes the state of all operators, restoring from the stored snapshot when one was
     * set. After a successful restore the snapshot reference is cleared.
     *
     * @throws Exception if an operator fails to initialize its state
     */
    private void initializeState() throws Exception {
        final boolean restored = this.taskStateSnapshot != null;
        this.initializeOperators(restored);
        if (restored) {
            // The snapshot is no longer needed once every operator was initialized from it.
            this.taskStateSnapshot = null;
        }
    }

    /**
     * Calls {@code initializeState} on every non-null operator of the chain. When restoring and a
     * snapshot is present, each operator receives the subtask state stored under its operator id;
     * otherwise it receives {@code null}.
     *
     * @param restored whether the operators should be initialized from the stored snapshot
     * @throws Exception if an operator fails to initialize its state
     */
    private void initializeOperators(boolean restored) throws Exception {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) {
                continue;
            }
            OperatorSubtaskState stateToRestore = null;
            if (restored && this.taskStateSnapshot != null) {
                stateToRestore = this.taskStateSnapshot.getSubtaskStateByOperatorID(operator.getOperatorID());
            }
            operator.initializeState(stateToRestore);
        }
    }

    /**
     * Resolves the state backend for this task: the backend stored in the stream configuration
     * wins; if there is none, one is loaded from the task manager configuration or a default is
     * created.
     *
     * @return the state backend to use
     * @throws Exception if the backend cannot be loaded
     */
    private StateBackend createStateBackend() throws Exception {
        final AbstractStateBackend userDefined = this.configuration.getStateBackend(this.getUserCodeClassLoader());
        if (userDefined == null) {
            return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
                    this.getEnvironment().getTaskManagerInfo().getConfiguration(),
                    this.getUserCodeClassLoader(),
                    LOG);
        }
        LOG.info("Using user-defined state backend: {}.", userDefined);
        return userDefined;
    }

    /**
     * Creates the operator state backend for the given operator, registers it with the task's
     * cancelables and, if restore handles were supplied, restores it from them.
     *
     * @param op the operator the backend is created for
     * @param restoreStateHandles handles to restore from, or {@code null} to start empty
     * @return the newly created operator state backend
     * @throws Exception if creating or restoring the backend fails
     */
    public OperatorStateBackend createOperatorStateBackend(StreamOperator<?> op, Collection<OperatorStateHandle> restoreStateHandles) throws Exception {
        final Environment environment = this.getEnvironment();
        final String operatorIdentifier = this.createOperatorIdentifier(op, this.getConfiguration().getVertexID());

        final OperatorStateBackend backend = this.stateBackend.createOperatorStateBackend(environment, operatorIdentifier);

        // Registered before restoring, so the backend is closed together with the task.
        this.cancelables.registerCloseable(backend);

        if (restoreStateHandles != null) {
            backend.restore(restoreStateHandles);
        }
        return backend;
    }

    /**
     * Creates the keyed state backend of this task, registers it with the task's cancelables and
     * restores it from the managed keyed state that the stored snapshot holds for the head
     * operator (or from {@code null} when there is nothing to restore).
     *
     * @param keySerializer serializer for the keys
     * @param numberOfKeyGroups total number of key groups
     * @param keyGroupRange key groups assigned to this task
     * @param <K> type of the keys
     * @return the newly created keyed state backend
     * @throws RuntimeException if a keyed state backend was already created for this task
     * @throws Exception if creating or restoring the backend fails
     */
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange) throws Exception {
        if (this.keyedStateBackend != null) {
            throw new RuntimeException("The keyed state backend can only be created once.");
        }
        String operatorIdentifier = this.createOperatorIdentifier(this.headOperator, this.configuration.getVertexID());
        this.keyedStateBackend = this.stateBackend.createKeyedStateBackend(this.getEnvironment(), this.getEnvironment().getJobID(), operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, this.getEnvironment().getTaskKvStateRegistry());

        // Registered before restoring, so the backend is closed together with the task.
        this.cancelables.registerCloseable(this.keyedStateBackend);

        Collection<KeyedStateHandle> restoreKeyedStateHandles = null;
        if (this.taskStateSnapshot != null) {
            OperatorSubtaskState stateByOperatorID = this.taskStateSnapshot.getSubtaskStateByOperatorID(this.headOperator.getOperatorID());
            restoreKeyedStateHandles = stateByOperatorID != null ? stateByOperatorID.getManagedKeyedState() : null;
        }
        this.keyedStateBackend.restore(restoreKeyedStateHandles);

        // The field is declared with a wildcard key type; the backend was created just above
        // with the caller's key serializer, so narrowing it to K is safe here.
        @SuppressWarnings("unchecked")
        AbstractKeyedStateBackend<K> typedBackend = (AbstractKeyedStateBackend<K>) this.keyedStateBackend;
        return typedBackend;
    }

    /**
     * Asks the state backend for a checkpoint stream factory for the given operator, identified
     * by the job id and the operator identifier built by {@code createOperatorIdentifier}.
     *
     * @param operator the operator the factory is created for
     * @return the stream factory created by the state backend
     * @throws IOException if the state backend fails to create the factory
     */
    public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> operator) throws IOException {
        return this.stateBackend.createStreamFactory(this.getEnvironment().getJobID(), this.createOperatorIdentifier(operator, this.configuration.getVertexID()));
    }

    /**
     * Asks the state backend for a savepoint stream factory for the given operator and target
     * location, identified by the job id and the operator identifier built by
     * {@code createOperatorIdentifier}.
     *
     * @param operator the operator the factory is created for
     * @param targetLocation savepoint target location passed through to the state backend
     * @return the stream factory created by the state backend
     * @throws IOException if the state backend fails to create the factory
     */
    public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException {
        return this.stateBackend.createSavepointStreamFactory(this.getEnvironment().getJobID(), this.createOperatorIdentifier(operator, this.configuration.getVertexID()), targetLocation);
    }

    /**
     * Builds the identifier string for an operator in the form
     * {@code <SimpleClassName>_<operatorID>_(<subtaskIndex>/<parallelism>)}.
     *
     * @param operator the operator to identify
     * @param vertexId not used when building the identifier
     * @return the identifier string
     */
    private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
        final TaskInfo info = this.getEnvironment().getTaskInfo();
        final StringBuilder identifier = new StringBuilder();
        identifier.append(operator.getClass().getSimpleName());
        identifier.append("_").append(operator.getOperatorID());
        identifier.append("_(").append(info.getIndexOfThisSubtask());
        identifier.append("/").append(info.getNumberOfParallelSubtasks());
        identifier.append(")");
        return identifier.toString();
    }

    /**
     * Returns the timer service of this task.
     *
     * @return the timer service
     * @throws IllegalStateException if no timer service has been set or created yet
     */
    public ProcessingTimeService getProcessingTimeService() {
        final ProcessingTimeService service = this.timerService;
        if (service != null) {
            return service;
        }
        throw new IllegalStateException("The timer service has not been initialized.");
    }

    /**
     * Handles a failure reported from an asynchronous thread by failing the task through the
     * environment. The failure is dropped when the task is no longer running.
     *
     * @param message description of the failure; not used by this implementation
     * @param exception the failure to report
     */
    @Override
    public void handleAsyncException(String message, Throwable exception) {
        if (!this.isRunning) {
            return;
        }
        this.getEnvironment().failExternally(exception);
    }

    /** Returns the task name, see {@link #getName()}. */
    @Override
    public String toString() {
        return this.getName();
    }

    /** Returns the registry of closeables that are closed when the task is cancelled or finishes. */
    public CloseableRegistry getCancelables() {
        return this.cancelables;
    }

    /**
     * One checkpoint of a {@link StreamTask}: takes the snapshot of every operator in the calling
     * thread (synchronous part) and then hands the in-progress snapshots to an
     * {@link AsyncCheckpointRunnable} submitted to the task's async thread pool.
     */
    private static final class CheckpointingOperation {
        private final StreamTask<?, ?> owner;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final StreamOperator<?>[] allOperators;
        /** {@code System.nanoTime()} at the start of the synchronous part. */
        private long startSyncPartNano;
        /** {@code System.nanoTime()} at the end of the synchronous part. */
        private long startAsyncPartNano;
        /** Snapshot results of the operators, keyed by operator id. */
        private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress;

        /**
         * @param owner the task being checkpointed; must not be {@code null}
         * @param checkpointMetaData id and timestamp of the checkpoint; must not be {@code null}
         * @param checkpointOptions options of the checkpoint; must not be {@code null}
         * @param checkpointMetrics metrics to fill in; must not be {@code null}
         */
        public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            this.owner = Preconditions.checkNotNull(owner);
            this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
            this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
            this.allOperators = owner.operatorChain.getAllOperators();
            this.operatorSnapshotsInProgress = new HashMap<OperatorID, OperatorSnapshotResult>(this.allOperators.length);
        }

        /**
         * Runs the synchronous part of the checkpoint and submits the asynchronous part.
         *
         * <p>If anything fails before the asynchronous part was submitted, all snapshots taken so
         * far are cancelled before the failure propagates. Once the asynchronous runnable was
         * submitted, the cleanup of the snapshots is its responsibility.
         *
         * @throws Exception if snapshotting an operator or submitting the asynchronous part fails
         */
        public void executeCheckpointing() throws Exception {
            this.startSyncPartNano = System.nanoTime();

            boolean failed = true;
            try {
                for (StreamOperator<?> op : this.allOperators) {
                    this.checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", this.checkpointMetaData.getCheckpointId(), this.owner.getName());
                }

                this.startAsyncPartNano = System.nanoTime();
                this.checkpointMetrics.setSyncDurationMillis((this.startAsyncPartNano - this.startSyncPartNano) / 1000000L);

                // From here on the asynchronous runnable owns the snapshots in progress.
                this.runAsyncCheckpointingAndAcknowledge();
                failed = false;

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", this.owner.getName(), this.checkpointMetaData.getCheckpointId(), this.checkpointMetrics.getAlignmentDurationNanos() / 1000000L, this.checkpointMetrics.getSyncDurationMillis());
                }
            }
            finally {
                if (failed) {
                    this.cancelSnapshotsInProgress();

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", this.owner.getName(), this.checkpointMetaData.getCheckpointId(), this.checkpointMetrics.getAlignmentDurationNanos() / 1000000L, this.checkpointMetrics.getSyncDurationMillis());
                    }
                }
            }
        }

        /** Cancels every snapshot taken so far; failures of individual cancellations are only logged. */
        private void cancelSnapshotsInProgress() {
            for (OperatorSnapshotResult operatorSnapshotResult : this.operatorSnapshotsInProgress.values()) {
                if (null == operatorSnapshotResult) {
                    continue;
                }
                try {
                    operatorSnapshotResult.cancel();
                }
                catch (Exception e) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", e);
                }
            }
        }

        /** Snapshots a single operator (if non-null) and records the result under its operator id. */
        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                OperatorSnapshotResult snapshotInProgress = op.snapshotState(this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp(), this.checkpointOptions);
                this.operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

        /**
         * Creates the asynchronous runnable for this checkpoint, registers it with the task's
         * cancelables and submits it to the task's async thread pool.
         *
         * @throws IOException if registering the runnable with the cancelables fails
         */
        public void runAsyncCheckpointingAndAcknowledge() throws IOException {
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(this.owner, this.operatorSnapshotsInProgress, this.checkpointMetaData, this.checkpointMetrics, this.startAsyncPartNano);
            this.owner.cancelables.registerCloseable(asyncCheckpointRunnable);
            this.owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
        }

        /** Life-cycle states of the asynchronous part of a checkpoint. */
        private static enum AsynCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED;

        }
    }

    /**
     * Asynchronous part of a checkpoint: completes the snapshot futures of all operators,
     * assembles them into a {@code TaskStateSnapshot} and acknowledges the checkpoint at the
     * environment. It is also a {@code Closeable} so that it can be registered with the task's
     * cancelables; closing it discards the checkpoint if it has not completed yet.
     *
     * <p>The {@code asyncCheckpointState} reference arbitrates between completion ({@code run})
     * and discarding ({@code close}/{@code cleanup}): whichever compare-and-set out of
     * {@code RUNNING} succeeds first wins.
     */
    private static final class AsyncCheckpointRunnable
    implements Runnable,
    Closeable {
        private final StreamTask<?, ?> owner;
        private final Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointMetrics checkpointMetrics;
        /** {@code System.nanoTime()} value taken when the asynchronous part was started. */
        private final long asyncStartNanos;
        private final AtomicReference<CheckpointingOperation.AsynCheckpointState> asyncCheckpointState = new AtomicReference<CheckpointingOperation.AsynCheckpointState>(CheckpointingOperation.AsynCheckpointState.RUNNING);

        AsyncCheckpointRunnable(StreamTask<?, ?> owner, Map<OperatorID, OperatorSnapshotResult> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, long asyncStartNanos) {
            this.owner = (StreamTask)Preconditions.checkNotNull(owner);
            this.operatorSnapshotsInProgress = (Map)Preconditions.checkNotNull(operatorSnapshotsInProgress);
            this.checkpointMetaData = (CheckpointMetaData)Preconditions.checkNotNull((Object)checkpointMetaData);
            this.checkpointMetrics = (CheckpointMetrics)Preconditions.checkNotNull((Object)checkpointMetrics);
            this.asyncStartNanos = asyncStartNanos;
        }

        /**
         * Completes all snapshot futures, records the asynchronous duration and, if the state is
         * still {@code RUNNING}, acknowledges the checkpoint. Any {@code Exception} triggers a
         * cleanup of the snapshots and is reported to the owner as an
         * {@code AsynchronousException}. In all cases the runnable unregisters itself from the
         * owner's cancelables at the end.
         */
        @Override
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                boolean hasState = false;
                TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(this.operatorSnapshotsInProgress.size());
                for (Map.Entry<OperatorID, OperatorSnapshotResult> entry : this.operatorSnapshotsInProgress.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    OperatorSnapshotResult snapshotInProgress = entry.getValue();
                    // Runs each of the four snapshot futures (unless already done) and collects the resulting handles.
                    OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState((OperatorStateHandle)FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()), (OperatorStateHandle)FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()), (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()), (KeyedStateHandle)FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture()));
                    hasState |= operatorSubtaskState.hasState();
                    taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
                }
                long asyncEndNanos = System.nanoTime();
                long asyncDurationMillis = (asyncEndNanos - this.asyncStartNanos) / 1000000L;
                this.checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
                // Only acknowledge if nobody discarded this checkpoint in the meantime.
                if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
                    // A null snapshot is acknowledged when no operator reported any state.
                    TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null;
                    this.owner.getEnvironment().acknowledgeCheckpoint(this.checkpointMetaData.getCheckpointId(), this.checkpointMetrics, acknowledgedState);
                    LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), asyncDurationMillis});
                    LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), acknowledgedState});
                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", (Object)this.owner.getName(), (Object)this.checkpointMetaData.getCheckpointId());
                }
            }
            catch (Exception e) {
                // If the failure happened after the state was set to COMPLETED (e.g. while
                // acknowledging), flip it back to RUNNING so that cleanup() below, which only
                // acts on RUNNING, actually cancels the snapshots.
                this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.COMPLETED, CheckpointingOperation.AsynCheckpointState.RUNNING);
                try {
                    this.cleanup();
                }
                catch (Exception cleanupException) {
                    e.addSuppressed(cleanupException);
                }
                AsynchronousException asyncException = new AsynchronousException(new Exception("Could not materialize checkpoint " + this.checkpointMetaData.getCheckpointId() + " for operator " + this.owner.getName() + '.', e));
                this.owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
            }
            finally {
                ((StreamTask)this.owner).cancelables.unregisterCloseable((Closeable)this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
        }

        /** Discards the checkpoint if it is still running; cleanup failures are only logged. */
        @Override
        public void close() {
            try {
                this.cleanup();
            }
            catch (Exception cleanupException) {
                LOG.warn("Could not properly clean up the async checkpoint runnable.", (Throwable)cleanupException);
            }
        }

        /**
         * Moves the state from {@code RUNNING} to {@code DISCARDED} and, if that transition
         * succeeded, cancels every non-null operator snapshot. All snapshots are attempted; the
         * collected failure (first one, with later ones attached via
         * {@code ExceptionUtils.firstOrSuppressed}) is thrown afterwards. If the state was not
         * {@code RUNNING}, nothing is cancelled.
         *
         * @throws Exception if cancelling one or more snapshots failed
         */
        private void cleanup() throws Exception {
            if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
                LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", (Object)this.checkpointMetaData.getCheckpointId(), (Object)this.owner.getName());
                Exception exception = null;
                for (OperatorSnapshotResult operatorSnapshotResult : this.operatorSnapshotsInProgress.values()) {
                    if (operatorSnapshotResult == null) continue;
                    try {
                        operatorSnapshotResult.cancel();
                    }
                    catch (Exception cancelException) {
                        exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)cancelException, exception);
                    }
                }
                if (null != exception) {
                    throw exception;
                }
            } else {
                LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.", (Object)this.owner.getName(), (Object)this.checkpointMetaData.getCheckpointId());
            }
        }
    }
}

