/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Matchers;
import org.mockito.Mockito;

public class StreamTaskTerminationTest
extends TestLogger {
    private static final OneShotLatch RUN_LATCH = new OneShotLatch();
    private static final AtomicBoolean SNAPSHOT_HAS_STARTED = new AtomicBoolean();
    private static final OneShotLatch CLEANUP_LATCH = new OneShotLatch();
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds((long)10L);

    @Test
    public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws Exception {
        Configuration taskConfiguration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        NoOpStreamOperator noOpStreamOperator = new NoOpStreamOperator();
        BlockingStateBackend blockingStateBackend = new BlockingStateBackend();
        streamConfig.setStreamOperator(noOpStreamOperator);
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setStateBackend((StateBackend)blockingStateBackend);
        long checkpointId = 0L;
        long checkpointTimestamp = 0L;
        JobInformation jobInformation = new JobInformation(new JobID(), "Test Job", new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "Test Task", 1, 1, BlockingStreamTask.class.getName(), taskConfiguration);
        TestingTaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        Task task = new Task(jobInformation, taskInformation, new ExecutionAttemptID(), new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), MemoryManagerBuilder.newBuilder().setMemorySize(32768L).build(), (IOManager)new IOManagerAsync(), (ShuffleEnvironment)shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), new TaskEventDispatcher(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, (TaskStateManager)new TestTaskStateManager(), (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (InputSplitProvider)Mockito.mock(InputSplitProvider.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), (TaskOperatorEventGateway)new NoOpTaskOperatorEventGateway(), (GlobalAggregateManager)new TestGlobalAggregateManager(), (LibraryCacheManager.ClassLoaderHandle)TestingClassLoaderLease.newBuilder().build(), (FileCache)Mockito.mock(FileCache.class), (TaskManagerRuntimeInfo)taskManagerRuntimeInfo, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), (ResultPartitionConsumableNotifier)new NoOpResultPartitionConsumableNotifier(), (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class), Executors.directExecutor());
        CompletableFuture<Void> taskRun = CompletableFuture.runAsync(() -> task.run(), TestingUtils.defaultExecutor());
        RUN_LATCH.await();
        task.triggerCheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation());
        taskRun.get();
        if (task.getFailureCause() != null) {
            throw new Exception("Task failed", task.getFailureCause());
        }
        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
    }

    static class BlockingCallable
    implements Callable<SnapshotResult<OperatorStateHandle>> {
        BlockingCallable() {
        }

        @Override
        public SnapshotResult<OperatorStateHandle> call() throws Exception {
            SNAPSHOT_HAS_STARTED.set(true);
            CLEANUP_LATCH.await();
            throw new FlinkException("Checkpointing operation failed");
        }
    }

    static class BlockingStateBackend
    implements StateBackend,
    CheckpointStorage {
        private static final long serialVersionUID = -5053068148933314100L;

        BlockingStateBackend() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) {
            throw new UnsupportedOperationException();
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            return new MemoryBackendCheckpointStorageAccess(jobId, null, null, Integer.MAX_VALUE);
        }

        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            OperatorStateBackend operatorStateBackend = (OperatorStateBackend)Mockito.mock(OperatorStateBackend.class);
            Mockito.when((Object)operatorStateBackend.snapshot(Matchers.anyLong(), Matchers.anyLong(), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class), (CheckpointOptions)Matchers.any(CheckpointOptions.class))).thenReturn(new FutureTask<SnapshotResult<OperatorStateHandle>>(new BlockingCallable()));
            return operatorStateBackend;
        }
    }

    private static class NoOpStreamOperator<T>
    extends AbstractStreamOperator<T> {
        private static final long serialVersionUID = 4517845269225218312L;

        private NoOpStreamOperator() {
        }
    }

    public static class BlockingStreamTask<T, OP extends StreamOperator<T>>
    extends StreamTask<T, OP> {
        private boolean isRunning;

        public BlockingStreamTask(Environment env) throws Exception {
            super(env);
        }

        protected void init() {
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (!this.isRunning) {
                this.isRunning = true;
                RUN_LATCH.trigger();
            }
            if (this.isCanceled() || SNAPSHOT_HAS_STARTED.get()) {
                controller.suspendDefaultAction();
                this.mailboxProcessor.suspend();
            }
        }

        protected void cleanUpInternal() throws Exception {
            CLEANUP_LATCH.trigger();
            Assert.assertTrue((boolean)this.getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS));
        }
    }
}

