/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointPlan;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlan;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

class PendingCheckpointTest {
    private static final List<Execution> ACK_TASKS = new ArrayList<Execution>();
    private static final List<ExecutionVertex> TASKS_TO_COMMIT = new ArrayList<ExecutionVertex>();
    private static final ExecutionAttemptID ATTEMPT_ID = ExecutionGraphTestUtils.createExecutionAttemptId();
    public static final OperatorID OPERATOR_ID = new OperatorID();
    public static final int PARALLELISM = 1;
    public static final int MAX_PARALLELISM = 128;
    @TempDir
    private java.nio.file.Path tmpFolder;

    PendingCheckpointTest() {
    }

    @Test
    void testCanBeSubsumed() throws Exception {
        CheckpointProperties forced = new CheckpointProperties(true, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), false, false, false, false, false, false);
        PendingCheckpoint pending = this.createPendingCheckpoint(forced);
        Assertions.assertThat((boolean)pending.canBeSubsumed()).isFalse();
        Assertions.assertThatThrownBy(() -> this.abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED), (String)"Did not throw expected Exception", (Object[])new Object[0]).isInstanceOf(IllegalStateException.class);
        CheckpointProperties subsumed = new CheckpointProperties(false, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), false, false, false, false, false, false);
        Assertions.assertThat((boolean)this.createPendingCheckpoint(subsumed).canBeSubsumed()).isFalse();
    }

    @Test
    void testSyncSavepointCannotBeSubsumed() throws Exception {
        CheckpointProperties forced = CheckpointProperties.forSyncSavepoint((boolean)true, (boolean)false, (SavepointFormatType)SavepointFormatType.CANONICAL);
        PendingCheckpoint pending = this.createPendingCheckpoint(forced);
        Assertions.assertThat((boolean)pending.canBeSubsumed()).isFalse();
        Assertions.assertThatThrownBy(() -> this.abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED), (String)"Did not throw expected Exception", (Object[])new Object[0]).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testCompletionFuture() throws Exception {
        CheckpointProperties props = new CheckpointProperties(false, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), false, false, false, false, false, false);
        PendingCheckpoint pending = this.createPendingCheckpoint(props);
        CompletableFuture future = pending.getCompletionFuture();
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
        Assertions.assertThat((boolean)future.isDone()).isTrue();
        pending = this.createPendingCheckpoint(props);
        future = pending.getCompletionFuture();
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
        Assertions.assertThat((boolean)future.isDone()).isTrue();
        pending = this.createPendingCheckpoint(props);
        future = pending.getCompletionFuture();
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
        Assertions.assertThat((boolean)future.isDone()).isTrue();
        pending = this.createPendingCheckpoint(props);
        future = pending.getCompletionFuture();
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        Assertions.assertThat((boolean)pending.areTasksFullyAcknowledged()).isTrue();
        pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        PendingCheckpoint pendingCheckpoint = this.createPendingCheckpoint(props);
        future = pending.getCompletionFuture();
        Assertions.assertThat((boolean)future.isDone()).isFalse();
        Assertions.assertThatThrownBy(() -> pendingCheckpoint.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor()), (String)"Did not throw expected Exception", (Object[])new Object[0]).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testAbortDiscardsState() throws Exception {
        CheckpointProperties props = new CheckpointProperties(false, (SnapshotType)CheckpointType.CHECKPOINT, false, false, false, false, false, false);
        QueueExecutor executor = new QueueExecutor();
        OperatorState state = new OperatorState(new OperatorID(), 1, 256);
        CheckpointCoordinatorTest.OperatorSubtaskStateMock subtaskStateMock = new CheckpointCoordinatorTest.OperatorSubtaskStateMock();
        OperatorSubtaskState subtaskState = subtaskStateMock.getSubtaskState();
        state.putState(0, subtaskState);
        PendingCheckpoint pending = this.createPendingCheckpoint(props, executor);
        PendingCheckpointTest.setTaskState(pending, state);
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
        executor.runQueuedCommands();
        subtaskStateMock.verifyDiscard();
        subtaskStateMock.reset();
        pending = this.createPendingCheckpoint(props, executor);
        PendingCheckpointTest.setTaskState(pending, state);
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
        executor.runQueuedCommands();
        subtaskStateMock.verifyDiscard();
        subtaskStateMock.reset();
        pending = this.createPendingCheckpoint(props, executor);
        PendingCheckpointTest.setTaskState(pending, state);
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED);
        executor.runQueuedCommands();
        subtaskStateMock.verifyDiscard();
        subtaskStateMock.reset();
        pending = this.createPendingCheckpoint(props, executor);
        PendingCheckpointTest.setTaskState(pending, state);
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
        executor.runQueuedCommands();
        subtaskStateMock.verifyDiscard();
    }

    @Test
    void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
        PendingCheckpoint pending = this.createPendingCheckpoint(CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        pending.acknowledgeTask(ATTEMPT_ID, null, (CheckpointMetrics)Mockito.mock(CheckpointMetrics.class));
        OperatorState expectedState = new OperatorState(OPERATOR_ID, 1, 128);
        Assertions.assertThat(Collections.singletonMap(OPERATOR_ID, expectedState)).isEqualTo((Object)pending.getOperatorStates());
    }

    @Test
    void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
        PendingCheckpoint pending = this.createPendingCheckpoint(CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        pending.acknowledgeTask(ATTEMPT_ID, (TaskStateSnapshot)Mockito.mock(TaskStateSnapshot.class), (CheckpointMetrics)Mockito.mock(CheckpointMetrics.class));
        Assertions.assertThat((Map)pending.getOperatorStates()).isNotEmpty();
    }

    @Test
    void testSetCanceller() throws Exception {
        CheckpointProperties props = new CheckpointProperties(false, (SnapshotType)CheckpointType.CHECKPOINT, true, true, true, true, true, false);
        PendingCheckpoint aborted = this.createPendingCheckpoint(props);
        this.abort(aborted, CheckpointFailureReason.CHECKPOINT_DECLINED);
        Assertions.assertThat((boolean)aborted.isDisposed()).isTrue();
        Assertions.assertThat((boolean)aborted.setCancellerHandle((ScheduledFuture)Mockito.mock(ScheduledFuture.class))).isFalse();
        PendingCheckpoint pending = this.createPendingCheckpoint(props);
        ScheduledFuture canceller = (ScheduledFuture)Mockito.mock(ScheduledFuture.class);
        Assertions.assertThat((boolean)pending.setCancellerHandle(canceller)).isTrue();
        this.abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
        ((ScheduledFuture)Mockito.verify((Object)canceller)).cancel(false);
    }

    @Test
    void testMasterState() throws Exception {
        TestingMasterTriggerRestoreHook masterHook = new TestingMasterTriggerRestoreHook("master hook");
        masterHook.addStateContent("state");
        PendingCheckpoint pending = this.createPendingCheckpoint(CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonList(masterHook.getIdentifier()));
        MasterState masterState = (MasterState)MasterHooks.triggerHook((MasterTriggerRestoreHook)masterHook, (long)0L, (long)System.currentTimeMillis(), (Executor)Executors.directExecutor()).get();
        pending.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
        Assertions.assertThat((boolean)pending.areMasterStatesFullyAcknowledged()).isTrue();
        Assertions.assertThat((boolean)pending.areTasksFullyAcknowledged()).isFalse();
        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        Assertions.assertThat((boolean)pending.areTasksFullyAcknowledged()).isTrue();
        List resultMasterStates = pending.getMasterStates();
        Assertions.assertThat((List)resultMasterStates).hasSize(1);
        String deserializedState = (String)masterHook.createCheckpointDataSerializer().deserialize(77, ((MasterState)resultMasterStates.get(0)).bytes());
        Assertions.assertThat((String)"state").isEqualTo(deserializedState);
    }

    @Test
    void testMasterStateWithNullState() throws Exception {
        TestingMasterTriggerRestoreHook masterHook = new TestingMasterTriggerRestoreHook("master hook");
        masterHook.addStateContent("state");
        TestingMasterTriggerRestoreHook nullableMasterHook = new TestingMasterTriggerRestoreHook("nullable master hook");
        ArrayList<String> masterIdentifiers = new ArrayList<String>(2);
        masterIdentifiers.add(masterHook.getIdentifier());
        masterIdentifiers.add(nullableMasterHook.getIdentifier());
        PendingCheckpoint pending = this.createPendingCheckpoint(CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), masterIdentifiers);
        MasterState masterStateNormal = (MasterState)MasterHooks.triggerHook((MasterTriggerRestoreHook)masterHook, (long)0L, (long)System.currentTimeMillis(), (Executor)Executors.directExecutor()).get();
        pending.acknowledgeMasterState(masterHook.getIdentifier(), masterStateNormal);
        Assertions.assertThat((boolean)pending.areMasterStatesFullyAcknowledged()).isFalse();
        MasterState masterStateNull = (MasterState)MasterHooks.triggerHook((MasterTriggerRestoreHook)nullableMasterHook, (long)0L, (long)System.currentTimeMillis(), (Executor)Executors.directExecutor()).get();
        pending.acknowledgeMasterState(nullableMasterHook.getIdentifier(), masterStateNull);
        Assertions.assertThat((boolean)pending.areMasterStatesFullyAcknowledged()).isTrue();
        Assertions.assertThat((boolean)pending.areTasksFullyAcknowledged()).isFalse();
        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        Assertions.assertThat((boolean)pending.areTasksFullyAcknowledged()).isTrue();
        List resultMasterStates = pending.getMasterStates();
        Assertions.assertThat((List)resultMasterStates).hasSize(1);
        String deserializedState = (String)masterHook.createCheckpointDataSerializer().deserialize(77, ((MasterState)resultMasterStates.get(0)).bytes());
        Assertions.assertThat((String)"state").isEqualTo(deserializedState);
    }

    @Test
    void testInitiallyUnacknowledgedCoordinatorStates() throws Exception {
        PendingCheckpoint checkpoint = this.createPendingCheckpointWithCoordinators(new TestingOperatorInfo(), new TestingOperatorInfo());
        Assertions.assertThat((int)checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators()).isEqualTo(2);
        Assertions.assertThat((boolean)checkpoint.isFullyAcknowledged()).isFalse();
    }

    @Test
    void testAcknowledgedCoordinatorStates() throws Exception {
        TestingOperatorInfo coord1 = new TestingOperatorInfo();
        TestingOperatorInfo coord2 = new TestingOperatorInfo();
        PendingCheckpoint checkpoint = this.createPendingCheckpointWithCoordinators(coord1, coord2);
        PendingCheckpoint.TaskAcknowledgeResult ack1 = checkpoint.acknowledgeCoordinatorState((OperatorInfo)coord1, (ByteStreamStateHandle)new TestingStreamStateHandle());
        PendingCheckpoint.TaskAcknowledgeResult ack2 = checkpoint.acknowledgeCoordinatorState((OperatorInfo)coord2, null);
        Assertions.assertThat((Comparable)PendingCheckpoint.TaskAcknowledgeResult.SUCCESS).isEqualTo((Object)ack1);
        Assertions.assertThat((Comparable)PendingCheckpoint.TaskAcknowledgeResult.SUCCESS).isEqualTo((Object)ack2);
        Assertions.assertThat((int)0).isEqualTo(checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
        Assertions.assertThat((boolean)checkpoint.isFullyAcknowledged()).isTrue();
        Assertions.assertThat(checkpoint.getOperatorStates().keySet()).contains((Object[])new OperatorID[]{OPERATOR_ID, coord1.operatorId(), coord2.operatorId()});
    }

    @Test
    void testDuplicateAcknowledgeCoordinator() throws Exception {
        TestingOperatorInfo coordinator = new TestingOperatorInfo();
        PendingCheckpoint checkpoint = this.createPendingCheckpointWithCoordinators(coordinator);
        checkpoint.acknowledgeCoordinatorState((OperatorInfo)coordinator, (ByteStreamStateHandle)new TestingStreamStateHandle());
        PendingCheckpoint.TaskAcknowledgeResult secondAck = checkpoint.acknowledgeCoordinatorState((OperatorInfo)coordinator, null);
        Assertions.assertThat((Comparable)PendingCheckpoint.TaskAcknowledgeResult.DUPLICATE).isEqualTo((Object)secondAck);
    }

    @Test
    void testAcknowledgeUnknownCoordinator() throws Exception {
        PendingCheckpoint checkpoint = this.createPendingCheckpointWithCoordinators(new TestingOperatorInfo());
        PendingCheckpoint.TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState((OperatorInfo)new TestingOperatorInfo(), null);
        Assertions.assertThat((Comparable)PendingCheckpoint.TaskAcknowledgeResult.UNKNOWN).isEqualTo((Object)ack);
    }

    @Test
    void testDisposeDisposesCoordinatorStates() throws Exception {
        TestingStreamStateHandle handle1 = new TestingStreamStateHandle();
        TestingStreamStateHandle handle2 = new TestingStreamStateHandle();
        PendingCheckpoint checkpoint = this.createPendingCheckpointWithAcknowledgedCoordinators(handle1, handle2);
        this.abort(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
        Assertions.assertThat((boolean)handle1.isDisposed()).isTrue();
        Assertions.assertThat((boolean)handle2.isDisposed()).isTrue();
    }

    @Test
    void testReportTaskFinishedOnRestore() throws IOException {
        RecordCheckpointPlan recordCheckpointPlan = new RecordCheckpointPlan(new ArrayList<Execution>(ACK_TASKS));
        PendingCheckpoint checkpoint = this.createPendingCheckpoint((CheckpointPlan)recordCheckpointPlan);
        checkpoint.acknowledgeTask(ACK_TASKS.get(0).getAttemptId(), TaskStateSnapshot.FINISHED_ON_RESTORE, new CheckpointMetrics());
        Assertions.assertThat(recordCheckpointPlan.getReportedFinishedOnRestoreTasks()).contains((Object[])new ExecutionVertex[]{ACK_TASKS.get(0).getVertex()});
    }

    @Test
    void testReportTaskFinishedOperators() throws IOException {
        RecordCheckpointPlan recordCheckpointPlan = new RecordCheckpointPlan(new ArrayList<Execution>(ACK_TASKS));
        PendingCheckpoint checkpoint = this.createPendingCheckpoint((CheckpointPlan)recordCheckpointPlan);
        checkpoint.acknowledgeTask(ACK_TASKS.get(0).getAttemptId(), new TaskStateSnapshot(10, true), new CheckpointMetrics());
        Assertions.assertThat(recordCheckpointPlan.getReportedOperatorsFinishedTasks()).contains((Object[])new ExecutionVertex[]{ACK_TASKS.get(0).getVertex()});
    }

    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) throws IOException {
        return this.createPendingCheckpoint(props, Collections.emptyList(), Collections.emptyList(), Executors.directExecutor());
    }

    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Executor executor) throws IOException {
        return this.createPendingCheckpoint(props, Collections.emptyList(), Collections.emptyList(), executor);
    }

    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<String> masterStateIdentifiers) throws IOException {
        return this.createPendingCheckpoint(props, Collections.emptyList(), masterStateIdentifiers, Executors.directExecutor());
    }

    private PendingCheckpoint createPendingCheckpoint(CheckpointPlan checkpointPlan) throws IOException {
        return this.createPendingCheckpoint(CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.emptyList(), Collections.emptyList(), checkpointPlan, Executors.directExecutor());
    }

    private PendingCheckpoint createPendingCheckpointWithCoordinators(OperatorInfo ... coordinators) throws IOException {
        PendingCheckpoint checkpoint = this.createPendingCheckpoint(CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), OperatorInfo.getIds(Arrays.asList(coordinators)), Collections.emptyList(), Executors.directExecutor());
        checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
        return checkpoint;
    }

    private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle ... handles) throws IOException {
        OperatorInfo[] coords = new OperatorInfo[handles.length];
        for (int i = 0; i < handles.length; ++i) {
            coords[i] = new TestingOperatorInfo();
        }
        PendingCheckpoint checkpoint = this.createPendingCheckpointWithCoordinators(coords);
        for (int i = 0; i < handles.length; ++i) {
            checkpoint.acknowledgeCoordinatorState(coords[i], handles[i]);
        }
        return checkpoint;
    }

    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<OperatorID> operatorCoordinators, Collection<String> masterStateIdentifiers, Executor executor) throws IOException {
        ArrayList<Execution> ackTasks = new ArrayList<Execution>(ACK_TASKS);
        ArrayList<ExecutionVertex> tasksToCommit = new ArrayList<ExecutionVertex>(TASKS_TO_COMMIT);
        return this.createPendingCheckpoint(props, operatorCoordinators, masterStateIdentifiers, (CheckpointPlan)new DefaultCheckpointPlan(Collections.emptyList(), ackTasks, tasksToCommit, Collections.emptyList(), Collections.emptyList(), true), executor);
    }

    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<OperatorID> operatorCoordinators, Collection<String> masterStateIdentifiers, CheckpointPlan checkpointPlan, Executor executor) throws IOException {
        Path checkpointDir = new Path(TempDirUtils.newFolder((java.nio.file.Path)this.tmpFolder).toURI());
        FsCheckpointStorageLocation location = new FsCheckpointStorageLocation((FileSystem)LocalFileSystem.getSharedInstance(), checkpointDir, checkpointDir, checkpointDir, CheckpointStorageLocationReference.getDefault(), 1024, 4096);
        PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(new JobID(), 0L, 1L, checkpointPlan, operatorCoordinators, masterStateIdentifiers, props, new CompletableFuture(), null, new CompletableFuture());
        pendingCheckpoint.setCheckpointTargetLocation((CheckpointStorageLocation)location);
        return pendingCheckpoint;
    }

    static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
        Field field = PendingCheckpoint.class.getDeclaredField("operatorStates");
        field.setAccessible(true);
        Map taskStates = (Map)field.get(pending);
        taskStates.put(new OperatorID(), state);
    }

    private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
        checkpoint.abort(reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
    }

    static {
        ExecutionJobVertex jobVertex = (ExecutionJobVertex)Mockito.mock(ExecutionJobVertex.class);
        Mockito.when((Object)jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly((OperatorID)OPERATOR_ID)));
        ExecutionVertex vertex = (ExecutionVertex)Mockito.mock(ExecutionVertex.class);
        Mockito.when((Object)vertex.getMaxParallelism()).thenReturn((Object)128);
        Mockito.when((Object)vertex.getTotalNumberOfParallelSubtasks()).thenReturn((Object)1);
        Mockito.when((Object)vertex.getJobVertex()).thenReturn((Object)jobVertex);
        Execution execution = (Execution)Mockito.mock(Execution.class);
        Mockito.when((Object)execution.getAttemptId()).thenReturn((Object)ATTEMPT_ID);
        Mockito.when((Object)execution.getVertex()).thenReturn((Object)vertex);
        ACK_TASKS.add(execution);
        TASKS_TO_COMMIT.add(vertex);
    }

    private static class RecordCheckpointPlan
    extends DefaultCheckpointPlan {
        private final List<ExecutionVertex> reportedFinishedOnRestoreTasks = new ArrayList<ExecutionVertex>();
        private final List<ExecutionVertex> reportedOperatorsFinishedTasks = new ArrayList<ExecutionVertex>();

        public RecordCheckpointPlan(List<Execution> tasksToWaitFor) {
            super(Collections.emptyList(), tasksToWaitFor, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), true);
        }

        public void reportTaskFinishedOnRestore(ExecutionVertex task) {
            super.reportTaskFinishedOnRestore(task);
            this.reportedFinishedOnRestoreTasks.add(task);
        }

        public void reportTaskHasFinishedOperators(ExecutionVertex task) {
            super.reportTaskHasFinishedOperators(task);
            this.reportedOperatorsFinishedTasks.add(task);
        }

        public List<ExecutionVertex> getReportedFinishedOnRestoreTasks() {
            return this.reportedFinishedOnRestoreTasks;
        }

        public List<ExecutionVertex> getReportedOperatorsFinishedTasks() {
            return this.reportedOperatorsFinishedTasks;
        }
    }

    private static final class TestingMasterTriggerRestoreHook
    implements MasterTriggerRestoreHook<String> {
        private final String identifier;
        private final ArrayDeque<String> stateContents;

        public TestingMasterTriggerRestoreHook(String identifier) {
            this.identifier = (String)Preconditions.checkNotNull((Object)identifier);
            this.stateContents = new ArrayDeque();
        }

        public void addStateContent(String stateContent) {
            this.stateContents.add(stateContent);
        }

        public String getIdentifier() {
            return this.identifier;
        }

        @Nullable
        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
            return CompletableFuture.completedFuture(this.stateContents.poll());
        }

        public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) {
        }

        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            return new CheckpointCoordinatorTestingUtils.StringSerializer();
        }
    }

    private static final class QueueExecutor
    implements Executor {
        private final Queue<Runnable> queue = new ArrayDeque<Runnable>(4);

        private QueueExecutor() {
        }

        @Override
        public void execute(Runnable command) {
            this.queue.add(command);
        }

        public void runQueuedCommands() {
            for (Runnable runnable : this.queue) {
                runnable.run();
            }
        }
    }
}

