/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointSequenceValidator;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.UnalignedCheckpointsTest;
import org.apache.flink.streaming.runtime.io.checkpointing.ValidatingCheckpointHandler;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

class CheckpointBarrierTrackerTest {
    private static final int PAGE_SIZE = 512;
    private CheckpointedInputGate inputGate;

    CheckpointBarrierTrackerTest() {
    }

    @AfterEach
    void ensureEmpty() throws Exception {
        Assertions.assertThat((Optional)this.inputGate.pollNext()).isNotPresent();
        Assertions.assertThat((boolean)this.inputGate.isFinished()).isTrue();
    }

    @Test
    void testSingleChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0)};
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(1, sequence);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testMultiChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(3), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2)};
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(4, sequence);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testSingleChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createBarrier(3L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(4L, 0), CheckpointBarrierTrackerTest.createBarrier(5L, 0), CheckpointBarrierTrackerTest.createBarrier(6L, 0), CheckpointBarrierTrackerTest.createBuffer(0)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(1L, 2L, 3L, 4L, 5L, 6L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(1, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testSingleChannelWithSkippedBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(3L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(4L, 0), CheckpointBarrierTrackerTest.createBarrier(6L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(7L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(10L, 0), CheckpointBarrierTrackerTest.createBuffer(0)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(1L, 3L, 4L, 6L, 7L, 10L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(1, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testMultiChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(1L, 1), CheckpointBarrierTrackerTest.createBarrier(1L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 1), CheckpointBarrierTrackerTest.createBarrier(2L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(3L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(3L, 0), CheckpointBarrierTrackerTest.createBarrier(3L, 1), CheckpointBarrierTrackerTest.createBarrier(4L, 1), CheckpointBarrierTrackerTest.createBarrier(4L, 2), CheckpointBarrierTrackerTest.createBarrier(4L, 0), CheckpointBarrierTrackerTest.createBuffer(0)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(1L, 2L, 3L, 4L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testMultiChannelSkippingCheckpoints() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(1L, 1), CheckpointBarrierTrackerTest.createBarrier(1L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 1), CheckpointBarrierTrackerTest.createBarrier(2L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(3L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(4L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(4L, 1), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(4L, 2), CheckpointBarrierTrackerTest.createBuffer(0)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(1L, 2L, 4L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testCompleteCheckpointsOnLateBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(2L, 1), CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 2), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(3L, 1), CheckpointBarrierTrackerTest.createBarrier(3L, 2), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(4L, 2), CheckpointBarrierTrackerTest.createBarrier(4L, 1), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(3L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(4L, 0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(5L, 1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(5L, 0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(5L, 2), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(6L, 1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(6L, 0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(7L, 1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(7L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(8L, 2), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(8L, 1), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(9L, 1), CheckpointBarrierTrackerTest.createBarrier(7L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(9L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(10L, 2), CheckpointBarrierTrackerTest.createBarrier(8L, 0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(9L, 0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(10L, 0), CheckpointBarrierTrackerTest.createBarrier(10L, 1)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(2L, 3L, 4L, 5L, 7L, 8L, 9L, 10L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testNextFirstCheckpointBarrierOvertakesCancellationBarrier() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBarrier(1L, 1), CheckpointBarrierTrackerTest.createBarrier(2L, 1), CheckpointBarrierTrackerTest.createCancellationBarrier(1L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 0)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler();
        ManualClock manualClock = new ManualClock();
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(2, sequence, validator, (Clock)manualClock);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
            manualClock.advanceTime(Duration.ofSeconds(1L));
        }
        FlinkAssertions.assertThatFuture(validator.lastAlignmentDurationNanos).eventuallySucceeds().isEqualTo((Object)Duration.ofSeconds(2L).toNanos());
    }

    @Test
    void testSingleChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createCancellationBarrier(4L, 0), CheckpointBarrierTrackerTest.createBarrier(5L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createCancellationBarrier(6L, 0), CheckpointBarrierTrackerTest.createBuffer(0)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(1L, 2L, -4L, 5L, -6L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(1, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testMultiChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(1L, 1), CheckpointBarrierTrackerTest.createBarrier(1L, 2), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 2), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createCancellationBarrier(2L, 1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBarrier(3L, 1), CheckpointBarrierTrackerTest.createBarrier(3L, 2), CheckpointBarrierTrackerTest.createBarrier(3L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createCancellationBarrier(4L, 1), CheckpointBarrierTrackerTest.createBarrier(4L, 2), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBarrier(4L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createBuffer(2), CheckpointBarrierTrackerTest.createBarrier(5L, 2), CheckpointBarrierTrackerTest.createBarrier(5L, 1), CheckpointBarrierTrackerTest.createBarrier(5L, 0), CheckpointBarrierTrackerTest.createBuffer(0), CheckpointBarrierTrackerTest.createBuffer(1), CheckpointBarrierTrackerTest.createCancellationBarrier(6L, 1), CheckpointBarrierTrackerTest.createCancellationBarrier(6L, 2), CheckpointBarrierTrackerTest.createBarrier(6L, 0), CheckpointBarrierTrackerTest.createBuffer(0)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(1L, -2L, 3L, -4L, 5L, -6L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testInterleavedCancellationBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createCancellationBarrier(2L, 0), CheckpointBarrierTrackerTest.createCancellationBarrier(1L, 1), CheckpointBarrierTrackerTest.createCancellationBarrier(2L, 1), CheckpointBarrierTrackerTest.createCancellationBarrier(1L, 2), CheckpointBarrierTrackerTest.createCancellationBarrier(2L, 2), CheckpointBarrierTrackerTest.createBuffer(0)};
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(-1L, -2L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
    }

    @Test
    void testMetrics() throws Exception {
        ArrayList<BufferOrEvent> output = new ArrayList<BufferOrEvent>();
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        int numberOfChannels = 3;
        this.inputGate = this.createCheckpointedInputGate(numberOfChannels, handler);
        int[] sequenceNumbers = new int[numberOfChannels];
        int bufferSize = 100;
        long checkpointId = 1L;
        long sleepTime = 10L;
        long checkpointBarrierCreation = System.currentTimeMillis();
        long alignmentStartNanos = System.nanoTime();
        Thread.sleep(sleepTime);
        UnalignedCheckpointsTest.addSequence(this.inputGate, output, sequenceNumbers, CheckpointBarrierTrackerTest.createBuffer(0, bufferSize), CheckpointBarrierTrackerTest.createBuffer(1, bufferSize), CheckpointBarrierTrackerTest.createBuffer(2, bufferSize), CheckpointBarrierTrackerTest.createBarrier(checkpointId, 1, checkpointBarrierCreation), CheckpointBarrierTrackerTest.createBuffer(0, bufferSize), CheckpointBarrierTrackerTest.createBuffer(2, bufferSize), CheckpointBarrierTrackerTest.createBarrier(checkpointId, 0), CheckpointBarrierTrackerTest.createBuffer(2, bufferSize));
        Thread.sleep(sleepTime);
        UnalignedCheckpointsTest.addSequence(this.inputGate, output, sequenceNumbers, CheckpointBarrierTrackerTest.createBarrier(checkpointId, 2), CheckpointBarrierTrackerTest.createBuffer(0, bufferSize), CheckpointBarrierTrackerTest.createBuffer(1, bufferSize), CheckpointBarrierTrackerTest.createBuffer(2, bufferSize), CheckpointBarrierTrackerTest.createEndOfPartition(0), CheckpointBarrierTrackerTest.createEndOfPartition(1), CheckpointBarrierTrackerTest.createEndOfPartition(2));
        long startDelay = System.currentTimeMillis() - checkpointBarrierCreation;
        long alignmentDuration = System.nanoTime() - alignmentStartNanos;
        Assertions.assertThat((long)(this.inputGate.getCheckpointStartDelayNanos() / 1000000L)).isBetween(Long.valueOf(sleepTime), Long.valueOf(startDelay));
        FlinkAssertions.assertThatFuture(handler.getLastAlignmentDurationNanos()).eventuallySucceeds().satisfies(new ThrowingConsumer[]{duration -> Assertions.assertThat((long)(duration / 1000000L)).isBetween(Long.valueOf(sleepTime), Long.valueOf(alignmentDuration))});
        Assertions.assertThat(handler.getLastAlignmentDurationNanos()).isDone();
        Assertions.assertThat((long)(handler.getLastAlignmentDurationNanos().get() / 1000000L)).isBetween(Long.valueOf(sleepTime), Long.valueOf(alignmentDuration));
        Assertions.assertThat(handler.getLastBytesProcessedDuringAlignment()).isCompletedWithValue((Object)(3L * (long)bufferSize));
    }

    @Test
    void testSingleChannelMetrics() throws Exception {
        ArrayList<BufferOrEvent> output = new ArrayList<BufferOrEvent>();
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        int numberOfChannels = 1;
        this.inputGate = this.createCheckpointedInputGate(numberOfChannels, handler);
        int[] sequenceNumbers = new int[numberOfChannels];
        int bufferSize = 100;
        long checkpointId = 1L;
        long sleepTime = 10L;
        long checkpointBarrierCreation = System.currentTimeMillis();
        Thread.sleep(sleepTime);
        UnalignedCheckpointsTest.addSequence(this.inputGate, output, sequenceNumbers, CheckpointBarrierTrackerTest.createBuffer(0, bufferSize), CheckpointBarrierTrackerTest.createBarrier(checkpointId, 0, checkpointBarrierCreation), CheckpointBarrierTrackerTest.createBuffer(0, bufferSize), CheckpointBarrierTrackerTest.createEndOfPartition(0));
        long startDelay = System.currentTimeMillis() - checkpointBarrierCreation;
        Assertions.assertThat((long)(this.inputGate.getCheckpointStartDelayNanos() / 1000000L)).isBetween(Long.valueOf(sleepTime), Long.valueOf(startDelay));
        Assertions.assertThat(handler.getLastAlignmentDurationNanos()).isCompletedWithValue((Object)0L);
        Assertions.assertThat(handler.getLastBytesProcessedDuringAlignment()).isCompletedWithValue((Object)0L);
    }

    @Test
    void testTriggerCheckpointsWithEndOfPartition() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 1), CheckpointBarrierTrackerTest.createBarrier(3L, 0), CheckpointBarrierTrackerTest.createBarrier(4L, 0), CheckpointBarrierTrackerTest.createBarrier(4L, 1), CheckpointBarrierTrackerTest.createBarrier(5L, 1), CheckpointBarrierTrackerTest.createEndOfPartition(2)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(4L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        CheckpointBarrierTracker checkpointBarrierTracker = (CheckpointBarrierTracker)this.inputGate.getCheckpointBarrierHandler();
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
        Assertions.assertThat(validator.triggeredCheckpoints).containsExactly((Object[])new Long[]{4L});
        Assertions.assertThat((long)validator.getAbortedCheckpointCounter()).isZero();
        Assertions.assertThat((List)checkpointBarrierTracker.getPendingCheckpointIds()).containsExactly((Object[])new Long[]{5L});
        Assertions.assertThat((int)checkpointBarrierTracker.getNumOpenChannels()).isEqualTo(2);
    }

    @Test
    void testDeduplicateChannelsWithBothBarrierAndEndOfPartition() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBarrier(2L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 1), CheckpointBarrierTrackerTest.createEndOfPartition(1), CheckpointBarrierTrackerTest.createBarrier(2L, 2)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (int i = 0; i <= 2; ++i) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)sequence[i]);
        }
        Assertions.assertThat((long)validator.getTriggeredCheckpointCounter()).isZero();
        Assertions.assertThat((long)validator.getAbortedCheckpointCounter()).isZero();
        Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)sequence[3]);
        Assertions.assertThat(validator.triggeredCheckpoints).containsExactly((Object[])new Long[]{2L});
    }

    @Test
    void testTriggerCheckpointsAfterReceivedEndOfPartition() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createEndOfPartition(2), CheckpointBarrierTrackerTest.createBarrier(5L, 0), CheckpointBarrierTrackerTest.createBarrier(6L, 0), CheckpointBarrierTrackerTest.createBarrier(6L, 1), CheckpointBarrierTrackerTest.createEndOfPartition(1), CheckpointBarrierTrackerTest.createBarrier(7L, 0)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
        Assertions.assertThat(validator.triggeredCheckpoints).containsExactly((Object[])new Long[]{6L, 7L});
        Assertions.assertThat((long)validator.getAbortedCheckpointCounter()).isZero();
    }

    @Test
    void testNoFastPathWithChannelFinishedDuringCheckpoints() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createEndOfPartition(0), CheckpointBarrierTrackerTest.createBarrier(1L, 1)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler();
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(2, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
        Assertions.assertThat((long)validator.getTriggeredCheckpointCounter()).isOne();
        Assertions.assertThat((boolean)this.inputGate.getCheckpointBarrierHandler().isCheckpointPending()).isFalse();
    }

    @Test
    void testCompleteAndRemoveAbortedCheckpointWithEndOfPartition() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createCancellationBarrier(4L, 0), CheckpointBarrierTrackerTest.createBarrier(4L, 1), CheckpointBarrierTrackerTest.createBarrier(5L, 1), CheckpointBarrierTrackerTest.createEndOfPartition(2)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        CheckpointBarrierTracker checkpointBarrierTracker = (CheckpointBarrierTracker)this.inputGate.getCheckpointBarrierHandler();
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
        Assertions.assertThat((long)validator.getAbortedCheckpointCounter()).isOne();
        Assertions.assertThat((long)validator.getLastCanceledCheckpointId()).isEqualTo(4L);
        Assertions.assertThat((long)validator.getTriggeredCheckpointCounter()).isZero();
        Assertions.assertThat((List)checkpointBarrierTracker.getPendingCheckpointIds()).containsExactly((Object[])new Long[]{5L});
        Assertions.assertThat((int)checkpointBarrierTracker.getNumOpenChannels()).isEqualTo(2L);
    }

    @Test
    void testAbortCheckpointsAfterEndOfPartitionReceived() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createEndOfPartition(2), CheckpointBarrierTrackerTest.createBarrier(5L, 0), CheckpointBarrierTrackerTest.createBarrier(6L, 0), CheckpointBarrierTrackerTest.createCancellationBarrier(6L, 1), CheckpointBarrierTrackerTest.createEndOfPartition(1), CheckpointBarrierTrackerTest.createCancellationBarrier(7L, 0)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler(-1L);
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(3, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
        Assertions.assertThat(validator.abortedCheckpoints).containsExactly((Object[])new Long[]{5L, 6L, 7L});
        Assertions.assertThat((long)validator.getTriggeredCheckpointCounter()).isZero();
    }

    @Test
    void testNoFastPathWithChannelFinishedDuringCheckpointsCancel() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBarrier(1L, 0, 0L), CheckpointBarrierTrackerTest.createEndOfPartition(0), CheckpointBarrierTrackerTest.createCancellationBarrier(1L, 1)};
        ValidatingCheckpointHandler checkpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(2, sequence, checkpointHandler);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
        }
        Assertions.assertThat((long)checkpointHandler.getLastCanceledCheckpointId()).isOne();
        Assertions.assertThat((boolean)this.inputGate.getCheckpointBarrierHandler().isCheckpointPending()).isFalse();
    }

    @Test
    void testTwoLastBarriersOneByOne() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierTrackerTest.createBarrier(1L, 1), CheckpointBarrierTrackerTest.createBarrier(2L, 1), CheckpointBarrierTrackerTest.createBarrier(1L, 0), CheckpointBarrierTrackerTest.createBarrier(2L, 0)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler();
        ManualClock manualClock = new ManualClock();
        this.inputGate = CheckpointBarrierTrackerTest.createCheckpointedInputGate(2, sequence, validator, (Clock)manualClock);
        for (BufferOrEvent boe : sequence) {
            Assertions.assertThat((Optional)this.inputGate.pollNext()).hasValue((Object)boe);
            manualClock.advanceTime(Duration.ofSeconds(1L));
        }
        FlinkAssertions.assertThatFuture(validator.lastAlignmentDurationNanos).eventuallySucceeds().isEqualTo((Object)Duration.ofSeconds(2L).toNanos());
    }

    private CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, AbstractInvokable toNotify) throws IOException {
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(numberOfChannels).setupBufferPoolFactory(environment).build();
        gate.setInputChannels((InputChannel[])IntStream.range(0, numberOfChannels).mapToObj(channelIndex -> InputChannelBuilder.newBuilder().setChannelIndex(channelIndex).setupFromNettyShuffleEnvironment(environment).setConnectionManager((ConnectionManager)new TestingConnectionManager()).buildRemoteChannel(gate)).toArray(RemoteInputChannel[]::new));
        gate.setup();
        gate.requestPartitions();
        return CheckpointBarrierTrackerTest.createCheckpointedInputGate((IndexedInputGate)gate, toNotify);
    }

    private static CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence) {
        return CheckpointBarrierTrackerTest.createCheckpointedInputGate(numberOfChannels, sequence, (AbstractInvokable)new DummyCheckpointInvokable());
    }

    private static CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
        MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
        return CheckpointBarrierTrackerTest.createCheckpointedInputGate(gate, toNotifyOnCheckpoint);
    }

    private static CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence, @Nullable AbstractInvokable toNotifyOnCheckpoint, Clock clock) {
        MockInputGate gate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
        return CheckpointBarrierTrackerTest.createCheckpointedInputGate(gate, toNotifyOnCheckpoint, clock);
    }

    private static CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
        return CheckpointBarrierTrackerTest.createCheckpointedInputGate(inputGate, toNotifyOnCheckpoint, (Clock)SystemClock.getInstance());
    }

    private static CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint, Clock clock) {
        return new CheckpointedInputGate((InputGate)inputGate, (CheckpointBarrierHandler)new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), (CheckpointableTask)toNotifyOnCheckpoint, clock, true), (MailboxExecutor)new SyncMailboxExecutor());
    }

    private static BufferOrEvent createBarrier(long checkpointId, int channel) {
        return CheckpointBarrierTrackerTest.createBarrier(checkpointId, channel, System.currentTimeMillis());
    }

    private static BufferOrEvent createBarrier(long checkpointId, int channel, long creationTimestamp) {
        return new BufferOrEvent((AbstractEvent)new CheckpointBarrier(checkpointId, creationTimestamp, CheckpointOptions.forCheckpointWithDefaultLocation()), new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createCancellationBarrier(long id, int channel) {
        return new BufferOrEvent((AbstractEvent)new CancelCheckpointMarker(id), new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createBuffer(int channel) {
        return new BufferOrEvent((Buffer)new NetworkBuffer(MemorySegmentFactory.wrap((byte[])new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createBuffer(int channel, int size) {
        return new BufferOrEvent(TestBufferFactory.createBuffer((int)size), new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createEndOfPartition(int channel) {
        return new BufferOrEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, new InputChannelInfo(0, channel));
    }
}

