/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.stream.IntStream;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.UnalignedCheckpointsTest;
import org.apache.flink.streaming.runtime.io.checkpointing.ValidatingCheckpointHandler;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AlignedCheckpointsTest {
    // Size value the tests pass to check(...) for data events; the decompiled test bodies use the inlined literal 512.
    protected static final int PAGE_SIZE = 512;
    // Shared random source; not referenced in the portion of the file reviewed here.
    private static final Random RND = new Random();
    // Mutable static counter; presumably used by the buffer-creating helpers — TODO confirm, usage is outside this view.
    private static int sizeCounter = 1;
    // Gate under test; the tests assign it and ensureEmpty() drains and closes it after each test.
    CheckpointedInputGate inputGate;
    // System.nanoTime() reading stored by setUp() before each test.
    static long testStartTimeNanos;
    // Mock gate assigned by the sequence-based createCheckpointedInputGate overloads; tests query its blocked channels.
    private MockInputGate mockInputGate;

    /** Stores the current {@link System#nanoTime()} reading in {@code testStartTimeNanos} before each test. */
    @Before
    public void setUp() {
        final long nowNanos = System.nanoTime();
        AlignedCheckpointsTest.testStartTimeNanos = nowNanos;
    }

    /**
     * Builds a {@link CheckpointedInputGate} on top of a {@link SingleInputGate} whose channels are
     * remote input channels created from a freshly built {@link NettyShuffleEnvironment}.
     *
     * @param numberOfChannels number of remote input channels to create
     * @param toNotify task handed on to the gate-based overload
     * @return the checkpointed gate wrapping the set-up single input gate
     * @throws IOException propagated from the gate construction and setup calls
     */
    private CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, AbstractInvokable toNotify) throws IOException {
        final NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        final SingleInputGate singleGate = new SingleInputGateBuilder()
                .setNumberOfChannels(numberOfChannels)
                .setupBufferPoolFactory(shuffleEnvironment)
                .build();

        // One remote channel per index, all sharing the same environment and owning gate.
        final InputChannel[] remoteChannels = IntStream.range(0, numberOfChannels)
                .mapToObj(index -> InputChannelBuilder.newBuilder()
                        .setChannelIndex(index)
                        .setupFromNettyShuffleEnvironment(shuffleEnvironment)
                        .setConnectionManager((ConnectionManager) new TestingConnectionManager())
                        .buildRemoteChannel(singleGate))
                .toArray(RemoteInputChannel[]::new);
        singleGate.setInputChannels(remoteChannels);

        singleGate.setup();
        singleGate.requestPartitions();
        return createCheckpointedInputGate((IndexedInputGate) singleGate, toNotify);
    }

    /**
     * Creates a {@link MockInputGate} that replays {@code sequence}, remembers it in
     * {@code mockInputGate}, and wraps it in a {@link CheckpointedInputGate}.
     *
     * @param numberOfChannels number of channels of the mock gate
     * @param sequence buffers and events the mock gate is constructed with
     * @param toNotify task handed on to the gate-based overload
     * @param enableCheckpointsAfterTasksFinish flag forwarded unchanged to the gate-based overload
     * @return the checkpointed gate wrapping the mock gate
     */
    private CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence, AbstractInvokable toNotify, boolean enableCheckpointsAfterTasksFinish) {
        final MockInputGate replayingGate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
        this.mockInputGate = replayingGate;
        return createCheckpointedInputGate((IndexedInputGate) replayingGate, toNotify, enableCheckpointsAfterTasksFinish);
    }

    /**
     * Creates a {@link MockInputGate} that replays {@code sequence}, remembers it in
     * {@code mockInputGate}, and wraps it via the two-argument gate-based overload.
     *
     * @param numberOfChannels number of channels of the mock gate
     * @param sequence buffers and events the mock gate is constructed with
     * @param toNotify task handed on to the gate-based overload
     * @return the checkpointed gate wrapping the mock gate
     */
    private CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence, AbstractInvokable toNotify) {
        final MockInputGate replayingGate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
        this.mockInputGate = replayingGate;
        return createCheckpointedInputGate((IndexedInputGate) replayingGate, toNotify);
    }

    /**
     * Convenience overload that uses a new {@link DummyCheckpointInvokable} as the task to notify.
     *
     * @param numberOfChannels number of channels of the mock gate
     * @param sequence buffers and events the mock gate is constructed with
     * @return the checkpointed gate produced by the three-argument overload
     */
    private CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence) {
        final AbstractInvokable dummyTask = new DummyCheckpointInvokable();
        return createCheckpointedInputGate(numberOfChannels, sequence, dummyTask);
    }

    /**
     * Wraps {@code gate} with checkpoints-after-tasks-finish enabled.
     *
     * @param gate gate to wrap
     * @param toNotify task handed on to the three-argument overload
     * @return the checkpointed gate produced by the three-argument overload
     */
    private CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate gate, AbstractInvokable toNotify) {
        final boolean enableCheckpointsAfterTasksFinish = true;
        return createCheckpointedInputGate(gate, toNotify, enableCheckpointsAfterTasksFinish);
    }

    /**
     * Wraps {@code gate} in a {@link CheckpointedInputGate} driven by an aligned
     * {@link SingleCheckpointBarrierHandler} and a {@link SyncMailboxExecutor}.
     *
     * @param gate gate to wrap; also the only checkpointable input given to the handler
     * @param toNotify task passed to the barrier handler
     * @param enableCheckpointsAfterTasksFinish flag forwarded to the barrier handler factory
     * @return the newly created checkpointed gate
     */
    private CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate gate, AbstractInvokable toNotify, boolean enableCheckpointsAfterTasksFinish) {
        final CheckpointableInput[] checkpointableInputs = new CheckpointableInput[]{gate};
        final CheckpointBarrierHandler alignedHandler = SingleCheckpointBarrierHandler.aligned(
                "Testing",
                (CheckpointableTask) toNotify,
                (Clock) SystemClock.getInstance(),
                gate.getNumberOfInputChannels(),
                // Timer registration stub: ignores the callable and returns a no-op handle.
                (callable, duration) -> () -> {},
                enableCheckpointsAfterTasksFinish,
                checkpointableInputs);
        final MailboxExecutor mailboxExecutor = new SyncMailboxExecutor();
        return new CheckpointedInputGate((InputGate) gate, alignedHandler, mailboxExecutor);
    }

    /**
     * Verifies after each test that the gate under test is drained and finished, then closes it.
     *
     * <p>The close happens in a {@code finally} block so the gate is released even when one of the
     * assertions fails; previously a failing assertion skipped {@code close()} entirely.
     *
     * @throws Exception propagated from polling or closing the gate
     */
    @After
    public void ensureEmpty() throws Exception {
        try {
            Assert.assertFalse((boolean)this.inputGate.pollNext().isPresent());
            Assert.assertTrue((boolean)this.inputGate.isFinished());
        } finally {
            // Guard against a test that failed before assigning the gate, so the exception
            // raised in the try block is not replaced by a second one from here.
            if (this.inputGate != null) {
                this.inputGate.close();
            }
        }
    }

    /**
     * Replays buffers followed by end-of-partition on a single channel and expects every element
     * to come out unchanged, with an alignment duration of zero.
     */
    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createEndOfPartition(0)
        };
        this.inputGate = createCheckpointedInputGate(1, sequence);

        for (int i = 0; i < sequence.length; i++) {
            Assert.assertEquals(sequence[i], this.inputGate.pollNext().get());
        }

        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
    }

    /**
     * Replays interleaved buffers and end-of-partition events on four channels, without any
     * barriers, and expects every element to come out unchanged with zero alignment duration.
     */
    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(2),
            createBuffer(2),
            createBuffer(0),
            createBuffer(1),
            createBuffer(0),
            createEndOfPartition(0),
            createBuffer(3),
            createBuffer(1),
            createEndOfPartition(3),
            createBuffer(1),
            createEndOfPartition(1),
            createBuffer(2),
            createEndOfPartition(2)
        };
        this.inputGate = createCheckpointedInputGate(4, sequence);

        for (int i = 0; i < sequence.length; i++) {
            Assert.assertEquals(sequence[i], this.inputGate.pollNext().get());
        }

        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
    }

    /**
     * Replays buffers interleaved with barriers for checkpoints 1 through 6 on a single channel
     * and expects every element to come out unchanged; the validating handler starts out
     * expecting checkpoint 1.
     */
    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(2L, 0),
            createBarrier(3L, 0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(4L, 0),
            createBarrier(5L, 0),
            createBarrier(6L, 0),
            createBuffer(0),
            createEndOfPartition(0)
        };
        ValidatingCheckpointHandler checkpointValidator = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(1, sequence, checkpointValidator);
        checkpointValidator.setNextExpectedCheckpointId(1L);

        for (int i = 0; i < sequence.length; i++) {
            Assert.assertEquals(sequence[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Replays four complete checkpoints (barriers on all three channels) interleaved with buffers,
     * checking after each completed alignment that the handler's next expected checkpoint id has
     * advanced and that no channel is left blocked.
     */
    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            // checkpoint 1 with buffers in between
            createBuffer(0),
            createBuffer(2),
            createBuffer(0),
            createBarrier(1L, 1),
            createBarrier(1L, 2),
            createBuffer(0),
            createBarrier(1L, 0),
            // buffers between checkpoints 1 and 2
            createBuffer(0),
            createBuffer(0),
            createBuffer(1),
            createBuffer(1),
            createBuffer(2),
            // checkpoint 2, barriers back to back
            createBarrier(2L, 0),
            createBarrier(2L, 1),
            createBarrier(2L, 2),
            // checkpoint 3 with buffers in between
            createBuffer(2),
            createBuffer(2),
            createBarrier(3L, 2),
            createBuffer(0),
            createBuffer(0),
            createBarrier(3L, 0),
            createBarrier(3L, 1),
            // checkpoint 4, barriers back to back
            createBarrier(4L, 1),
            createBarrier(4L, 2),
            createBarrier(4L, 0),
            // trailing data and end of all partitions
            createBuffer(0),
            createEndOfPartition(0),
            createEndOfPartition(1),
            createEndOfPartition(2)
        };
        ValidatingCheckpointHandler checkpointValidator = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(3, sequence, checkpointValidator);
        checkpointValidator.setNextExpectedCheckpointId(1L);

        // elements 0..2: data before the first barrier
        for (int i = 0; i <= 2; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(1L, checkpointValidator.getNextExpectedCheckpointId());

        // elements 3..6: checkpoint 1
        long alignmentStart = System.nanoTime();
        for (int i = 3; i <= 6; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(2L, checkpointValidator.getNextExpectedCheckpointId());
        validateAlignmentTime(alignmentStart, this.inputGate);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        // elements 7..11: data between checkpoints
        for (int i = 7; i <= 11; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(2L, checkpointValidator.getNextExpectedCheckpointId());

        // elements 12..14: checkpoint 2
        alignmentStart = System.nanoTime();
        for (int i = 12; i <= 14; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(3L, checkpointValidator.getNextExpectedCheckpointId());
        validateAlignmentTime(alignmentStart, this.inputGate);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        // elements 15..21: checkpoint 3
        for (int i = 15; i <= 21; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(4L, checkpointValidator.getNextExpectedCheckpointId());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        // elements 22..24: checkpoint 4
        for (int i = 22; i <= 24; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(5L, checkpointValidator.getNextExpectedCheckpointId());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        // elements 25..28: remaining data and end-of-partition events
        for (int i = 25; i <= 28; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
    }

    /**
     * Replays a sequence in which only checkpoint 1 receives a barrier on all three channels,
     * while checkpoints 2, 3 and 4 are each superseded before completing. Verifies the latest
     * checkpoint id and the blocked channels along the way, and finally expects one triggered and
     * three aborted checkpoints. The gate is created with checkpoints-after-tasks-finish disabled.
     */
    @Test
    public void testMultiChannelJumpingOverCheckpoint() throws Exception {
        BufferOrEvent[] sequence = {
            // checkpoint 1: barriers on all channels
            createBuffer(0),
            createBuffer(2),
            createBuffer(0),
            createBarrier(1L, 1),
            createBarrier(1L, 2),
            createBuffer(0),
            createBarrier(1L, 0),
            // checkpoint 2: barriers on channels 1 and 0 only
            createBuffer(1),
            createBuffer(0),
            createBarrier(2L, 1),
            createBuffer(2),
            createBarrier(2L, 0),
            createBuffer(2),
            // checkpoint 3: barriers on channels 2 and 0 only
            createBarrier(3L, 2),
            createBuffer(1),
            createBuffer(0),
            createBarrier(3L, 0),
            // checkpoint 4: barrier on channel 1 only
            createBarrier(4L, 1),
            createBuffer(2),
            createBuffer(0),
            createEndOfPartition(0),
            createBuffer(2),
            createEndOfPartition(2),
            createBuffer(1),
            createEndOfPartition(1)
        };
        ValidatingCheckpointHandler checkpointValidator = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(3, sequence, checkpointValidator, false);
        checkpointValidator.setNextExpectedCheckpointId(1L);

        for (int i = 0; i <= 3; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(1L, this.inputGate.getLatestCheckpointId());

        for (int i = 4; i <= 6; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        for (int i = 7; i <= 9; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(2L, this.inputGate.getLatestCheckpointId());

        for (int i = 10; i <= 12; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertThat(
                this.mockInputGate.getBlockedChannels(),
                (Matcher) Matchers.containsInAnyOrder((Object[]) new Integer[]{0, 1}));

        // first barrier of checkpoint 3 arrives while checkpoint 2 is still pending
        check(sequence[13], this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(3L, this.inputGate.getLatestCheckpointId());
        Assert.assertThat(
                this.mockInputGate.getBlockedChannels(),
                (Matcher) Matchers.containsInAnyOrder((Object[]) new Integer[]{2}));

        for (int i = 14; i <= 16; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertThat(
                this.mockInputGate.getBlockedChannels(),
                (Matcher) Matchers.containsInAnyOrder((Object[]) new Integer[]{0, 2}));

        // first barrier of checkpoint 4 arrives while checkpoint 3 is still pending
        check(sequence[17], this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(4L, this.inputGate.getLatestCheckpointId());
        Assert.assertThat(
                this.mockInputGate.getBlockedChannels(),
                (Matcher) Matchers.containsInAnyOrder((Object[]) new Integer[]{1}));

        for (int i = 18; i <= 20; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        for (int i = 21; i <= 24; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }

        Assert.assertEquals(1L, checkpointValidator.getTriggeredCheckpointCounter());
        Assert.assertEquals(3L, checkpointValidator.getAbortedCheckpointCounter());
    }

    /**
     * Drives a real three-channel gate through one checkpoint with sleeps before and during the
     * alignment, then checks the reported checkpoint start delay, alignment duration and bytes
     * processed during alignment against bounds measured by the test itself.
     */
    @Test
    public void testMetrics() throws Exception {
        final int numberOfChannels = 3;
        final int bufferSize = 100;
        final long checkpointId = 1L;
        final long sleepTime = 10L;
        final long nanosPerMilli = 1000000L;

        ArrayList<BufferOrEvent> output = new ArrayList<>();
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(numberOfChannels, (AbstractInvokable) handler);
        int[] sequenceNumbers = new int[numberOfChannels];

        // The barrier carries a creation timestamp taken before the first sleep.
        long checkpointBarrierCreation = System.currentTimeMillis();
        Thread.sleep(sleepTime);

        long alignmentStartNanos = System.nanoTime();
        UnalignedCheckpointsTest.addSequence(
                this.inputGate,
                output,
                sequenceNumbers,
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize),
                createBarrier(checkpointId, 1, checkpointBarrierCreation),
                createBuffer(0, bufferSize),
                createBuffer(2, bufferSize),
                createBarrier(checkpointId, 0),
                createBuffer(2, bufferSize));

        Thread.sleep(sleepTime);

        UnalignedCheckpointsTest.addSequence(
                this.inputGate,
                output,
                sequenceNumbers,
                createBarrier(checkpointId, 2),
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize),
                createEndOfPartition(0),
                createEndOfPartition(1),
                createEndOfPartition(2));

        // Upper bounds measured after the whole sequence has been fed.
        long startDelay = System.currentTimeMillis() - checkpointBarrierCreation;
        long alignmentDuration = System.nanoTime() - alignmentStartNanos;

        Assert.assertEquals(
                checkpointId,
                this.inputGate.getCheckpointBarrierHandler().getLatestCheckpointId());

        Assert.assertThat(
                (Object) (this.inputGate.getCheckpointStartDelayNanos() / nanosPerMilli),
                (Matcher) Matchers.greaterThanOrEqualTo((Comparable) Long.valueOf(sleepTime)));
        Assert.assertThat(
                (Object) (this.inputGate.getCheckpointStartDelayNanos() / nanosPerMilli),
                (Matcher) Matchers.lessThanOrEqualTo((Comparable) Long.valueOf(startDelay)));

        Assert.assertTrue(handler.lastAlignmentDurationNanos.isDone());
        Assert.assertThat(
                (Object) (handler.lastAlignmentDurationNanos.get() / nanosPerMilli),
                (Matcher) Matchers.greaterThanOrEqualTo((Comparable) Long.valueOf(sleepTime)));
        Assert.assertThat(
                (Object) handler.lastAlignmentDurationNanos.get(),
                (Matcher) Matchers.lessThanOrEqualTo((Comparable) Long.valueOf(alignmentDuration)));

        Assert.assertTrue(handler.lastBytesProcessedDuringAlignment.isDone());
        Assert.assertThat(
                (Object) handler.lastBytesProcessedDuringAlignment.get(),
                (Matcher) Matchers.equalTo((Object) (3L * (long) bufferSize)));
    }

    /**
     * Replays a barrier for checkpoint 1 followed by cancellation markers for checkpoints 2 and 3
     * on two channels and expects every element to come out unchanged, with no channel left
     * blocked at the end.
     */
    @Test
    public void testMissingCancellationBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBarrier(1L, 0),
            createCancellationBarrier(3L, 1),
            createCancellationBarrier(2L, 0),
            createCancellationBarrier(3L, 0),
            createBuffer(0)
        };
        ValidatingCheckpointHandler checkpointValidator = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(2, sequence, checkpointValidator);

        for (int i = 0; i < sequence.length; i++) {
            Assert.assertEquals(sequence[i], this.inputGate.pollNext().get());
        }

        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());
    }

    /**
     * Starts with two of four channels already ended, then replays checkpoints 2, 3 and 4 on the
     * remaining channels, checking the latest checkpoint id and that no channel stays blocked
     * after each one.
     */
    @Test
    public void testStartAlignmentWithClosedChannels() throws Exception {
        BufferOrEvent[] sequence = {
            // channels 2 and 1 end before any barrier
            createEndOfPartition(2),
            createEndOfPartition(1),
            // checkpoint 2 on channels 3 and 0
            createBuffer(0),
            createBuffer(0),
            createBuffer(3),
            createBarrier(2L, 3),
            createBarrier(2L, 0),
            // checkpoint 3 on channels 0 and 3
            createBarrier(3L, 0),
            createBarrier(3L, 3),
            // channel 0 ends, then checkpoint 4 on channel 3
            createBuffer(0),
            createBuffer(0),
            createBuffer(3),
            createEndOfPartition(0),
            createBuffer(3),
            createBarrier(4L, 3),
            // trailing data and end of the last channel
            createBuffer(3),
            createEndOfPartition(3)
        };
        this.inputGate = createCheckpointedInputGate(4, sequence);

        for (int i = 0; i <= 6; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(2L, this.inputGate.getLatestCheckpointId());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        for (int i = 7; i <= 8; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(3L, this.inputGate.getLatestCheckpointId());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        for (int i = 9; i <= 14; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(4L, this.inputGate.getLatestCheckpointId());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        for (int i = 15; i <= 16; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
    }

    /**
     * Completes checkpoint 1 on three channels, then starts checkpoint 2 on two channels before
     * the third channel ends; checks the latest checkpoint id and that no channel is left blocked.
     */
    @Test
    public void testEndOfStreamWhileCheckpoint() throws Exception {
        BufferOrEvent[] sequence = {
            // checkpoint 1: barriers on all channels
            createBarrier(1L, 0),
            createBarrier(1L, 1),
            createBarrier(1L, 2),
            // data between the checkpoints
            createBuffer(0),
            createBuffer(0),
            createBuffer(2),
            // checkpoint 2: barriers on channels 2 and 0, then channel 1 ends
            createBarrier(2L, 2),
            createBarrier(2L, 0),
            createBuffer(1),
            createEndOfPartition(1),
            // remaining channels end
            createEndOfPartition(2),
            createBuffer(0),
            createEndOfPartition(0)
        };
        this.inputGate = createCheckpointedInputGate(3, sequence);

        for (int i = 0; i <= 2; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        for (int i = 3; i <= 5; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(1L, this.inputGate.getLatestCheckpointId());

        check(sequence[6], this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(2L, this.inputGate.getLatestCheckpointId());

        for (int i = 7; i <= 9; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        for (int i = 10; i <= 12; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
    }

    /**
     * Replays barriers for checkpoints 1, 2 and 5 and cancellation markers for checkpoints 4 and 6
     * on a single channel. Verifies the latest and last-cancelled checkpoint ids, the failure
     * reason and the zero alignment duration after each step, and finally expects three triggered
     * and two aborted checkpoints. Cancellation markers are checked with size 0.
     */
    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0),
            createBarrier(2L, 0),
            createBuffer(0),
            createCancellationBarrier(4L, 0),
            createBarrier(5L, 0),
            createBuffer(0),
            createCancellationBarrier(6L, 0),
            createBuffer(0)
        };
        final CheckpointFailureReason declinedOnCancel =
                CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER;
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = createCheckpointedInputGate(1, sequence, toNotify);

        // checkpoint 1
        toNotify.setNextExpectedCheckpointId(1L);
        for (int i = 0; i <= 1; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        // checkpoint 2
        toNotify.setNextExpectedCheckpointId(2L);
        for (int i = 2; i <= 3; i++) {
            check(sequence[i], this.inputGate.pollNext().get(), PAGE_SIZE);
        }
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        // cancellation of checkpoint 4; the handler's next expected id is set to 5
        toNotify.setNextExpectedCheckpointId(5L);
        check(sequence[4], this.inputGate.pollNext().get(), PAGE_SIZE);
        check(sequence[5], this.inputGate.pollNext().get(), 0);
        Assert.assertEquals(4L, this.inputGate.getLatestCheckpointId());
        Assert.assertEquals(4L, toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals(declinedOnCancel, toNotify.getCheckpointFailureReason());
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());

        // checkpoint 5; the cancellation state recorded for checkpoint 4 is still reported
        check(sequence[6], this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(5L, this.inputGate.getLatestCheckpointId());
        Assert.assertEquals(4L, toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals(declinedOnCancel, toNotify.getCheckpointFailureReason());
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher) Matchers.empty());

        // cancellation of checkpoint 6, surrounded by data
        check(sequence[7], this.inputGate.pollNext().get(), PAGE_SIZE);
        check(sequence[8], this.inputGate.pollNext().get(), 0);
        check(sequence[9], this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(6L, this.inputGate.getLatestCheckpointId());
        Assert.assertEquals(6L, toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals(declinedOnCancel, toNotify.getCheckpointFailureReason());
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());

        Assert.assertEquals(3L, toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals(2L, toNotify.getAbortedCheckpointCounter());
    }

    /**
     * Drives a three-channel gate through six checkpoints, alternating between
     * checkpoints whose barriers arrive on every channel (ids 1, 3 and 5) and
     * checkpoints that receive a cancellation barrier (ids 2, 4 and 6).
     *
     * <p>The test asserts the set of blocked channels after selected events, the
     * last canceled checkpoint id with its failure reason, and finally that the
     * handler counted three triggered and three aborted checkpoints.
     */
    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBarrier(1L, 1), AlignedCheckpointsTest.createBarrier(1L, 2), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBarrier(1L, 0), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBarrier(2L, 0), AlignedCheckpointsTest.createBarrier(2L, 2), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createCancellationBarrier(2L, 1), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBarrier(3L, 1), AlignedCheckpointsTest.createBarrier(3L, 2), AlignedCheckpointsTest.createBarrier(3L, 0), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createCancellationBarrier(4L, 1), AlignedCheckpointsTest.createBarrier(4L, 2), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBarrier(4L, 0), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBarrier(5L, 2), AlignedCheckpointsTest.createBarrier(5L, 1), AlignedCheckpointsTest.createBarrier(5L, 0), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createCancellationBarrier(6L, 1), AlignedCheckpointsTest.createCancellationBarrier(6L, 2), AlignedCheckpointsTest.createBarrier(6L, 0), AlignedCheckpointsTest.createBuffer(0)};
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = this.createCheckpointedInputGate(3, sequence, toNotify);
        // sequence[0..2]: plain buffers before any barrier.
        AlignedCheckpointsTest.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Checkpoint 1: barriers arrive on channel 1, then 2, then 0.
        long startTs = System.nanoTime();
        toNotify.setNextExpectedCheckpointId(1L);
        AlignedCheckpointsTest.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{1}));
        AlignedCheckpointsTest.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{1, 2}));
        AlignedCheckpointsTest.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Last barrier of checkpoint 1 consumed: no channel remains blocked.
        AlignedCheckpointsTest.validateAlignmentTime(startTs, this.inputGate);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Checkpoint 2: barriers on channels 0 and 2 block those channels...
        AlignedCheckpointsTest.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{0}));
        AlignedCheckpointsTest.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{0, 2}));
        AlignedCheckpointsTest.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // ...until the cancellation barrier for checkpoint 2 arrives on channel 1.
        // The pageSize argument is 0 here; check() only uses it when comparing buffers.
        AlignedCheckpointsTest.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 0);
        Assert.assertEquals((long)2L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        AlignedCheckpointsTest.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Checkpoint 3: barriers on channels 1, 2 and 0 back to back.
        startTs = System.nanoTime();
        toNotify.setNextExpectedCheckpointId(3L);
        AlignedCheckpointsTest.check(sequence[15], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[16], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[17], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.validateAlignmentTime(startTs, this.inputGate);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[18], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[19], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Checkpoint 4: the cancellation barrier (channel 1) arrives before any regular barrier.
        AlignedCheckpointsTest.check(sequence[20], (BufferOrEvent)this.inputGate.pollNext().get(), 0);
        Assert.assertEquals((long)4L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
        // The later checkpoint-4 barriers on channels 2 and 0 must not block any channel.
        AlignedCheckpointsTest.check(sequence[21], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[22], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[23], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[24], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[25], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[26], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Checkpoint 5: barriers on channels 2, 1 and 0 back to back.
        startTs = System.nanoTime();
        toNotify.setNextExpectedCheckpointId(5L);
        AlignedCheckpointsTest.check(sequence[27], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[28], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[29], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.validateAlignmentTime(startTs, this.inputGate);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[30], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[31], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Checkpoint 6: cancellation barriers on channels 1 and 2.
        AlignedCheckpointsTest.check(sequence[32], (BufferOrEvent)this.inputGate.pollNext().get(), 0);
        AlignedCheckpointsTest.check(sequence[33], (BufferOrEvent)this.inputGate.pollNext().get(), 0);
        Assert.assertEquals((long)6L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
        // The regular checkpoint-6 barrier on channel 0 arrives afterwards and must not block it.
        AlignedCheckpointsTest.check(sequence[34], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[35], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Totals: checkpoints 1, 3, 5 triggered; checkpoints 2, 4, 6 aborted.
        Assert.assertEquals((long)3L, (long)toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)3L, (long)toNotify.getAbortedCheckpointCounter());
    }

    /**
     * Checks the handling of a checkpoint that is canceled while one channel has
     * already delivered its barrier, followed by a checkpoint that completes.
     *
     * <p>Checkpoint 1 gets a barrier on channel 1 and is then canceled via channel 0;
     * a checkpoint-1 barrier still shows up later on channel 2. Checkpoint 2 then
     * receives barriers on all three channels. The test expects exactly one
     * triggered and one aborted checkpoint.
     */
    @Test
    public void testAbortOnCanceledBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBarrier(1L, 1), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createCancellationBarrier(1L, 0), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBarrier(2L, 1), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBarrier(1L, 2), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBarrier(2L, 0), AlignedCheckpointsTest.createBarrier(2L, 2), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBuffer(2)};
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = this.createCheckpointedInputGate(3, sequence, toNotify);
        // sequence[1] is the checkpoint-1 barrier on channel 1; sequence[4] cancels checkpoint 1 via channel 0.
        AlignedCheckpointsTest.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 0);
        AlignedCheckpointsTest.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Checkpoint 1 must be reported as canceled and no channel may stay blocked.
        Assert.assertEquals((long)1L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        // Checkpoint 2 starts with the barrier on channel 1 (sequence[6]).
        long startTs = System.nanoTime();
        AlignedCheckpointsTest.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // sequence[9] is a barrier for the already-canceled checkpoint 1 on channel 2;
        // afterwards only channel 1, where the checkpoint-2 barrier arrived, is expected to be blocked.
        AlignedCheckpointsTest.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{1}));
        AlignedCheckpointsTest.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Remaining checkpoint-2 barriers on channels 0 and 2.
        toNotify.setNextExpectedCheckpointId(2L);
        AlignedCheckpointsTest.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.validateAlignmentTime(startTs, this.inputGate);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[15], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[16], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Totals: checkpoint 2 triggered, checkpoint 1 aborted.
        Assert.assertEquals((long)1L, (long)toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)1L, (long)toNotify.getAbortedCheckpointCounter());
    }

    /**
     * Checks that a cancellation barrier for a checkpoint that was already
     * subsumed by a newer one does not produce a second abort.
     *
     * <p>Checkpoint 3 receives barriers on channels 1 and 0, then a barrier for
     * checkpoint 5 arrives on channel 2. At that point the test expects checkpoint 3
     * to be reported as canceled with reason {@code CHECKPOINT_DECLINED_SUBSUMED}.
     * The cancellation barrier for checkpoint 3 arrives afterwards; the final
     * counters (one triggered, one aborted) show it added no further abort.
     */
    @Test
    public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBarrier(3L, 1), AlignedCheckpointsTest.createBarrier(3L, 0), AlignedCheckpointsTest.createBuffer(2), AlignedCheckpointsTest.createBarrier(5L, 2), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createCancellationBarrier(3L, 0), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBarrier(5L, 0), AlignedCheckpointsTest.createBarrier(5L, 1), AlignedCheckpointsTest.createBuffer(0), AlignedCheckpointsTest.createBuffer(1), AlignedCheckpointsTest.createBuffer(2)};
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = this.createCheckpointedInputGate(3, sequence, toNotify);
        // Checkpoint-3 barriers on channels 1 and 0 (sequence[1], sequence[2]).
        AlignedCheckpointsTest.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Timestamp taken just before the first checkpoint-5 barrier; used for the alignment check below.
        long startTs = System.nanoTime();
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{0, 1}));
        // Barrier for checkpoint 5 on channel 2: checkpoint 3 is expected to be declined as
        // subsumed, and only channel 2 remains blocked.
        AlignedCheckpointsTest.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)3L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.containsInAnyOrder((Object[])new Integer[]{2}));
        AlignedCheckpointsTest.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Late cancellation barrier for the already-subsumed checkpoint 3.
        AlignedCheckpointsTest.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 0);
        AlignedCheckpointsTest.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Remaining checkpoint-5 barriers on channels 0 and 1.
        toNotify.setNextExpectedCheckpointId(5L);
        AlignedCheckpointsTest.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.validateAlignmentTime(startTs, this.inputGate);
        Assert.assertThat(this.mockInputGate.getBlockedChannels(), (Matcher)Matchers.empty());
        AlignedCheckpointsTest.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        AlignedCheckpointsTest.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        // Totals: checkpoint 5 triggered, checkpoint 3 aborted exactly once.
        Assert.assertEquals((long)1L, (long)toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)1L, (long)toNotify.getAbortedCheckpointCounter());
    }

    /**
     * Two of three channels deliver the barrier for checkpoint 1 while the third
     * channel delivers end-of-partition instead. The test expects checkpoint 1 to
     * be the only triggered checkpoint, nothing aborted, and no checkpoint pending.
     */
    @Test
    public void testTriggerCheckpointsWithEndOfPartition() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{
            AlignedCheckpointsTest.createBarrier(1L, 0),
            AlignedCheckpointsTest.createBarrier(1L, 1),
            AlignedCheckpointsTest.createEndOfPartition(2)
        };
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = this.createCheckpointedInputGate(3, sequence, validator);

        // Every element must come back out of the gate unchanged and in order.
        for (int idx = 0; idx < sequence.length; ++idx) {
            BufferOrEvent polled = (BufferOrEvent)this.inputGate.pollNext().get();
            AlignedCheckpointsTest.check(sequence[idx], polled, 512);
        }

        Assert.assertThat(validator.triggeredCheckpoints, (Matcher)Matchers.contains((Object[])new Long[]{1L}));
        Assert.assertEquals(0L, (long)validator.getAbortedCheckpointCounter());
        boolean checkpointPending = this.inputGate.getCheckpointBarrierHandler().isCheckpointPending();
        Assert.assertThat((Object)checkpointPending, (Matcher)Matchers.equalTo((Object)false));
    }

    /**
     * Channel 1 delivers both the barrier for checkpoint 2 and end-of-partition
     * before channel 2 delivers its barrier. The test expects no checkpoint to be
     * triggered or aborted until the barrier on channel 2 is consumed, after which
     * checkpoint 2 is the only triggered checkpoint and nothing is pending.
     */
    @Test
    public void testDeduplicateChannelsWithBothBarrierAndEndOfPartition() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{
            AlignedCheckpointsTest.createBarrier(2L, 0),
            AlignedCheckpointsTest.createBarrier(2L, 1),
            AlignedCheckpointsTest.createEndOfPartition(1),
            AlignedCheckpointsTest.createBarrier(2L, 2)
        };
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = this.createCheckpointedInputGate(3, sequence, validator);

        // Consume everything except the final barrier on channel 2.
        int lastIndex = sequence.length - 1;
        for (int idx = 0; idx < lastIndex; ++idx) {
            BufferOrEvent polled = (BufferOrEvent)this.inputGate.pollNext().get();
            AlignedCheckpointsTest.check(sequence[idx], polled, 512);
        }
        Assert.assertEquals(0L, (long)validator.getTriggeredCheckpointCounter());
        Assert.assertEquals(0L, (long)validator.getAbortedCheckpointCounter());

        // The barrier on channel 2 is compared by equality with the polled element.
        Assert.assertEquals((Object)sequence[lastIndex], this.inputGate.pollNext().get());
        Assert.assertThat(validator.triggeredCheckpoints, (Matcher)Matchers.contains((Object[])new Long[]{2L}));
        boolean checkpointPending = this.inputGate.getCheckpointBarrierHandler().isCheckpointPending();
        Assert.assertThat((Object)checkpointPending, (Matcher)Matchers.equalTo((Object)false));
    }

    /**
     * Channel 2 ends before any barrier arrives, checkpoint 6 gets barriers on
     * channels 0 and 1, channel 1 then ends, and checkpoint 7 gets a barrier on
     * channel 0 only. The test expects checkpoints 6 and 7 to be triggered in
     * that order, nothing aborted, and no checkpoint pending.
     */
    @Test
    public void testTriggerCheckpointsAfterReceivedEndOfPartition() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{
            AlignedCheckpointsTest.createEndOfPartition(2),
            AlignedCheckpointsTest.createBarrier(6L, 0),
            AlignedCheckpointsTest.createBarrier(6L, 1),
            AlignedCheckpointsTest.createEndOfPartition(1),
            AlignedCheckpointsTest.createBarrier(7L, 0)
        };
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = this.createCheckpointedInputGate(3, sequence, validator);

        // Every element must come back out of the gate unchanged and in order.
        for (int idx = 0; idx < sequence.length; ++idx) {
            BufferOrEvent polled = (BufferOrEvent)this.inputGate.pollNext().get();
            AlignedCheckpointsTest.check(sequence[idx], polled, 512);
        }

        Assert.assertThat(validator.triggeredCheckpoints, (Matcher)Matchers.contains((Object[])new Long[]{6L, 7L}));
        Assert.assertEquals(0L, (long)validator.getAbortedCheckpointCounter());
        boolean checkpointPending = this.inputGate.getCheckpointBarrierHandler().isCheckpointPending();
        Assert.assertThat((Object)checkpointPending, (Matcher)Matchers.equalTo((Object)false));
    }

    /**
     * Builds a checkpoint-barrier event for {@code channel}, stamped with the
     * current wall-clock time in milliseconds.
     */
    private static BufferOrEvent createBarrier(long checkpointId, int channel) {
        long creationTimestamp = System.currentTimeMillis();
        return AlignedCheckpointsTest.createBarrier(checkpointId, channel, creationTimestamp);
    }

    /**
     * Builds a checkpoint-barrier event with an explicit creation timestamp.
     *
     * @param checkpointId id carried by the barrier
     * @param channel second argument of the {@code InputChannelInfo} attached to the event
     *     (the first argument is fixed at 0)
     * @param creationTimestamp timestamp passed to the {@code CheckpointBarrier} constructor
     * @return the barrier wrapped in a {@code BufferOrEvent}
     */
    private static BufferOrEvent createBarrier(long checkpointId, int channel, long creationTimestamp) {
        CheckpointBarrier barrier = new CheckpointBarrier(
                checkpointId, creationTimestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
        InputChannelInfo channelInfo = new InputChannelInfo(0, channel);
        return new BufferOrEvent((AbstractEvent)barrier, channelInfo);
    }

    /**
     * Builds a {@code CancelCheckpointMarker} event for {@code checkpointId},
     * attached to {@code InputChannelInfo(0, channel)}.
     */
    private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
        CancelCheckpointMarker marker = new CancelCheckpointMarker(checkpointId);
        InputChannelInfo channelInfo = new InputChannelInfo(0, channel);
        return new BufferOrEvent((AbstractEvent)marker, channelInfo);
    }

    /**
     * Builds a buffer for {@code channel} whose size is the current value of the
     * shared {@code sizeCounter}, which is incremented as a side effect.
     */
    private static BufferOrEvent createBuffer(int channel) {
        // Post-increment: this buffer gets the old counter value.
        return AlignedCheckpointsTest.createBuffer(channel, sizeCounter++);
    }

    /**
     * Builds a buffer of the given size via {@code TestBufferFactory}, attached to
     * {@code InputChannelInfo(0, channel)}.
     */
    static BufferOrEvent createBuffer(int channel, int size) {
        return new BufferOrEvent(
                TestBufferFactory.createBuffer(size),
                new InputChannelInfo(0, channel));
    }

    /**
     * Builds an end-of-partition event attached to {@code InputChannelInfo(0, channel)}.
     */
    private static BufferOrEvent createEndOfPartition(int channel) {
        AbstractEvent endOfPartition = (AbstractEvent)EndOfPartitionEvent.INSTANCE;
        InputChannelInfo channelInfo = new InputChannelInfo(0, channel);
        return new BufferOrEvent(endOfPartition, channelInfo);
    }

    /**
     * Asserts that {@code present} matches {@code expected}.
     *
     * <p>Both must be non-null and of the same kind (buffer or event). Events are
     * compared with {@code equals}. Buffers are compared by max capacity, size and
     * the first {@code pageSize} bytes of their memory segments.
     *
     * @param expected the element that was fed in
     * @param present the element that was polled
     * @param pageSize number of bytes to compare; only used when {@code expected} is a buffer
     */
    private static void check(BufferOrEvent expected, BufferOrEvent present, int pageSize) {
        Assert.assertNotNull((Object)expected);
        Assert.assertNotNull((Object)present);
        Assert.assertEquals((Object)expected.isBuffer(), (Object)present.isBuffer());

        if (!expected.isBuffer()) {
            // Event case: equality of the events is all that is checked.
            Assert.assertEquals((Object)expected.getEvent(), (Object)present.getEvent());
            return;
        }

        Assert.assertEquals((long)expected.getBuffer().getMaxCapacity(), (long)present.getBuffer().getMaxCapacity());
        Assert.assertEquals((long)expected.getBuffer().getSize(), (long)present.getBuffer().getSize());
        MemorySegment wantedSegment = expected.getBuffer().getMemorySegment();
        MemorySegment actualSegment = present.getBuffer().getMemorySegment();
        int comparison = wantedSegment.compare(actualSegment, 0, 0, pageSize);
        Assert.assertTrue("memory contents differs", comparison == 0);
    }

    /**
     * Asserts upper bounds on the gate's reported timings.
     *
     * <p>The alignment duration must not exceed the time elapsed since
     * {@code alignmentStartTimestamp}. The checkpoint start delay must not exceed
     * the time elapsed since {@code testStartTimeNanos} plus a tolerance of
     * 1,000,000 ns.
     */
    private static void validateAlignmentTime(long alignmentStartTimestamp, CheckpointedInputGate inputGate) {
        // Both upper bounds are sampled before the gate is queried.
        long maxAlignmentNanos = System.nanoTime() - alignmentStartTimestamp;
        long nanosSinceTestStart = System.nanoTime() - testStartTimeNanos;

        Assert.assertThat(
                (Object)inputGate.getAlignmentDurationNanos(),
                (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(maxAlignmentNanos)));

        long maxStartDelayNanos = nanosSinceTestStart + 1_000_000L;
        Assert.assertThat(
                (Object)inputGate.getCheckpointStartDelayNanos(),
                (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(maxStartDelayNanos)));
    }

    /**
     * Hamcrest matcher that accepts an object only if its class is exactly
     * {@code CheckpointException} and its failure reason equals the expected one.
     */
    public static class CheckpointExceptionMatcher
    extends BaseMatcher<CheckpointException> {
        private final CheckpointFailureReason failureReason;

        public CheckpointExceptionMatcher(CheckpointFailureReason failureReason) {
            this.failureReason = failureReason;
        }

        public boolean matches(Object o) {
            // Exact class comparison: null and subclasses of CheckpointException do not match.
            if (o == null || o.getClass() != CheckpointException.class) {
                return false;
            }
            CheckpointException candidate = (CheckpointException)o;
            return candidate.getCheckpointFailureReason().equals((Object)this.failureReason);
        }

        public void describeTo(Description description) {
            String text = "CheckpointException - reason = " + this.failureReason;
            description.appendText(text);
        }
    }
}

