/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class StreamTaskFinalCheckpointsTest {
    /**
     * Finishes the operators of a single-operator chain by hand and then verifies that a
     * checkpoint triggered on a barrier is still reported to the task state manager.
     */
    @Test
    public void testCheckpointDoneOnFinishedOperator() throws Exception {
        FinishingOperator operator = new FinishingOperator();
        StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain((StreamOperator<?>)operator)
                        .build();
        harness.setAutoProcess(false);
        harness.processElement(new StreamRecord((Object)1));

        // Finish the chained operators directly, using the task's own action executor.
        harness.streamTask.operatorChain.finishOperators(harness.streamTask.getActionExecutor());
        Assert.assertTrue((boolean)FinishingOperator.finished);

        // Reset the report latch, trigger checkpoint 2 on a barrier, then wait on the latch
        // before checking which checkpoint id was reported.
        harness.getTaskStateManager().getWaitForReportLatch().reset();
        harness.streamTask.triggerCheckpointOnBarrier(
                new CheckpointMetaData(2L, 0L),
                CheckpointOptions.forCheckpointWithDefaultLocation(),
                new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));
        harness.getTaskStateManager().getWaitForReportLatch().await();
        Assert.assertEquals(2L, (long)harness.getTaskStateManager().getReportedCheckpointId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a task with checkpointing disabled and checks that, after the input ends, none of
     * the additional output partitions has buffers queued.
     */
    @Test
    public void testNotWaitingForAllRecordsProcessedIfCheckpointNotEnabled() throws Exception {
        ResultPartitionWriter[] outputs = new ResultPartitionWriter[2];
        try {
            int slot = 0;
            while (slot < outputs.length) {
                outputs[slot] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                outputs[slot].setup();
                ++slot;
            }
            try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                    .modifyStreamConfig(streamConfig -> streamConfig.setCheckpointingEnabled(false))
                    .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                    .addAdditionalOutput(outputs)
                    .setupOperatorChain((StreamOperator<?>)new EmptyOperator())
                    .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
                    .build()) {
                harness.endInput();
                for (ResultPartitionWriter output : outputs) {
                    PipelinedResultPartition pipelined = (PipelinedResultPartition)output;
                    Assert.assertEquals(0L, (long)pipelined.getNumberOfQueuedBuffers());
                }
            }
        } finally {
            // Slots are still null if partition creation failed part-way through the array.
            for (ResultPartitionWriter output : outputs) {
                if (output != null) {
                    output.close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Triggers checkpoints 2, 4 and 6 while input channels 0, 1 and 2 are ended step by step,
     * and checks that the last checkpoint is both reported and notified as completed after
     * processing has finished.
     */
    @Test
    public void testWaitingForFinalCheckpoint() throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            int slot = 0;
            while (slot < partitions.length) {
                partitions[slot] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                partitions[slot].setup();
                ++slot;
            }
            int lastCheckpointId = 6;
            CompletingCheckpointResponder responder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> harness = this.createTestHarness(partitions, responder, false)) {
                // Checkpoint 2: no channel has been ended yet.
                CompletableFuture<Boolean> pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 2L);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(harness, pending);
                Assert.assertEquals(2L, (long)harness.getTaskStateManager().getReportedCheckpointId());

                // Checkpoint 4: channel 0 has received EndOfData and EndOfPartitionEvent.
                harness.processEvent((AbstractEvent)EndOfData.INSTANCE, 0, 0);
                harness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 0);
                pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 4L);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(harness, pending);
                Assert.assertEquals(4L, (long)harness.getTaskStateManager().getReportedCheckpointId());

                // Last checkpoint: channels 1 and 2 are ended as well.
                harness.processEvent((AbstractEvent)EndOfData.INSTANCE, 0, 1);
                harness.processEvent((AbstractEvent)EndOfData.INSTANCE, 0, 2);
                harness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 1);
                harness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 2);
                pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, lastCheckpointId);
                // Once the trigger future completes, mark subpartition 0 of every output as fully processed.
                pending.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });
                harness.finishProcessing();
                Assert.assertTrue(pending.isDone());
                harness.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals((long)lastCheckpointId, (long)harness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertEquals((long)lastCheckpointId, (long)harness.getTaskStateManager().getNotifiedCompletedCheckpointId());
                for (ResultPartition partition : partitions) {
                    Assert.assertEquals(4L, (long)partition.getNumberOfQueuedBuffers());
                }
            }
        } finally {
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /**
     * Convenience overload: builds the harness with no additional output partitions and with
     * unaligned checkpoints switched off.
     */
    private StreamTaskMailboxTestHarness<String> createTestHarness(CompletingCheckpointResponder checkpointResponder) throws Exception {
        final boolean unalignedCheckpoints = false;
        return this.createTestHarness(null, checkpointResponder, unalignedCheckpoints);
    }

    /**
     * Builds a {@code OneInputStreamTask} harness around an {@code EmptyOperator} with
     * checkpointing and checkpoints-after-tasks-finish enabled, and wires the responder's
     * handlers to the task's asynchronous complete/abort notifications.
     *
     * @param partitionWriters additional outputs to register, or {@code null} for none
     * @param checkpointResponder responder installed on the harness
     * @param enableUnalignedCheckpoint value passed to {@code setUnalignedCheckpointsEnabled}
     */
    private StreamTaskMailboxTestHarness<String> createTestHarness(@Nullable ResultPartition[] partitionWriters, CompletingCheckpointResponder checkpointResponder, boolean enableUnalignedCheckpoint) throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        if (partitionWriters != null) {
            builder = builder.addAdditionalOutput((ResultPartitionWriter[])partitionWriters);
        }
        StreamTaskMailboxTestHarness<String> harness = builder
                .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 3)
                .modifyStreamConfig(streamConfig -> {
                    streamConfig.setCheckpointingEnabled(true);
                    streamConfig.setUnalignedCheckpointsEnabled(enableUnalignedCheckpoint);
                    streamConfig.getConfiguration().set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, (Object)true);
                })
                .setCheckpointResponder(checkpointResponder)
                .setupOperatorChain((StreamOperator<?>)new EmptyOperator())
                .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
                .build();
        // The handlers can only be installed after build(), because they call into the built task.
        checkpointResponder.setHandlers(
                id -> harness.streamTask.notifyCheckpointCompleteAsync(id),
                (id, other) -> harness.streamTask.notifyCheckpointAbortAsync(id, other));
        return harness;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Passes checkpoint id 3 to {@code CompletingCheckpointResponder#completeCheckpoints}
     * (presumably restricting which checkpoints the responder completes — confirm against that
     * class), triggers checkpoints 1, 2 and 3 after the task has completed, and expects
     * checkpoint 3 to be the one reported and notified as completed.
     */
    @Test
    public void testWaitingForFinalCheckpointNotTheFirsNotifiedComplete() throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            int slot = 0;
            while (slot < partitions.length) {
                partitions[slot] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                partitions[slot].setup();
                ++slot;
            }
            CompletingCheckpointResponder responder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> harness = this.createTestHarness(partitions, responder, false)) {
                responder.completeCheckpoints(Collections.singletonList(3L));
                harness.waitForTaskCompletion();

                // Checkpoint 1: once triggered, mark subpartition 0 of every output as fully processed.
                CompletableFuture<Boolean> first = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 1L);
                first.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });
                harness.processAll();
                harness.getTaskStateManager().getWaitForReportLatch().await();

                // Checkpoint 2.
                StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 2L);
                harness.processAll();
                harness.getTaskStateManager().getWaitForReportLatch().await();

                // Checkpoint 3, followed by finishing the task.
                StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 3L);
                harness.processAll();
                harness.finishProcessing();
                harness.getTaskStateManager().getWaitForReportLatch().await();

                Assert.assertEquals(3L, (long)harness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertEquals(3L, (long)harness.getTaskStateManager().getNotifiedCompletedCheckpointId());
                for (ResultPartition partition : partitions) {
                    Assert.assertEquals(4L, (long)partition.getNumberOfQueuedBuffers());
                }
            }
        } finally {
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Triggers a stop-with-savepoint (drain) while the task is still waiting for its final
     * checkpoint, and checks that the savepoint is the checkpoint finally reported and notified
     * as completed.
     *
     * <p>The custom responder forwards nothing until the savepoint is acknowledged; it then
     * acknowledges the final checkpoint, sleeps 500 ms, and acknowledges the savepoint.
     */
    @Test
    public void testTriggerStopWithSavepointWhenWaitingForFinalCheckpoint() throws Exception {
        ResultPartition[] partitionWriters = new ResultPartition[2];
        try {
            for (int i = 0; i < partitionWriters.length; ++i) {
                partitionWriters[i] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                partitionWriters[i].setup();
            }
            final int finalCheckpointId = 6;
            final int syncSavepointId = 7;
            CompletingCheckpointResponder checkpointResponder = new CompletingCheckpointResponder(){

                @Override
                public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                    // Acknowledgements for every id other than the savepoint are dropped here.
                    if ((long)syncSavepointId == checkpointId) {
                        // The final checkpoint is acknowledged with the savepoint's metrics and state.
                        super.acknowledgeCheckpoint(jobID, executionAttemptID, finalCheckpointId, checkpointMetrics, subtaskState);
                        try {
                            Thread.sleep(500L);
                        }
                        catch (InterruptedException e) {
                            // Restore the interrupt status so callers further up can still observe it.
                            Thread.currentThread().interrupt();
                            throw new FlinkRuntimeException((Throwable)e);
                        }
                        super.acknowledgeCheckpoint(jobID, executionAttemptID, syncSavepointId, checkpointMetrics, subtaskState);
                    }
                }
            };
            try (StreamTaskMailboxTestHarness<String> testHarness = this.createTestHarness(partitionWriters, checkpointResponder, false);){
                testHarness.waitForTaskCompletion();
                CompletableFuture<Boolean> checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, finalCheckpointId);
                // Once the trigger future completes, mark subpartition 0 of every output as fully processed.
                checkpointFuture.thenAccept(ignored -> {
                    for (ResultPartition resultPartition : partitionWriters) {
                        resultPartition.onSubpartitionAllDataProcessed(0);
                    }
                });
                CompletableFuture<Boolean> savepointFuture = StreamTaskFinalCheckpointsTest.triggerStopWithSavepointDrain(testHarness, syncSavepointId);
                testHarness.finishProcessing();
                Assert.assertTrue((boolean)checkpointFuture.isDone());
                Assert.assertTrue((boolean)savepointFuture.isDone());
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals((long)syncSavepointId, (long)testHarness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertEquals((long)syncSavepointId, (long)testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
                for (ResultPartition resultPartition : partitionWriters) {
                    Assert.assertEquals((long)3L, (long)resultPartition.getNumberOfQueuedBuffers());
                }
            }
        }
        finally {
            for (ResultPartition writer : partitionWriters) {
                if (writer == null) continue;
                writer.close();
            }
        }
    }

    /**
     * Source-task variant: triggers a stop-with-savepoint (drain) on a {@code SourceStreamTask}
     * right after a final checkpoint, and checks that the savepoint is the checkpoint finally
     * reported and notified as completed.
     *
     * <p>The custom responder forwards nothing until the savepoint is acknowledged; it then
     * acknowledges the final checkpoint, sleeps 500 ms, and acknowledges the savepoint.
     */
    @Test
    public void testTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask() throws Exception {
        final int finalCheckpointId = 6;
        final int syncSavepointId = 7;
        CompletingCheckpointResponder checkpointResponder = new CompletingCheckpointResponder(){

            @Override
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                // Acknowledgements for every id other than the savepoint are dropped here.
                if ((long)syncSavepointId == checkpointId) {
                    // The final checkpoint is acknowledged with the savepoint's metrics and state.
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, finalCheckpointId, checkpointMetrics, subtaskState);
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt status so callers further up can still observe it.
                        Thread.currentThread().interrupt();
                        throw new FlinkRuntimeException((Throwable)e);
                    }
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, syncSavepointId, checkpointMetrics, subtaskState);
                }
            }
        };
        try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(config -> {
            config.setCheckpointingEnabled(true);
            config.getConfiguration().set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, (Object)true);
        }).setCheckpointResponder(checkpointResponder).setupOutputForSingletonOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)new ImmediatelyFinishingSource())).build();){
            // Handlers are installed after build() because they call into the built task.
            checkpointResponder.setHandlers(arg_0 -> testHarness.streamTask.notifyCheckpointCompleteAsync(arg_0), (arg_0, arg_1) -> testHarness.streamTask.notifyCheckpointAbortAsync(arg_0, arg_1));
            testHarness.streamTask.runMailboxLoop();
            CompletableFuture<Boolean> checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, finalCheckpointId);
            CompletableFuture<Boolean> savepointFuture = StreamTaskFinalCheckpointsTest.triggerStopWithSavepointDrain(testHarness, syncSavepointId);
            testHarness.finishProcessing();
            Assert.assertTrue((boolean)checkpointFuture.isDone());
            Assert.assertTrue((boolean)savepointFuture.isDone());
            testHarness.getTaskStateManager().getWaitForReportLatch().await();
            Assert.assertEquals((long)syncSavepointId, (long)testHarness.getTaskStateManager().getReportedCheckpointId());
            Assert.assertEquals((long)syncSavepointId, (long)testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
        }
    }

    /**
     * Triggers a stop-with-savepoint without drain on a {@code SourceStreamTask} right after a
     * final checkpoint. The test expects the savepoint future to yield {@code false} and the
     * final checkpoint to be the one reported and notified as completed.
     *
     * <p>The custom responder holds back the final checkpoint's acknowledgement and only
     * forwards it when the savepoint is declined.
     */
    @Test
    public void testTriggerStopWithSavepointNoDrainWhenWaitingForFinalCheckpointOnSourceTask() throws Exception {
        final int finalCheckpointId = 6;
        final int syncSavepointId = 7;
        CompletingCheckpointResponder responder = new CompletingCheckpointResponder(){
            private CheckpointMetrics heldMetrics;
            private TaskStateSnapshot heldSnapshot;

            @Override
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                // Only the final checkpoint's data is remembered; nothing is forwarded from here.
                if (checkpointId != (long)finalCheckpointId) {
                    return;
                }
                this.heldMetrics = checkpointMetrics;
                this.heldSnapshot = subtaskState;
            }

            @Override
            public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointException checkpointException) {
                if ((long)syncSavepointId != checkpointId) {
                    return;
                }
                // The declined savepoint releases the held-back final checkpoint acknowledgement.
                super.acknowledgeCheckpoint(jobID, executionAttemptID, finalCheckpointId, this.heldMetrics, this.heldSnapshot);
            }
        };
        try (StreamTaskMailboxTestHarness<String> harness = new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                .modifyStreamConfig(streamConfig -> {
                    streamConfig.setCheckpointingEnabled(true);
                    streamConfig.getConfiguration().set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, (Object)true);
                })
                .setCheckpointResponder(responder)
                .setupOutputForSingletonOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)new ImmediatelyFinishingSource()))
                .build()) {
            responder.setHandlers(
                    id -> harness.streamTask.notifyCheckpointCompleteAsync(id),
                    (id, other) -> harness.streamTask.notifyCheckpointAbortAsync(id, other));
            harness.streamTask.runMailboxLoop();

            CompletableFuture<Boolean> finalCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, finalCheckpointId);
            CompletableFuture<Boolean> savepoint = StreamTaskFinalCheckpointsTest.triggerStopWithSavepointNoDrain(harness, syncSavepointId);
            harness.finishProcessing();

            Assert.assertTrue(finalCheckpoint.isDone());
            Assert.assertTrue(savepoint.isDone());
            Assert.assertFalse((boolean)savepoint.get());
            harness.getTaskStateManager().getWaitForReportLatch().await();
            Assert.assertEquals((long)finalCheckpointId, (long)harness.getTaskStateManager().getReportedCheckpointId());
            Assert.assertEquals((long)finalCheckpointId, (long)harness.getTaskStateManager().getNotifiedCompletedCheckpointId());
        }
    }

    /**
     * Triggers a stop-with-savepoint without drain on a {@code SourceStreamTask} built around
     * {@code ImmediatelyFinishingSource}, runs the mailbox loop, and checks the message of any
     * exception raised along the way.
     *
     * <p>NOTE(review): the assertion lives only in the {@code catch} block, so this test also
     * passes when no exception is thrown at all — confirm whether a failure is mandatory here.
     */
    @Test
    public void testTriggerSourceFinishesWhileStoppingWithSavepointWithoutDrain() throws Exception {
        final String expectedMessage = "We run out of data to process while waiting for a synchronous savepoint to be finished. This can lead to a deadlock waiting for a final checkpoint after a synchronous savepoint, which will never be triggered.";
        try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(config -> {
            config.setCheckpointingEnabled(true);
            config.getConfiguration().set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, (Object)true);
        }).setupOutputForSingletonOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)new ImmediatelyFinishingSource())).build();){
            StreamTaskFinalCheckpointsTest.triggerStopWithSavepointNoDrain(testHarness, 1L);
            testHarness.streamTask.runMailboxLoop();
        }
        catch (Exception ex) {
            // Constant-first comparison: a throwable with a null message must not turn the
            // check into a NullPointerException that hides the real failure.
            ExceptionUtils.assertThrowable((Throwable)ex, e -> expectedMessage.equals(e.getMessage()));
        }
    }

    /** Runs the finished-channels scenario with aligned, non-timeoutable checkpoint options. */
    @Test
    public void testTriggeringAlignedNoTimeoutCheckpointWithFinishedChannels() throws Exception {
        CheckpointOptions alignedOptions = CheckpointOptions.alignedNoTimeout(
                (CheckpointType)CheckpointType.CHECKPOINT,
                (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
        this.testTriggeringCheckpointWithFinishedChannels(alignedOptions);
    }

    /** Runs the finished-channels scenario with unaligned checkpoint options. */
    @Test
    public void testTriggeringUnalignedCheckpointWithFinishedChannels() throws Exception {
        CheckpointOptions unalignedOptions = CheckpointOptions.unaligned(
                (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
        this.testTriggeringCheckpointWithFinishedChannels(unalignedOptions);
    }

    /** Runs the finished-channels scenario with aligned checkpoint options and a timeout argument of 10. */
    @Test
    public void testTriggeringAlignedWithTimeoutCheckpointWithFinishedChannels() throws Exception {
        CheckpointOptions timeoutOptions = CheckpointOptions.alignedWithTimeout(
                (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(),
                (long)10L);
        this.testTriggeringCheckpointWithFinishedChannels(timeoutOptions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shared scenario for the three "finished channels" tests: triggers checkpoints 2, 4 and 6
     * with the given options while input channels 0, 1 and 2 are ended step by step, and checks
     * after each checkpoint that no input channel ran its on-resumed action.
     *
     * @param checkpointOptions options used for every checkpoint triggered here; the harness is
     *     built with unaligned checkpoints enabled when the options are unaligned or timeoutable
     */
    private void testTriggeringCheckpointWithFinishedChannels(CheckpointOptions checkpointOptions) throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            int slot = 0;
            while (slot < partitions.length) {
                partitions[slot] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                partitions[slot].setup();
                ++slot;
            }
            try (StreamTaskMailboxTestHarness<String> harness = this.createTestHarness(partitions, new CompletingCheckpointResponder(), checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable())) {
                int channelCount = harness.inputGates[0].getInputGate().getNumberOfInputChannels();
                int[] resumes = new int[channelCount];
                // Count, per channel index, how often the channel's on-resumed action runs.
                for (int channelIdx = 0; channelIdx < channelCount; ++channelIdx) {
                    TestInputChannel channel = (TestInputChannel)harness.inputGates[0].getInputGate().getChannel(channelIdx);
                    channel.setActionOnResumed(() -> {
                        resumes[channel.getChannelIndex()] += 1;
                    });
                }

                // Checkpoint 2: no channel has been ended yet.
                CompletableFuture<Boolean> pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 2L, checkpointOptions);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(harness, pending);
                Assert.assertEquals(2L, (long)harness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertArrayEquals(new int[]{0, 0, 0}, resumes);

                // Checkpoint 4: channel 0 has received EndOfData and EndOfPartitionEvent.
                harness.processEvent((AbstractEvent)EndOfData.INSTANCE, 0, 0);
                harness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 0);
                pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 4L, checkpointOptions);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(harness, pending);
                Assert.assertEquals(4L, (long)harness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertArrayEquals(new int[]{0, 0, 0}, resumes);

                // Checkpoint 6: channels 1 and 2 are ended as well.
                harness.processEvent((AbstractEvent)EndOfData.INSTANCE, 0, 1);
                harness.processEvent((AbstractEvent)EndOfData.INSTANCE, 0, 2);
                harness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 1);
                harness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 2);
                pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 6L, checkpointOptions);
                // Once the trigger future completes, mark subpartition 0 of every output as fully processed.
                pending.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });
                harness.finishProcessing();
                Assert.assertTrue(pending.isDone());
                harness.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals(6L, (long)harness.getTaskStateManager().getReportedCheckpointId());
                Assert.assertArrayEquals(new int[]{0, 0, 0}, resumes);
                for (ResultPartition partition : partitions) {
                    Assert.assertEquals(4L, (long)partition.getNumberOfQueuedBuffers());
                }
            }
        } finally {
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks the {@code isTaskFinished} flag of reported snapshots: it must be {@code false} for
     * checkpoint 2, taken before any end-of-data event, and {@code true} for checkpoint 4, taken
     * after {@code EndOfData} was processed on the single input channel.
     */
    @Test
    public void testReportOperatorsFinishedInCheckpoint() throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            int slot = 0;
            while (slot < partitions.length) {
                partitions[slot] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                partitions[slot].setup();
                ++slot;
            }
            CompletingCheckpointResponder responder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                    .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 1)
                    .addAdditionalOutput((ResultPartitionWriter[])partitions)
                    .setCheckpointResponder(responder)
                    .modifyStreamConfig(streamConfig -> {
                        streamConfig.setCheckpointingEnabled(true);
                        streamConfig.getConfiguration().set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, (Object)true);
                    })
                    .setupOperatorChain((StreamOperator<?>)new StatefulOperator())
                    .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
                    .build()) {
                responder.setHandlers(
                        id -> harness.streamTask.notifyCheckpointCompleteAsync(id),
                        (id, other) -> harness.streamTask.notifyCheckpointAbortAsync(id, other));

                // Checkpoint 2: taken before any end-of-data event.
                CompletableFuture<Boolean> pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 2L);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(harness, pending);
                Assert.assertEquals(2L, (long)harness.getTaskStateManager().getReportedCheckpointId());
                TaskStateSnapshot beforeEnd = (TaskStateSnapshot)harness.getTaskStateManager().getJobManagerTaskStateSnapshotsByCheckpointId().get(2L);
                Assert.assertFalse((boolean)beforeEnd.isTaskFinished());

                // Checkpoint 4: taken after EndOfData on channel 0.
                harness.processEvent((AbstractEvent)EndOfData.INSTANCE, 0, 0);
                pending = StreamTaskFinalCheckpointsTest.triggerCheckpoint(harness, 4L);
                pending.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });
                harness.processAll();
                harness.finishProcessing();
                Assert.assertTrue(pending.isDone());
                harness.getTaskStateManager().getWaitForReportLatch().await();
                TaskStateSnapshot afterEnd = (TaskStateSnapshot)harness.getTaskStateManager().getJobManagerTaskStateSnapshotsByCheckpointId().get(4L);
                Assert.assertTrue((boolean)afterEnd.isTaskFinished());
            }
        } finally {
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /**
     * Triggers a checkpoint with {@code CheckpointOptions.forCheckpointWithDefaultLocation()}.
     *
     * @return the future returned by the three-argument overload
     */
    static CompletableFuture<Boolean> triggerCheckpoint(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId) {
        CheckpointOptions defaultOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
        return StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, checkpointId, defaultOptions);
    }

    /**
     * Resets the harness's report latch and asynchronously triggers a checkpoint on the task.
     * The checkpoint metadata uses {@code checkpointId * 1000} as its second constructor argument.
     *
     * @return the future returned by {@code triggerCheckpointAsync}
     */
    static CompletableFuture<Boolean> triggerCheckpoint(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId, CheckpointOptions checkpointOptions) {
        // The latch is reset first so that a later await() is tied to this trigger.
        testHarness.getTaskStateManager().getWaitForReportLatch().reset();
        CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, checkpointId * 1000L);
        return testHarness.getStreamTask().triggerCheckpointAsync(metaData, checkpointOptions);
    }

    /**
     * Triggers a stop-with-savepoint using {@link CheckpointType#SAVEPOINT_TERMINATE}.
     *
     * @param testHarness harness whose stream task should take the savepoint
     * @param checkpointId id of the savepoint to trigger
     * @return the future produced by {@code triggerStopWithSavepoint}
     */
    static CompletableFuture<Boolean> triggerStopWithSavepointDrain(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId) {
        CheckpointType terminateSavepoint = CheckpointType.SAVEPOINT_TERMINATE;
        return StreamTaskFinalCheckpointsTest.triggerStopWithSavepoint(testHarness, checkpointId, terminateSavepoint);
    }

    /**
     * Triggers a stop-with-savepoint using {@link CheckpointType#SAVEPOINT_SUSPEND}.
     *
     * @param testHarness harness whose stream task should take the savepoint
     * @param checkpointId id of the savepoint to trigger
     * @return the future produced by {@code triggerStopWithSavepoint}
     */
    static CompletableFuture<Boolean> triggerStopWithSavepointNoDrain(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId) {
        CheckpointType suspendSavepoint = CheckpointType.SAVEPOINT_SUSPEND;
        return StreamTaskFinalCheckpointsTest.triggerStopWithSavepoint(testHarness, checkpointId, suspendSavepoint);
    }

    /**
     * Resets the harness' wait-for-report latch and asynchronously triggers a checkpoint of the
     * given type, using options built by {@code CheckpointOptions.alignedNoTimeout} with the
     * default storage location reference.
     *
     * @param testHarness harness whose stream task should take the savepoint
     * @param checkpointId id of the savepoint; the second {@link CheckpointMetaData} constructor
     *     argument (presumably the timestamp) is {@code checkpointId * 1000}
     * @param checkpointType type handed to {@code CheckpointOptions.alignedNoTimeout}
     * @return the future returned by {@code triggerCheckpointAsync}
     */
    static CompletableFuture<Boolean> triggerStopWithSavepoint(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId, CheckpointType checkpointType) {
        // Reset first so that a later await() on the latch only observes reports made after this call.
        testHarness.getTaskStateManager().getWaitForReportLatch().reset();

        CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, checkpointId * 1000L);
        CheckpointStorageLocationReference defaultLocation = CheckpointStorageLocationReference.getDefault();
        CheckpointOptions savepointOptions = CheckpointOptions.alignedNoTimeout(checkpointType, defaultLocation);
        return testHarness.getStreamTask().triggerCheckpointAsync(metaData, savepointOptions);
    }

    /**
     * Runs the harness one step at a time until the given checkpoint future is done, then blocks
     * on the task state manager's wait-for-report latch.
     *
     * @param testHarness harness to drive
     * @param checkpointFuture future of the checkpoint being waited for
     * @throws Exception if processing a step or awaiting the latch fails
     */
    static void processMailTillCheckpointSucceeds(StreamTaskMailboxTestHarness<String> testHarness, Future<Boolean> checkpointFuture) throws Exception {
        for (;;) {
            if (checkpointFuture.isDone()) {
                break;
            }
            testHarness.processSingleStep();
        }
        // The future being done is not enough; also wait on the report latch.
        testHarness.getTaskStateManager().getWaitForReportLatch().await();
    }

    /**
     * Triggers checkpoints 1 and 2 on barrier after the task has completed, with a responder that
     * stalls on checkpoint 2, and asserts that checkpoint 2 is the reported checkpoint once
     * processing has finished.
     */
    @Test
    public void testWaitingForPendingCheckpointsOnFinished() throws Exception {
        final long delayedCheckpointId = 2L;
        CompletingCheckpointResponder responder = new CompletingCheckpointResponder() {

            @Override
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                if (checkpointId != delayedCheckpointId) {
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, subtaskState);
                    return;
                }
                // The delayed checkpoint is held back for 500 ms and is never forwarded to the
                // parent responder.
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    throw new FlinkRuntimeException(e);
                }
            }
        };

        try (StreamTaskMailboxTestHarness<String> harness = createTestHarness(responder)) {
            harness.waitForTaskCompletion();

            // Checkpoint 1 is acknowledged normally.
            CheckpointMetaData regularCheckpoint = new CheckpointMetaData(1L, 101L);
            CheckpointMetricsBuilder regularMetrics = new CheckpointMetricsBuilder()
                    .setBytesProcessedDuringAlignment(0L)
                    .setAlignmentDurationNanos(0L);
            harness.streamTask.triggerCheckpointOnBarrier(regularCheckpoint, CheckpointOptions.forCheckpointWithDefaultLocation(), regularMetrics);

            // Checkpoint 2 hits the sleeping branch of the responder above.
            CheckpointMetaData delayedCheckpoint = new CheckpointMetaData(delayedCheckpointId, 101L);
            CheckpointMetricsBuilder delayedMetrics = new CheckpointMetricsBuilder()
                    .setBytesProcessedDuringAlignment(0L)
                    .setAlignmentDurationNanos(0L);
            harness.streamTask.triggerCheckpointOnBarrier(delayedCheckpoint, CheckpointOptions.forCheckpointWithDefaultLocation(), delayedMetrics);

            harness.processAll();
            harness.finishProcessing();

            long reportedCheckpointId = harness.getTaskStateManager().getReportedCheckpointId();
            Assert.assertEquals(delayedCheckpointId, reportedCheckpointId);
        }
    }

    /**
     * Builds a one-input task restored with {@code TaskStateSnapshot.FINISHED_ON_RESTORE} and two
     * chained {@code TestFinishedOnRestoreStreamOperator}s, takes checkpoint 2 on barrier, sends
     * completion/abort notifications and {@code MAX_WATERMARK}s, and asserts that the harness
     * output consists of exactly the checkpoint barrier, {@code MAX_WATERMARK} and
     * {@code EndOfData.INSTANCE}, in that order.
     */
    @Test
    public void testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 3)
                        .setCollectNetworkEvents()
                        .setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE)
                        .setupOperatorChain((StreamOperator<?>)new TestFinishedOnRestoreStreamOperator())
                        .chain(new TestFinishedOnRestoreStreamOperator(), StringSerializer.INSTANCE)
                        .finish()
                        .build()) {
            harness.processAll();

            // Take checkpoint 2 and wait until its state has been reported.
            harness.getTaskStateManager().getWaitForReportLatch().reset();
            CheckpointMetaData checkpointMetaData = new CheckpointMetaData(2L, 2L);
            CheckpointOptions checkpointOptions = new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
            CheckpointMetricsBuilder metrics = new CheckpointMetricsBuilder()
                    .setBytesProcessedDuringAlignment(0L)
                    .setAlignmentDurationNanos(0L);
            harness.streamTask.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, metrics);
            harness.getTaskStateManager().getWaitForReportLatch().await();
            long reportedCheckpointId = harness.getTaskStateManager().getReportedCheckpointId();
            Assert.assertEquals(2L, reportedCheckpointId);

            // Deliver a completion and an abort notification, then drain the mailbox.
            harness.streamTask.notifyCheckpointCompleteAsync(2L);
            harness.streamTask.notifyCheckpointAbortAsync(3L, 2L);
            harness.processAll();

            // One MAX_WATERMARK per last-argument index 0..2 (presumably the three channels
            // configured via addInput(..., 3) above).
            for (int index = 0; index < 3; index++) {
                harness.processElement(Watermark.MAX_WATERMARK, 0, index);
            }
            harness.waitForTaskCompletion();
            harness.finishProcessing();

            Object[] expectedOutput = {
                new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions),
                Watermark.MAX_WATERMARK,
                EndOfData.INSTANCE
            };
            MatcherAssert.assertThat(harness.getOutput(), (Matcher)Matchers.contains(expectedOutput));
        }
    }

    private static class StatefulOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private ListState<Integer> state;

        private StatefulOperator() {
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("test", Integer.class));
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }
    }

    private static class FinishingOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        static boolean finished = false;

        private FinishingOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }

        public void finish() throws Exception {
            finished = true;
        }
    }

    private static class EmptyOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private EmptyOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }
    }

    private static class ImmediatelyFinishingSource
    implements SourceFunction<String> {
        private ImmediatelyFinishingSource() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        }

        public void cancel() {
        }
    }
}

