/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class UnalignedCheckpointsTest {
    private static final long DEFAULT_CHECKPOINT_ID = 0L;
    private int sizeCounter = 1;
    private CheckpointedInputGate inputGate;
    private RecordingChannelStateWriter channelStateWriter;
    private int[] sequenceNumbers;
    private List<BufferOrEvent> output;

    UnalignedCheckpointsTest() {
    }

    @BeforeEach
    void setUp() {
        this.channelStateWriter = new RecordingChannelStateWriter();
    }

    @AfterEach
    void ensureEmpty() throws Exception {
        if (this.inputGate != null) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).isNotPresent();
            Assertions.assertThat((boolean)this.inputGate.isFinished()).isTrue();
            this.inputGate.close();
        }
        if (this.channelStateWriter != null) {
            this.channelStateWriter.close();
        }
    }

    @Test
    void testSingleChannelNoBarriers() throws Exception {
        this.inputGate = this.createInputGate(1, new ValidatingCheckpointHandler(1L));
        BufferOrEvent[] sequence = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(0), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0));
        this.assertOutput(sequence);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testMultiChannelNoBarriers() throws Exception {
        this.inputGate = this.createInputGate(4, new ValidatingCheckpointHandler(1L));
        BufferOrEvent[] sequence = this.addSequence(this.inputGate, this.createBuffer(2), this.createBuffer(2), this.createBuffer(0), this.createBuffer(1), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0), this.createBuffer(3), this.createBuffer(1), UnalignedCheckpointsTest.createEndOfPartition(3), this.createBuffer(1), UnalignedCheckpointsTest.createEndOfPartition(1), this.createBuffer(2), UnalignedCheckpointsTest.createEndOfPartition(2));
        this.assertOutput(sequence);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testSingleChannelWithBarriers() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(1, handler);
        BufferOrEvent[] sequence = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(0), this.createBuffer(0), this.createBarrier(1L, 0), this.createBuffer(0), this.createBuffer(0), this.createBuffer(0), this.createBuffer(0), this.createBarrier(2L, 0), this.createBarrier(3L, 0), this.createBuffer(0), this.createBuffer(0), this.createBarrier(4L, 0), this.createBarrier(5L, 0), this.createBarrier(6L, 0), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0));
        this.assertOutput(sequence);
    }

    @Test
    void testMultiChannelWithBarriers() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(3, handler);
        BufferOrEvent[] sequence1 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(2), this.createBuffer(0), this.createBarrier(1L, 1), this.createBarrier(1L, 2), this.createBuffer(2), this.createBuffer(1), this.createBuffer(0), this.createBarrier(1L, 0));
        this.assertOutput(sequence1);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isOne();
        this.assertInflightData(sequence1[7]);
        BufferOrEvent[] sequence2 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(0), this.createBuffer(1), this.createBuffer(1), this.createBuffer(2), this.createBarrier(2L, 0), this.createBarrier(2L, 1), this.createBarrier(2L, 2));
        this.assertOutput(sequence2);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(2L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence3 = this.addSequence(this.inputGate, this.createBuffer(2), this.createBuffer(2), this.createBarrier(3L, 2), this.createBuffer(2), this.createBuffer(2), this.createBarrier(3L, 0), this.createBarrier(3L, 1));
        this.assertOutput(sequence3);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(3L);
        this.assertInflightData(new BufferOrEvent[0]);
        this.addSequence(this.inputGate, this.createBarrier(4L, 1), this.createBarrier(4L, 2), this.createBarrier(4L, 0));
        this.assertOutput(new BufferOrEvent[0]);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(4L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence5 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(2), this.createBuffer(0), this.createBarrier(5L, 1), this.createBuffer(2), this.createBuffer(0), this.createBuffer(2), this.createBuffer(1), this.createBarrier(5L, 2), this.createBuffer(1), this.createBuffer(0), this.createBuffer(2), this.createBuffer(1), this.createBarrier(5L, 0));
        this.assertOutput(sequence5);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(5L);
        this.assertInflightData(sequence5[4], sequence5[5], sequence5[6], sequence5[10]);
        BufferOrEvent[] sequence6 = this.addSequence(this.inputGate, this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0), UnalignedCheckpointsTest.createEndOfPartition(1), UnalignedCheckpointsTest.createEndOfPartition(2));
        this.assertOutput(sequence6);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testMetrics() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(3, handler);
        int bufferSize = 100;
        long checkpointId = 1L;
        long sleepTime = 10L;
        long checkpointBarrierCreation = System.currentTimeMillis();
        Thread.sleep(sleepTime);
        long alignmentStartNanos = System.nanoTime();
        this.addSequence(this.inputGate, this.createBuffer(0, bufferSize), this.createBuffer(1, bufferSize), this.createBuffer(2, bufferSize), this.createBarrier(checkpointId, 1, checkpointBarrierCreation), this.createBuffer(0, bufferSize), this.createBuffer(1, bufferSize), this.createBuffer(2, bufferSize), this.createBarrier(checkpointId, 0), this.createBuffer(0, bufferSize), this.createBuffer(1, bufferSize), this.createBuffer(2, bufferSize));
        long startDelay = System.currentTimeMillis() - checkpointBarrierCreation;
        Thread.sleep(sleepTime);
        this.addSequence(this.inputGate, this.createBarrier(checkpointId, 2), this.createBuffer(0, bufferSize), this.createBuffer(1, bufferSize), this.createBuffer(2, bufferSize), UnalignedCheckpointsTest.createEndOfPartition(0), UnalignedCheckpointsTest.createEndOfPartition(1), UnalignedCheckpointsTest.createEndOfPartition(2));
        long alignmentDuration = System.nanoTime() - alignmentStartNanos;
        Assertions.assertThat((long)this.inputGate.getCheckpointBarrierHandler().getLatestCheckpointId()).isEqualTo(checkpointId);
        Assertions.assertThat((long)(this.inputGate.getCheckpointStartDelayNanos() / 1000000L)).isBetween(Long.valueOf(sleepTime), Long.valueOf(startDelay));
        Assertions.assertThat(handler.getLastAlignmentDurationNanos()).isDone();
        Assertions.assertThat((long)(handler.getLastAlignmentDurationNanos().get() / 1000000L)).isBetween(Long.valueOf(sleepTime), Long.valueOf(alignmentDuration));
        Assertions.assertThat(handler.getLastBytesProcessedDuringAlignment()).isCompletedWithValue((Object)(6L * (long)bufferSize));
    }

    @Test
    void testMultiChannelTrailingInflightData() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(3, handler, false);
        BufferOrEvent[] sequence = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(1), this.createBuffer(2), this.createBarrier(1L, 1), this.createBarrier(1L, 2), this.createBarrier(1L, 0), this.createBuffer(2), this.createBuffer(1), this.createBuffer(0), this.createBarrier(2L, 1), this.createBuffer(1), this.createBuffer(1), UnalignedCheckpointsTest.createEndOfPartition(1), this.createBuffer(0), this.createBuffer(2), this.createBarrier(2L, 2), this.createBuffer(2), UnalignedCheckpointsTest.createEndOfPartition(2), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0));
        this.assertOutput(sequence);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(2L);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testMissingCancellationBarriers() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(2, handler);
        BufferOrEvent[] sequence = this.addSequence(this.inputGate, this.createBarrier(1L, 0), this.createCancellationBarrier(2L, 0), this.createCancellationBarrier(3L, 0), this.createCancellationBarrier(3L, 1), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0), UnalignedCheckpointsTest.createEndOfPartition(1));
        this.assertOutput(sequence);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isOne();
        Assertions.assertThat((long)handler.getLastCanceledCheckpointId()).isEqualTo(3L);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testEarlyCleanup() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(3, handler, false);
        BufferOrEvent[] sequence1 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(1), this.createBuffer(2), this.createBarrier(1L, 1), this.createBarrier(1L, 2), this.createBarrier(1L, 0));
        this.assertOutput(sequence1);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isOne();
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence2 = this.addSequence(this.inputGate, this.createBuffer(2), this.createBuffer(1), this.createBuffer(0), this.createBarrier(2L, 1), this.createBuffer(1), this.createBuffer(1), UnalignedCheckpointsTest.createEndOfPartition(1), this.createBuffer(0), this.createBuffer(2), this.createBarrier(2L, 2), this.createBuffer(2), UnalignedCheckpointsTest.createEndOfPartition(2), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0));
        this.assertOutput(sequence2);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(2L);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testStartAlignmentWithClosedChannels() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(2L);
        this.inputGate = this.createInputGate(4, handler);
        BufferOrEvent[] sequence1 = this.addSequence(this.inputGate, UnalignedCheckpointsTest.createEndOfPartition(2), UnalignedCheckpointsTest.createEndOfPartition(1), this.createBuffer(0), this.createBuffer(0), this.createBuffer(3), this.createBarrier(2L, 3), this.createBarrier(2L, 0));
        this.assertOutput(sequence1);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(2L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence2 = this.addSequence(this.inputGate, this.createBuffer(3), this.createBuffer(0), this.createBarrier(3L, 3), this.createBuffer(3), this.createBuffer(0), this.createBarrier(3L, 0));
        this.assertOutput(sequence2);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(3L);
        this.assertInflightData(sequence2[4]);
        BufferOrEvent[] sequence3 = this.addSequence(this.inputGate, this.createBarrier(4L, 0), this.createBarrier(4L, 3));
        this.assertOutput(sequence3);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(4L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence4 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(0), this.createBuffer(3), UnalignedCheckpointsTest.createEndOfPartition(0));
        this.assertOutput(sequence4);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(-1L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence5 = this.addSequence(this.inputGate, this.createBuffer(3), this.createBarrier(5L, 3), this.createBuffer(3), UnalignedCheckpointsTest.createEndOfPartition(3));
        this.assertOutput(sequence5);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(5L);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testEndOfStreamWhileCheckpoint() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(3, handler);
        BufferOrEvent[] sequence1 = this.addSequence(this.inputGate, this.createBarrier(1L, 0), this.createBarrier(1L, 1), this.createBarrier(1L, 2));
        this.assertOutput(sequence1);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isOne();
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence2 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(0), this.createBuffer(2), this.createBarrier(2L, 2), this.createBarrier(2L, 0), this.createBuffer(0), this.createBuffer(2), this.createBuffer(1), UnalignedCheckpointsTest.createEndOfPartition(2), UnalignedCheckpointsTest.createEndOfPartition(1), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0));
        this.assertOutput(sequence2);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(2L);
        this.assertInflightData(sequence2[7]);
    }

    @Test
    public void testNotifyAbortCheckpointBeforeCancellingAsyncCheckpoint() throws Exception {
        ValidateAsyncFutureNotCompleted handler = new ValidateAsyncFutureNotCompleted(1L);
        this.inputGate = this.createInputGate(2, handler);
        handler.setInputGate(this.inputGate);
        this.addSequence(this.inputGate, this.createBarrier(1L, 0), this.createCancellationBarrier(1L, 1));
        this.addSequence(this.inputGate, UnalignedCheckpointsTest.createEndOfPartition(0), UnalignedCheckpointsTest.createEndOfPartition(1));
    }

    @Test
    void testSingleChannelAbortCheckpoint() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(1, handler);
        BufferOrEvent[] sequence1 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBarrier(1L, 0), this.createBuffer(0), this.createBarrier(2L, 0), this.createCancellationBarrier(4L, 0));
        this.assertOutput(sequence1);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(2L);
        Assertions.assertThat((long)handler.getLastCanceledCheckpointId()).isEqualTo(4L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence2 = this.addSequence(this.inputGate, this.createBarrier(5L, 0), this.createBuffer(0), this.createCancellationBarrier(6L, 0), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0));
        this.assertOutput(sequence2);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(5L);
        Assertions.assertThat((long)handler.getLastCanceledCheckpointId()).isEqualTo(6L);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testMultiChannelAbortCheckpoint() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        this.inputGate = this.createInputGate(3, handler);
        BufferOrEvent[] sequence1 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(2), this.createBuffer(0), this.createBarrier(1L, 1), this.createBarrier(1L, 2), this.createBuffer(2), this.createBuffer(1), this.createBarrier(1L, 0), this.createBuffer(0), this.createBuffer(2));
        this.assertOutput(sequence1);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isOne();
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence2 = this.addSequence(this.inputGate, this.createBarrier(2L, 0), this.createBarrier(2L, 2), this.createBuffer(0), this.createBuffer(2), this.createCancellationBarrier(2L, 1));
        this.assertOutput(sequence2);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(2L);
        Assertions.assertThat((long)handler.getLastCanceledCheckpointId()).isEqualTo(2L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence3 = this.addSequence(this.inputGate, this.createBuffer(2), this.createBuffer(1), this.createBarrier(3L, 1), this.createBarrier(3L, 2), this.createBarrier(3L, 0));
        this.assertOutput(sequence3);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(3L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence4 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(1), this.createCancellationBarrier(4L, 1), this.createBarrier(4L, 2), this.createBuffer(0), this.createBarrier(4L, 0));
        this.assertOutput(sequence4);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(-1L);
        Assertions.assertThat((long)handler.getLastCanceledCheckpointId()).isEqualTo(4L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence5 = this.addSequence(this.inputGate, this.createBuffer(0), this.createBuffer(1), this.createBuffer(2), this.createBarrier(5L, 2), this.createBarrier(5L, 1), this.createBarrier(5L, 0), this.createBuffer(0), this.createBuffer(1));
        this.assertOutput(sequence5);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(5L);
        this.assertInflightData(new BufferOrEvent[0]);
        BufferOrEvent[] sequence6 = this.addSequence(this.inputGate, this.createCancellationBarrier(6L, 1), this.createCancellationBarrier(6L, 2), this.createBarrier(6L, 0), this.createBuffer(0), UnalignedCheckpointsTest.createEndOfPartition(0), UnalignedCheckpointsTest.createEndOfPartition(1), UnalignedCheckpointsTest.createEndOfPartition(2));
        this.assertOutput(sequence6);
        Assertions.assertThat((long)this.channelStateWriter.getLastStartedCheckpointId()).isEqualTo(-1L);
        Assertions.assertThat((long)handler.getLastCanceledCheckpointId()).isEqualTo(6L);
        this.assertInflightData(new BufferOrEvent[0]);
    }

    @Test
    void testProcessCancellationBarrierAfterProcessBarrier() throws Exception {
        ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable();
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).setChannelFactory(InputChannelBuilder::buildLocalChannel).build();
        SingleCheckpointBarrierHandler handler = SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler((SubtaskCheckpointCoordinator)TestSubtaskCheckpointCoordinator.INSTANCE, (String)"test", (CheckpointableTask)invokable, (Clock)SystemClock.getInstance(), (boolean)true, (CheckpointableInput[])new CheckpointableInput[]{inputGate});
        handler.processBarrier(this.buildCheckpointBarrier(0L), new InputChannelInfo(0, 0), false);
        Assertions.assertThat((boolean)handler.isCheckpointPending()).isTrue();
        Assertions.assertThat((long)handler.getLatestCheckpointId()).isEqualTo(0L);
        this.testProcessCancellationBarrier(handler, invokable);
    }

    @Test
    void testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception {
        ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable();
        SingleInputGate inputGate = new SingleInputGateBuilder().setChannelFactory(InputChannelBuilder::buildLocalChannel).build();
        SingleCheckpointBarrierHandler handler = SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler((SubtaskCheckpointCoordinator)TestSubtaskCheckpointCoordinator.INSTANCE, (String)"test", (CheckpointableTask)invokable, (Clock)SystemClock.getInstance(), (boolean)true, (CheckpointableInput[])new CheckpointableInput[]{inputGate});
        handler.processCancellationBarrier(new CancelCheckpointMarker(0L), new InputChannelInfo(0, 0));
        this.verifyTriggeredCheckpoint(handler, invokable, 0L);
        handler.processBarrier(this.buildCheckpointBarrier(0L), new InputChannelInfo(0, 0), false);
        this.verifyTriggeredCheckpoint(handler, invokable, 0L);
    }

    private void testProcessCancellationBarrier(SingleCheckpointBarrierHandler handler, ValidatingCheckpointInvokable invokable) throws Exception {
        long cancelledCheckpointId = new Random().nextBoolean() ? 0L : 1L;
        handler.processCancellationBarrier(new CancelCheckpointMarker(cancelledCheckpointId), new InputChannelInfo(0, 0));
        this.verifyTriggeredCheckpoint(handler, invokable, cancelledCheckpointId);
        long nextCancelledCheckpointId = cancelledCheckpointId + 1L;
        handler.processCancellationBarrier(new CancelCheckpointMarker(nextCancelledCheckpointId), new InputChannelInfo(0, 0));
        this.verifyTriggeredCheckpoint(handler, invokable, nextCancelledCheckpointId);
    }

    private void verifyTriggeredCheckpoint(SingleCheckpointBarrierHandler handler, ValidatingCheckpointInvokable invokable, long currentCheckpointId) {
        Assertions.assertThat((boolean)handler.isCheckpointPending()).isFalse();
        Assertions.assertThat((long)handler.getLatestCheckpointId()).isEqualTo(currentCheckpointId);
        Assertions.assertThat((long)invokable.getAbortedCheckpointId()).isEqualTo(currentCheckpointId);
    }

    @Test
    void testEndOfStreamWithPendingCheckpoint() throws Exception {
        int numberOfChannels = 2;
        ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable();
        SingleInputGate inputGate = new SingleInputGateBuilder().setChannelFactory(InputChannelBuilder::buildLocalChannel).setNumberOfChannels(2).build();
        SingleCheckpointBarrierHandler handler = SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler((SubtaskCheckpointCoordinator)TestSubtaskCheckpointCoordinator.INSTANCE, (String)"test", (CheckpointableTask)invokable, (Clock)SystemClock.getInstance(), (boolean)false, (CheckpointableInput[])new CheckpointableInput[]{inputGate});
        handler.processBarrier(this.buildCheckpointBarrier(0L), new InputChannelInfo(0, 0), false);
        Assertions.assertThat((boolean)handler.isCheckpointPending()).isTrue();
        Assertions.assertThat((long)handler.getLatestCheckpointId()).isEqualTo(0L);
        Assertions.assertThat((int)handler.getNumOpenChannels()).isEqualTo(2);
        handler.processEndOfPartition(new InputChannelInfo(0, 0));
        Assertions.assertThat((boolean)handler.isCheckpointPending()).isFalse();
        Assertions.assertThat((long)handler.getLatestCheckpointId()).isEqualTo(0L);
        Assertions.assertThat((int)handler.getNumOpenChannels()).isEqualTo(1);
        Assertions.assertThat((long)invokable.getAbortedCheckpointId()).isEqualTo(0L);
    }

    @Test
    void testTriggerCheckpointsWithEndOfPartition() throws Exception {
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = this.createInputGate(3, validator);
        BufferOrEvent[] sequence = this.addSequence(this.inputGate, this.createBarrier(1L, 1), this.createBuffer(0), this.createBarrier(1L, 0), this.createBuffer(1), UnalignedCheckpointsTest.createEndOfPartition(2), UnalignedCheckpointsTest.createEndOfPartition(0), UnalignedCheckpointsTest.createEndOfPartition(1));
        this.assertOutput(sequence);
        Assertions.assertThat((List)validator.triggeredCheckpoints).containsExactly((Object[])new Long[]{1L});
        Assertions.assertThat((long)validator.getAbortedCheckpointCounter()).isZero();
        this.assertInflightData(sequence[1]);
    }

    @Test
    void testTriggerCheckpointsAfterReceivedEndOfPartition() throws Exception {
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = this.createInputGate(3, validator);
        BufferOrEvent[] sequence1 = this.addSequence(this.inputGate, UnalignedCheckpointsTest.createEndOfPartition(0), this.createBarrier(3L, 1), this.createBuffer(1), this.createBuffer(2), UnalignedCheckpointsTest.createEndOfPartition(1), this.createBarrier(3L, 2));
        this.assertOutput(sequence1);
        this.assertInflightData(sequence1[3]);
        Assertions.assertThat((List)validator.triggeredCheckpoints).containsExactly((Object[])new Long[]{3L});
        Assertions.assertThat((long)validator.getAbortedCheckpointCounter()).isZero();
        BufferOrEvent[] sequence2 = this.addSequence(this.inputGate, this.createBuffer(2), this.createBarrier(4L, 2), UnalignedCheckpointsTest.createEndOfPartition(2));
        this.assertOutput(sequence2);
        this.assertInflightData(new BufferOrEvent[0]);
        Assertions.assertThat((List)validator.triggeredCheckpoints).containsExactly((Object[])new Long[]{3L, 4L});
        Assertions.assertThat((long)validator.getAbortedCheckpointCounter()).isZero();
    }

    private BufferOrEvent createBarrier(long checkpointId, int channel) {
        return this.createBarrier(checkpointId, channel, System.currentTimeMillis());
    }

    private BufferOrEvent createBarrier(long checkpointId, int channel, long timestamp) {
        ++this.sizeCounter;
        return new BufferOrEvent((AbstractEvent)new CheckpointBarrier(checkpointId, timestamp, CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, channel));
    }

    private BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
        ++this.sizeCounter;
        return new BufferOrEvent((AbstractEvent)new CancelCheckpointMarker(checkpointId), new InputChannelInfo(0, channel));
    }

    private BufferOrEvent createBuffer(int channel, int size) {
        return new BufferOrEvent(TestBufferFactory.createBuffer(size), new InputChannelInfo(0, channel));
    }

    private BufferOrEvent createBuffer(int channel) {
        int size = this.sizeCounter++;
        return new BufferOrEvent(TestBufferFactory.createBuffer(size), new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createEndOfPartition(int channel) {
        return new BufferOrEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, new InputChannelInfo(0, channel));
    }

    private CheckpointedInputGate createInputGate(int numberOfChannels, AbstractInvokable toNotify) throws IOException {
        return this.createInputGate(numberOfChannels, toNotify, true);
    }

    private CheckpointedInputGate createInputGate(int numberOfChannels, AbstractInvokable toNotify, boolean enableCheckpointsAfterTasksFinished) throws IOException {
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(numberOfChannels).setupBufferPoolFactory(environment).build();
        gate.setInputChannels((InputChannel[])IntStream.range(0, numberOfChannels).mapToObj(channelIndex -> InputChannelBuilder.newBuilder().setChannelIndex(channelIndex).setStateWriter(this.channelStateWriter).setupFromNettyShuffleEnvironment(environment).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(gate)).toArray(RemoteInputChannel[]::new));
        this.sequenceNumbers = new int[numberOfChannels];
        gate.setup();
        gate.requestPartitions();
        return this.createCheckpointedInputGate((IndexedInputGate)gate, toNotify, enableCheckpointsAfterTasksFinished);
    }

    private BufferOrEvent[] addSequence(CheckpointedInputGate inputGate, BufferOrEvent ... sequence) throws Exception {
        this.output = new ArrayList<BufferOrEvent>();
        UnalignedCheckpointsTest.addSequence(inputGate, this.output, this.sequenceNumbers, sequence);
        this.sizeCounter = 1;
        return sequence;
    }

    static BufferOrEvent[] addSequence(CheckpointedInputGate inputGate, List<BufferOrEvent> output, int[] sequenceNumbers, BufferOrEvent ... sequence) throws Exception {
        for (BufferOrEvent bufferOrEvent : sequence) {
            if (bufferOrEvent.isEvent()) {
                bufferOrEvent = new BufferOrEvent(EventSerializer.toBuffer((AbstractEvent)bufferOrEvent.getEvent(), (boolean)(bufferOrEvent.getEvent() instanceof CheckpointBarrier)), bufferOrEvent.getChannelInfo(), bufferOrEvent.moreAvailable(), bufferOrEvent.morePriorityEvents());
            }
            int n = bufferOrEvent.getChannelInfo().getInputChannelIdx();
            int n2 = sequenceNumbers[n];
            sequenceNumbers[n] = n2 + 1;
            ((RemoteInputChannel)inputGate.getChannel(bufferOrEvent.getChannelInfo().getInputChannelIdx())).onBuffer(bufferOrEvent.getBuffer(), n2, 0, 0);
            while (inputGate.pollNext().map(output::add).isPresent()) {
            }
        }
        return sequence;
    }

    private CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate gate, AbstractInvokable toNotify) {
        return this.createCheckpointedInputGate(gate, toNotify, true);
    }

    private CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate gate, AbstractInvokable toNotify, boolean enableCheckpointsAfterTasksFinished) {
        SingleCheckpointBarrierHandler barrierHandler = SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler((SubtaskCheckpointCoordinator)new TestSubtaskCheckpointCoordinator(this.channelStateWriter), (String)"Test", (CheckpointableTask)toNotify, (Clock)SystemClock.getInstance(), (boolean)enableCheckpointsAfterTasksFinished, (CheckpointableInput[])new CheckpointableInput[]{gate});
        return new CheckpointedInputGate((InputGate)gate, (CheckpointBarrierHandler)barrierHandler, (MailboxExecutor)new SyncMailboxExecutor());
    }

    private void assertInflightData(BufferOrEvent ... expected) {
        Collection<BufferOrEvent> andResetInflightData = this.getAndResetInflightData();
        ((ListAssert)Assertions.assertThat(this.getIds(andResetInflightData)).as("Unexpected in-flight sequence: " + String.valueOf(andResetInflightData), new Object[0])).isEqualTo(this.getIds(Arrays.asList(expected)));
    }

    private Collection<BufferOrEvent> getAndResetInflightData() {
        List<BufferOrEvent> inflightData = this.channelStateWriter.getAddedInput().entries().stream().map(entry -> new BufferOrEvent((Buffer)entry.getValue(), (InputChannelInfo)entry.getKey())).collect(Collectors.toList());
        this.channelStateWriter.reset();
        return inflightData;
    }

    private void assertOutput(BufferOrEvent ... expectedSequence) {
        ((ListAssert)Assertions.assertThat(this.getIds(this.output)).as("Unexpected output sequence", new Object[0])).isEqualTo(this.getIds(Arrays.asList(expectedSequence)));
    }

    private List<Object> getIds(Collection<BufferOrEvent> buffers) {
        return buffers.stream().filter(boe -> !boe.isEvent() || !(boe.getEvent() instanceof CheckpointBarrier) && !(boe.getEvent() instanceof CancelCheckpointMarker)).map(boe -> boe.isBuffer() ? Integer.valueOf(boe.getSize() - 1) : boe.getEvent()).collect(Collectors.toList());
    }

    private CheckpointBarrier buildCheckpointBarrier(long id) {
        return new CheckpointBarrier(id, 0L, CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
    }

    private static final class ValidatingCheckpointInvokable
    extends StreamTask {
        private long expectedCheckpointId;
        private int totalNumCheckpoints;
        private long abortedCheckpointId;

        ValidatingCheckpointInvokable() throws Exception {
            super((Environment)new DummyEnvironment("test", 1, 0));
        }

        public void init() {
        }

        protected void processInput(MailboxDefaultAction.Controller controller) {
        }

        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) throws IOException {
            this.abortedCheckpointId = checkpointId;
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) {
            this.expectedCheckpointId = checkpointMetaData.getCheckpointId();
            ++this.totalNumCheckpoints;
        }

        long getTriggeredCheckpointId() {
            return this.expectedCheckpointId;
        }

        int getTotalTriggeredCheckpoints() {
            return this.totalNumCheckpoints;
        }

        long getAbortedCheckpointId() {
            return this.abortedCheckpointId;
        }
    }

    static class ValidateAsyncFutureNotCompleted
    extends ValidatingCheckpointHandler {
        @Nullable
        private CheckpointedInputGate inputGate;

        public ValidateAsyncFutureNotCompleted(long nextExpectedCheckpointId) {
            super(nextExpectedCheckpointId);
        }

        @Override
        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
            super.abortCheckpointOnBarrier(checkpointId, cause);
            Preconditions.checkState((this.inputGate != null ? 1 : 0) != 0);
            Assertions.assertThat((CompletableFuture)this.inputGate.getAllBarriersReceivedFuture(checkpointId)).isNotDone();
        }

        public void setInputGate(CheckpointedInputGate inputGate) {
            this.inputGate = inputGate;
        }
    }

    static class ValidatingCheckpointHandler
    extends org.apache.flink.streaming.runtime.io.checkpointing.ValidatingCheckpointHandler {
        public ValidatingCheckpointHandler(long nextExpectedCheckpointId) {
            super(nextExpectedCheckpointId);
        }

        @Override
        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
            super.abortCheckpointOnBarrier(checkpointId, cause);
            this.nextExpectedCheckpointId = -1L;
        }
    }
}

