package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.SystemClock;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.class */
public class UnalignedCheckpointsTest {
    /** Checkpoint id (0) used by the tests that drive {@code SingleCheckpointBarrierHandler} directly. */
    private static final long DEFAULT_CHECKPOINT_ID = 0;
    // Size given to the next buffer made by createBuffer(int). It is also bumped by
    // createBarrier/createCancellationBarrier and reset to 1 at the end of the instance
    // addSequence; getIds() reports "size - 1" as a buffer's id.
    private int sizeCounter = 1;
    // Gate under test; when a test assigns it, ensureEmpty() checks it is drained and closes it.
    private CheckpointedInputGate inputGate;
    // Recreated for every test in setUp(); queried for the last started checkpoint and added input.
    private RecordingChannelStateWriter channelStateWriter;
    // One counter per input channel, allocated in createInputGate(); the current value is handed
    // to RemoteInputChannel.onBuffer() and then incremented.
    private int[] sequenceNumbers;
    // Everything polled from the gate during the most recent instance addSequence() call.
    private List<BufferOrEvent> output;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest$ValidateAsyncFutureNotCompleted.class */
    /**
     * Handler that, whenever a checkpoint abort is delivered, additionally asserts that the
     * gate's "all barriers received" future for that checkpoint id is not done yet.
     */
    static class ValidateAsyncFutureNotCompleted extends ValidatingCheckpointHandler {

        /** Gate to inspect on abort; must be injected via {@link #setInputGate} beforehand. */
        @Nullable
        private CheckpointedInputGate inputGate;

        /**
         * @param expectedCheckpointId value forwarded unchanged to the parent handler
         */
        public ValidateAsyncFutureNotCompleted(long expectedCheckpointId) {
            super(expectedCheckpointId);
        }

        /** Registers the gate whose future is checked when an abort arrives. */
        public void setInputGate(CheckpointedInputGate gate) {
            this.inputGate = gate;
        }

        @Override
        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
            super.abortCheckpointOnBarrier(checkpointId, cause);
            // Fails with IllegalStateException if the test forgot to call setInputGate().
            Preconditions.checkState(this.inputGate != null);
            boolean allBarriersReceived =
                    this.inputGate.getAllBarriersReceivedFuture(checkpointId).isDone();
            Assert.assertFalse(allBarriersReceived);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest$ValidatingCheckpointHandler.class */
    /**
     * Test-local variant of the shared {@code ValidatingCheckpointHandler} that resets
     * {@code nextExpectedCheckpointId} to -1 after every abort.
     */
    static class ValidatingCheckpointHandler
            extends org.apache.flink.streaming.runtime.io.checkpointing.ValidatingCheckpointHandler {

        /**
         * @param expectedCheckpointId value forwarded unchanged to the parent handler
         */
        public ValidatingCheckpointHandler(long expectedCheckpointId) {
            super(expectedCheckpointId);
        }

        @Override
        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
            super.abortCheckpointOnBarrier(checkpointId, cause);
            // NOTE(review): -1 presumably tells the parent to stop validating the next id —
            // confirm against the parent class.
            nextExpectedCheckpointId = -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest$ValidatingCheckpointInvokable.class */
    /**
     * Minimal {@code StreamTask} that only records which checkpoints were triggered or aborted
     * on it, so tests can query the ids afterwards.
     */
    public static final class ValidatingCheckpointInvokable extends StreamTask {
        /** Id of the checkpoint most recently passed to {@code triggerCheckpointOnBarrier}. */
        private long lastTriggeredCheckpointId;

        /** Number of {@code triggerCheckpointOnBarrier} invocations so far. */
        private int triggerCount;

        /** Id most recently passed to {@code abortCheckpointOnBarrier}. */
        private long lastAbortedCheckpointId;

        ValidatingCheckpointInvokable() throws Exception {
            super(new DummyEnvironment("test", 1, 0));
        }

        /** Intentionally empty. */
        public void init() {
        }

        /** Intentionally empty: this task never processes input. */
        protected void processInput(MailboxDefaultAction.Controller controller) {
        }

        /** Records the triggered checkpoint id and counts the invocation. */
        public void triggerCheckpointOnBarrier(
                CheckpointMetaData metaData,
                CheckpointOptions options,
                CheckpointMetricsBuilder metrics) {
            lastTriggeredCheckpointId = metaData.getCheckpointId();
            triggerCount += 1;
        }

        /** Records the aborted checkpoint id; the cause is ignored. */
        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause)
                throws IOException {
            lastAbortedCheckpointId = checkpointId;
        }

        long getTriggeredCheckpointId() {
            return lastTriggeredCheckpointId;
        }

        int getTotalTriggeredCheckpoints() {
            return triggerCount;
        }

        long getAbortedCheckpointId() {
            return lastAbortedCheckpointId;
        }
    }

    /** Gives every test a fresh channel-state recorder. */
    @Before
    public void setUp() {
        channelStateWriter = new RecordingChannelStateWriter();
    }

    /**
     * Verifies that the gate (if the test created one) is fully drained and finished, then
     * releases the gate and the channel-state writer.
     *
     * <p>Clean-up runs in {@code finally} blocks so that a failing assertion (or a failing
     * {@code inputGate.close()}) can no longer skip closing the remaining resources. Note that
     * an exception thrown while closing takes precedence over an earlier assertion failure.
     *
     * @throws Exception if polling or closing fails
     */
    @After
    public void ensureEmpty() throws Exception {
        try {
            if (inputGate != null) {
                try {
                    Assert.assertFalse(inputGate.pollNext().isPresent());
                    Assert.assertTrue(inputGate.isFinished());
                } finally {
                    inputGate.close();
                }
            }
        } finally {
            if (channelStateWriter != null) {
                channelStateWriter.close();
            }
        }
    }

    /** One channel, buffers only: everything is forwarded and nothing is persisted. */
    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        inputGate = createInputGate(1, new ValidatingCheckpointHandler(1L));

        BufferOrEvent[] sequence =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(0),
                        createEndOfPartition(0));

        assertOutput(sequence);
        assertInflightData();
    }

    /** Four channels, buffers only: everything is forwarded and nothing is persisted. */
    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        inputGate = createInputGate(4, new ValidatingCheckpointHandler(1L));

        BufferOrEvent[] sequence =
                addSequence(
                        inputGate,
                        createBuffer(2),
                        createBuffer(2),
                        createBuffer(0),
                        createBuffer(1),
                        createBuffer(0),
                        createEndOfPartition(0),
                        createBuffer(3),
                        createBuffer(1),
                        createEndOfPartition(3),
                        createBuffer(1),
                        createEndOfPartition(1),
                        createBuffer(2),
                        createEndOfPartition(2));

        assertOutput(sequence);
        assertInflightData();
    }

    /** One channel with barriers 1..6 interleaved with buffers; only the output is checked. */
    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        inputGate = createInputGate(1, new ValidatingCheckpointHandler(1L));

        BufferOrEvent[] sequence =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(0),
                        createBarrier(1L, 0),
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(0),
                        createBarrier(2L, 0),
                        createBarrier(3L, 0),
                        createBuffer(0),
                        createBuffer(0),
                        createBarrier(4L, 0),
                        createBarrier(5L, 0),
                        createBarrier(6L, 0),
                        createBuffer(0),
                        createEndOfPartition(0));

        assertOutput(sequence);
    }

    /**
     * Three channels, checkpoints 1..5 with differently interleaved barriers. After each
     * checkpoint the last started checkpoint id and the recorded in-flight buffers are checked.
     */
    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        inputGate = createInputGate(3, new ValidatingCheckpointHandler(1L));

        // checkpoint 1
        BufferOrEvent[] first =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(2),
                        createBuffer(0),
                        createBarrier(1L, 1),
                        createBarrier(1L, 2),
                        createBuffer(2),
                        createBuffer(1),
                        createBuffer(0),
                        createBarrier(1L, 0));
        assertOutput(first);
        Assert.assertEquals(1L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData(first[7]);

        // checkpoint 2: all barriers arrive back to back after the data
        BufferOrEvent[] second =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(1),
                        createBuffer(1),
                        createBuffer(2),
                        createBarrier(2L, 0),
                        createBarrier(2L, 1),
                        createBarrier(2L, 2));
        assertOutput(second);
        Assert.assertEquals(2L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 3
        BufferOrEvent[] third =
                addSequence(
                        inputGate,
                        createBuffer(2),
                        createBuffer(2),
                        createBarrier(3L, 2),
                        createBuffer(2),
                        createBuffer(2),
                        createBarrier(3L, 0),
                        createBarrier(3L, 1));
        assertOutput(third);
        Assert.assertEquals(3L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 4: barriers only, so no output is expected
        addSequence(
                inputGate,
                createBarrier(4L, 1),
                createBarrier(4L, 2),
                createBarrier(4L, 0));
        assertOutput();
        Assert.assertEquals(4L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 5
        BufferOrEvent[] fifth =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(2),
                        createBuffer(0),
                        createBarrier(5L, 1),
                        createBuffer(2),
                        createBuffer(0),
                        createBuffer(2),
                        createBuffer(1),
                        createBarrier(5L, 2),
                        createBuffer(1),
                        createBuffer(0),
                        createBuffer(2),
                        createBuffer(1),
                        createBarrier(5L, 0));
        assertOutput(fifth);
        Assert.assertEquals(5L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData(fifth[4], fifth[5], fifth[6], fifth[10]);

        // drain and finish all channels
        BufferOrEvent[] tail =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createEndOfPartition(0),
                        createEndOfPartition(1),
                        createEndOfPartition(2));
        assertOutput(tail);
        assertInflightData();
    }

    /**
     * Checks the checkpoint start delay, alignment duration and "bytes processed during
     * alignment" metrics against wall-clock bounds measured around the test's own sleeps.
     */
    @Test
    public void testMetrics() throws Exception {
        final int bufferSize = 100;
        final long checkpointId = 1L;
        final long sleepTimeMillis = 10L;

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        inputGate = createInputGate(3, handler);

        long barrierCreationMillis = System.currentTimeMillis();
        Thread.sleep(sleepTimeMillis);
        long alignmentStartNanos = System.nanoTime();

        addSequence(
                inputGate,
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize),
                createBarrier(checkpointId, 1, barrierCreationMillis),
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize),
                createBarrier(checkpointId, 0),
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize));

        long startDelayMillis = System.currentTimeMillis() - barrierCreationMillis;
        Thread.sleep(sleepTimeMillis);

        addSequence(
                inputGate,
                createBarrier(checkpointId, 2),
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize),
                createEndOfPartition(0),
                createEndOfPartition(1),
                createEndOfPartition(2));

        long alignmentDurationNanos = System.nanoTime() - alignmentStartNanos;

        Assert.assertEquals(
                checkpointId, inputGate.getCheckpointBarrierHandler().getLatestCheckpointId());

        // start delay (converted to millis) lies within [sleep time, measured delay]
        Assert.assertThat(
                Long.valueOf(inputGate.getCheckpointStartDelayNanos() / 1000000),
                Matchers.greaterThanOrEqualTo(sleepTimeMillis));
        Assert.assertThat(
                Long.valueOf(inputGate.getCheckpointStartDelayNanos() / 1000000),
                Matchers.lessThanOrEqualTo(Long.valueOf(startDelayMillis)));

        // alignment duration lies within [sleep time, measured duration]
        Assert.assertTrue(handler.getLastAlignmentDurationNanos().isDone());
        Assert.assertThat(
                Long.valueOf(
                        handler.getLastAlignmentDurationNanos().get().longValue() / 1000000),
                Matchers.greaterThanOrEqualTo(sleepTimeMillis));
        Assert.assertThat(
                handler.getLastAlignmentDurationNanos().get(),
                Matchers.lessThanOrEqualTo(Long.valueOf(alignmentDurationNanos)));

        // six buffers of bufferSize bytes are expected to be counted
        Assert.assertTrue(handler.getLastBytesProcessedDuringAlignment().isDone());
        Assert.assertThat(
                handler.getLastBytesProcessedDuringAlignment().get(),
                Matchers.equalTo(Long.valueOf(6 * bufferSize)));
    }

    /**
     * Three channels that end while checkpoint 2 is still in progress; expects checkpoint 2 to
     * be the last started one and no in-flight data to be recorded.
     */
    @Test
    public void testMultiChannelTrailingInflightData() throws Exception {
        inputGate = createInputGate(3, new ValidatingCheckpointHandler(1L), false);

        BufferOrEvent[] sequence =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(1),
                        createBuffer(2),
                        createBarrier(1L, 1),
                        createBarrier(1L, 2),
                        createBarrier(1L, 0),
                        createBuffer(2),
                        createBuffer(1),
                        createBuffer(0),
                        createBarrier(2L, 1),
                        createBuffer(1),
                        createBuffer(1),
                        createEndOfPartition(1),
                        createBuffer(0),
                        createBuffer(2),
                        createBarrier(2L, 2),
                        createBuffer(2),
                        createEndOfPartition(2),
                        createBuffer(0),
                        createEndOfPartition(0));

        assertOutput(sequence);
        Assert.assertEquals(2L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();
    }

    /**
     * Two channels where cancellation markers for checkpoints 2 and 3 follow a barrier for
     * checkpoint 1; expects checkpoint 1 as last started and 3 as last cancelled.
     */
    @Test
    public void testMissingCancellationBarriers() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        inputGate = createInputGate(2, handler);

        BufferOrEvent[] sequence =
                addSequence(
                        inputGate,
                        createBarrier(1L, 0),
                        createCancellationBarrier(2L, 0),
                        createCancellationBarrier(3L, 0),
                        createCancellationBarrier(3L, 1),
                        createBuffer(0),
                        createEndOfPartition(0),
                        createEndOfPartition(1));

        assertOutput(sequence);
        Assert.assertEquals(1L, channelStateWriter.getLastStartedCheckpointId());
        Assert.assertEquals(3L, handler.getLastCanceledCheckpointId());
        assertInflightData();
    }

    /**
     * Completes checkpoint 1 on three channels, then starts checkpoint 2 while the channels
     * finish one by one; no in-flight data is expected for either checkpoint.
     */
    @Test
    public void testEarlyCleanup() throws Exception {
        inputGate = createInputGate(3, new ValidatingCheckpointHandler(1L), false);

        // checkpoint 1
        BufferOrEvent[] first =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(1),
                        createBuffer(2),
                        createBarrier(1L, 1),
                        createBarrier(1L, 2),
                        createBarrier(1L, 0));
        assertOutput(first);
        Assert.assertEquals(1L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 2, interleaved with end-of-partition events
        BufferOrEvent[] second =
                addSequence(
                        inputGate,
                        createBuffer(2),
                        createBuffer(1),
                        createBuffer(0),
                        createBarrier(2L, 1),
                        createBuffer(1),
                        createBuffer(1),
                        createEndOfPartition(1),
                        createBuffer(0),
                        createBuffer(2),
                        createBarrier(2L, 2),
                        createBuffer(2),
                        createEndOfPartition(2),
                        createBuffer(0),
                        createEndOfPartition(0));
        assertOutput(second);
        Assert.assertEquals(2L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();
    }

    /**
     * Four channels of which two are closed before the first barrier arrives; checkpoints 2..5
     * are then driven over the remaining channels.
     */
    @Test
    public void testStartAlignmentWithClosedChannels() throws Exception {
        inputGate = createInputGate(4, new ValidatingCheckpointHandler(2L));

        // channels 2 and 1 close first, then checkpoint 2 runs on channels 3 and 0
        BufferOrEvent[] first =
                addSequence(
                        inputGate,
                        createEndOfPartition(2),
                        createEndOfPartition(1),
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(3),
                        createBarrier(2L, 3),
                        createBarrier(2L, 0));
        assertOutput(first);
        Assert.assertEquals(2L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 3
        BufferOrEvent[] second =
                addSequence(
                        inputGate,
                        createBuffer(3),
                        createBuffer(0),
                        createBarrier(3L, 3),
                        createBuffer(3),
                        createBuffer(0),
                        createBarrier(3L, 0));
        assertOutput(second);
        Assert.assertEquals(3L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData(second[4]);

        // checkpoint 4
        BufferOrEvent[] third =
                addSequence(inputGate, createBarrier(4L, 0), createBarrier(4L, 3));
        assertOutput(third);
        Assert.assertEquals(4L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // channel 0 closes without a new barrier
        BufferOrEvent[] fourth =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(3),
                        createEndOfPartition(0));
        assertOutput(fourth);
        Assert.assertEquals(-1L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 5 on the only remaining channel
        BufferOrEvent[] fifth =
                addSequence(
                        inputGate,
                        createBuffer(3),
                        createBarrier(5L, 3),
                        createBuffer(3),
                        createEndOfPartition(3));
        assertOutput(fifth);
        Assert.assertEquals(5L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();
    }

    /**
     * Completes checkpoint 1 on three channels, then lets the channels end while checkpoint 2
     * is missing the barrier of channel 1; the buffer at index 7 is expected as in-flight data.
     */
    @Test
    public void testEndOfStreamWhileCheckpoint() throws Exception {
        inputGate = createInputGate(3, new ValidatingCheckpointHandler(1L));

        // checkpoint 1: barriers only
        BufferOrEvent[] first =
                addSequence(
                        inputGate,
                        createBarrier(1L, 0),
                        createBarrier(1L, 1),
                        createBarrier(1L, 2));
        assertOutput(first);
        Assert.assertEquals(1L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 2 followed by end of all partitions
        BufferOrEvent[] second =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(0),
                        createBuffer(2),
                        createBarrier(2L, 2),
                        createBarrier(2L, 0),
                        createBuffer(0),
                        createBuffer(2),
                        createBuffer(1),
                        createEndOfPartition(2),
                        createEndOfPartition(1),
                        createBuffer(0),
                        createEndOfPartition(0));
        assertOutput(second);
        Assert.assertEquals(2L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData(second[7]);
    }

    /**
     * Sends a barrier on channel 0 and a cancellation marker for the same checkpoint on
     * channel 1; the handler itself asserts that the "all barriers received" future is not
     * done at the moment the abort is delivered.
     */
    @Test
    public void testNotifyAbortCheckpointBeforeCanellingAsyncCheckpoint() throws Exception {
        ValidateAsyncFutureNotCompleted handler = new ValidateAsyncFutureNotCompleted(1L);
        inputGate = createInputGate(2, handler);
        handler.setInputGate(inputGate);

        addSequence(inputGate, createBarrier(1L, 0), createCancellationBarrier(1L, 1));

        // finish both channels so that ensureEmpty() finds a finished gate
        addSequence(inputGate, createEndOfPartition(0), createEndOfPartition(1));
    }

    /** One channel receiving barriers mixed with cancellation markers for later checkpoints. */
    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        inputGate = createInputGate(1, handler);

        BufferOrEvent[] first =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBarrier(1L, 0),
                        createBuffer(0),
                        createBarrier(2L, 0),
                        createCancellationBarrier(4L, 0));
        assertOutput(first);
        Assert.assertEquals(2L, channelStateWriter.getLastStartedCheckpointId());
        Assert.assertEquals(4L, handler.getLastCanceledCheckpointId());
        assertInflightData();

        BufferOrEvent[] second =
                addSequence(
                        inputGate,
                        createBarrier(5L, 0),
                        createBuffer(0),
                        createCancellationBarrier(6L, 0),
                        createBuffer(0),
                        createEndOfPartition(0));
        assertOutput(second);
        Assert.assertEquals(5L, channelStateWriter.getLastStartedCheckpointId());
        Assert.assertEquals(6L, handler.getLastCanceledCheckpointId());
        assertInflightData();
    }

    /**
     * Three channels, checkpoints 1..6, where checkpoints 2, 4 and 6 are cancelled by markers
     * arriving at different points relative to the regular barriers.
     */
    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(1L);
        inputGate = createInputGate(3, handler);

        // checkpoint 1 completes normally
        BufferOrEvent[] first =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(2),
                        createBuffer(0),
                        createBarrier(1L, 1),
                        createBarrier(1L, 2),
                        createBuffer(2),
                        createBuffer(1),
                        createBarrier(1L, 0),
                        createBuffer(0),
                        createBuffer(2));
        assertOutput(first);
        Assert.assertEquals(1L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 2 is cancelled by the last channel
        BufferOrEvent[] second =
                addSequence(
                        inputGate,
                        createBarrier(2L, 0),
                        createBarrier(2L, 2),
                        createBuffer(0),
                        createBuffer(2),
                        createCancellationBarrier(2L, 1));
        assertOutput(second);
        Assert.assertEquals(2L, channelStateWriter.getLastStartedCheckpointId());
        Assert.assertEquals(2L, handler.getLastCanceledCheckpointId());
        assertInflightData();

        // checkpoint 3 completes normally
        BufferOrEvent[] third =
                addSequence(
                        inputGate,
                        createBuffer(2),
                        createBuffer(1),
                        createBarrier(3L, 1),
                        createBarrier(3L, 2),
                        createBarrier(3L, 0));
        assertOutput(third);
        Assert.assertEquals(3L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 4 is cancelled before any of its barriers arrives
        BufferOrEvent[] fourth =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(1),
                        createCancellationBarrier(4L, 1),
                        createBarrier(4L, 2),
                        createBuffer(0),
                        createBarrier(4L, 0));
        assertOutput(fourth);
        Assert.assertEquals(-1L, channelStateWriter.getLastStartedCheckpointId());
        Assert.assertEquals(4L, handler.getLastCanceledCheckpointId());
        assertInflightData();

        // checkpoint 5 completes normally
        BufferOrEvent[] fifth =
                addSequence(
                        inputGate,
                        createBuffer(0),
                        createBuffer(1),
                        createBuffer(2),
                        createBarrier(5L, 2),
                        createBarrier(5L, 1),
                        createBarrier(5L, 0),
                        createBuffer(0),
                        createBuffer(1));
        assertOutput(fifth);
        Assert.assertEquals(5L, channelStateWriter.getLastStartedCheckpointId());
        assertInflightData();

        // checkpoint 6 is cancelled on two channels before the only barrier arrives
        BufferOrEvent[] sixth =
                addSequence(
                        inputGate,
                        createCancellationBarrier(6L, 1),
                        createCancellationBarrier(6L, 2),
                        createBarrier(6L, 0),
                        createBuffer(0),
                        createEndOfPartition(0),
                        createEndOfPartition(1),
                        createEndOfPartition(2));
        assertOutput(sixth);
        Assert.assertEquals(-1L, channelStateWriter.getLastStartedCheckpointId());
        Assert.assertEquals(6L, handler.getLastCanceledCheckpointId());
        assertInflightData();
    }

    /**
     * Processes a barrier on one of two channels (leaving the checkpoint pending) and then
     * runs the shared cancellation scenario on the same handler.
     */
    @Test
    public void testProcessCancellationBarrierAfterProcessBarrier() throws Exception {
        ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable();
        SingleInputGate twoChannelGate =
                new SingleInputGateBuilder()
                        .setNumberOfChannels(2)
                        .setChannelFactory((builder, gate) -> builder.buildLocalChannel(gate))
                        .build();
        SingleCheckpointBarrierHandler handler =
                SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler(
                        TestSubtaskCheckpointCoordinator.INSTANCE,
                        "test",
                        invokable,
                        SystemClock.getInstance(),
                        true,
                        new CheckpointableInput[] {twoChannelGate});

        handler.processBarrier(
                buildCheckpointBarrier(DEFAULT_CHECKPOINT_ID), new InputChannelInfo(0, 0), false);

        // only one of two channels delivered its barrier, so the checkpoint is still pending
        Assert.assertTrue(handler.isCheckpointPending());
        Assert.assertEquals(DEFAULT_CHECKPOINT_ID, handler.getLatestCheckpointId());

        testProcessCancellationBarrier(handler, invokable);
    }

    /**
     * Delivers the cancellation marker for a checkpoint before its barrier and verifies that
     * the checkpoint is reported as aborted both before and after the late barrier.
     */
    @Test
    public void testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception {
        ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable();
        SingleInputGate gateUnderTest =
                new SingleInputGateBuilder()
                        .setChannelFactory((builder, gate) -> builder.buildLocalChannel(gate))
                        .build();
        SingleCheckpointBarrierHandler handler =
                SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler(
                        TestSubtaskCheckpointCoordinator.INSTANCE,
                        "test",
                        invokable,
                        SystemClock.getInstance(),
                        true,
                        new CheckpointableInput[] {gateUnderTest});

        handler.processCancellationBarrier(
                new CancelCheckpointMarker(DEFAULT_CHECKPOINT_ID), new InputChannelInfo(0, 0));
        verifyTriggeredCheckpoint(handler, invokable, DEFAULT_CHECKPOINT_ID);

        // the barrier of the already cancelled checkpoint must not change the observed state
        handler.processBarrier(
                buildCheckpointBarrier(DEFAULT_CHECKPOINT_ID), new InputChannelInfo(0, 0), false);
        verifyTriggeredCheckpoint(handler, invokable, DEFAULT_CHECKPOINT_ID);
    }

    /**
     * Cancels a randomly chosen checkpoint (either {@code DEFAULT_CHECKPOINT_ID} or 1) and
     * then the one following it, verifying the handler and invokable state after each.
     *
     * @param handler barrier handler receiving the cancellation markers
     * @param invokable task whose recorded aborted checkpoint id is verified
     */
    private void testProcessCancellationBarrier(
            SingleCheckpointBarrierHandler handler, ValidatingCheckpointInvokable invokable)
            throws Exception {
        final long firstCancelledId;
        if (new Random().nextBoolean()) {
            firstCancelledId = DEFAULT_CHECKPOINT_ID;
        } else {
            firstCancelledId = 1L;
        }
        handler.processCancellationBarrier(
                new CancelCheckpointMarker(firstCancelledId), new InputChannelInfo(0, 0));
        verifyTriggeredCheckpoint(handler, invokable, firstCancelledId);

        final long secondCancelledId = firstCancelledId + 1;
        handler.processCancellationBarrier(
                new CancelCheckpointMarker(secondCancelledId), new InputChannelInfo(0, 0));
        verifyTriggeredCheckpoint(handler, invokable, secondCancelledId);
    }

    /**
     * Asserts that no checkpoint is pending on the handler, that the handler's latest
     * checkpoint id equals {@code expectedCheckpointId}, and that the invokable recorded the
     * same id as its aborted checkpoint.
     */
    private void verifyTriggeredCheckpoint(
            SingleCheckpointBarrierHandler handler,
            ValidatingCheckpointInvokable invokable,
            long expectedCheckpointId) {
        Assert.assertFalse(handler.isCheckpointPending());
        Assert.assertEquals(expectedCheckpointId, handler.getLatestCheckpointId());
        Assert.assertEquals(expectedCheckpointId, invokable.getAbortedCheckpointId());
    }

    /**
     * Starts a checkpoint on one of two channels and then ends that channel; expects the
     * pending checkpoint to be aborted and the open-channel count to drop from 2 to 1.
     */
    @Test
    public void testEndOfStreamWithPendingCheckpoint() throws Exception {
        ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable();
        SingleInputGate twoChannelGate =
                new SingleInputGateBuilder()
                        .setChannelFactory((builder, gate) -> builder.buildLocalChannel(gate))
                        .setNumberOfChannels(2)
                        .build();
        SingleCheckpointBarrierHandler handler =
                SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler(
                        TestSubtaskCheckpointCoordinator.INSTANCE,
                        "test",
                        invokable,
                        SystemClock.getInstance(),
                        false,
                        new CheckpointableInput[] {twoChannelGate});

        // first barrier: checkpoint becomes pending, both channels still open
        handler.processBarrier(
                buildCheckpointBarrier(DEFAULT_CHECKPOINT_ID), new InputChannelInfo(0, 0), false);
        Assert.assertTrue(handler.isCheckpointPending());
        Assert.assertEquals(DEFAULT_CHECKPOINT_ID, handler.getLatestCheckpointId());
        Assert.assertEquals(2L, handler.getNumOpenChannels());

        // end of partition on the same channel: checkpoint is no longer pending
        handler.processEndOfPartition(new InputChannelInfo(0, 0));
        Assert.assertFalse(handler.isCheckpointPending());
        Assert.assertEquals(DEFAULT_CHECKPOINT_ID, handler.getLatestCheckpointId());
        Assert.assertEquals(1L, handler.getNumOpenChannels());
        Assert.assertEquals(DEFAULT_CHECKPOINT_ID, invokable.getAbortedCheckpointId());
    }

    /**
     * Three channels: barriers for checkpoint 1 arrive on channels 1 and 0, channel 2 ends
     * without one. Expects checkpoint 1 to be triggered, no checkpoint to be aborted, and the
     * buffer at index 1 to be recorded as in-flight data.
     */
    @Test
    public void testTriggerCheckpointsWithEndOfPartition() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(-1L);
        inputGate = createInputGate(3, handler);

        BufferOrEvent[] sequence =
                addSequence(
                        inputGate,
                        createBarrier(1L, 1),
                        createBuffer(0),
                        createBarrier(1L, 0),
                        createBuffer(1),
                        createEndOfPartition(2),
                        createEndOfPartition(0),
                        createEndOfPartition(1));
        assertOutput(sequence);

        Assert.assertThat(handler.triggeredCheckpoints, Matchers.contains(1L));
        // The expected value is a count of aborted checkpoints, so it is written as a plain
        // zero rather than via DEFAULT_CHECKPOINT_ID (which merely happens to be 0).
        Assert.assertEquals(0L, handler.getAbortedCheckpointCounter());
        assertInflightData(sequence[1]);
    }

    /**
     * Three channels where channel 0 ends before any barrier arrives. Checkpoint 3 is driven
     * over channels 1 and 2, then checkpoint 4 over channel 2 alone; both are expected to be
     * triggered and none aborted.
     */
    @Test
    public void testTriggerCheckpointsAfterReceivedEndOfPartition() throws Exception {
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(-1L);
        inputGate = createInputGate(3, handler);

        BufferOrEvent[] first =
                addSequence(
                        inputGate,
                        createEndOfPartition(0),
                        createBarrier(3L, 1),
                        createBuffer(1),
                        createBuffer(2),
                        createEndOfPartition(1),
                        createBarrier(3L, 2));
        assertOutput(first);
        assertInflightData(first[3]);
        Assert.assertThat(handler.triggeredCheckpoints, Matchers.contains(3L));
        // The expected value is a count of aborted checkpoints, so it is written as a plain
        // zero rather than via DEFAULT_CHECKPOINT_ID (which merely happens to be 0).
        Assert.assertEquals(0L, handler.getAbortedCheckpointCounter());

        BufferOrEvent[] second =
                addSequence(
                        inputGate,
                        createBuffer(2),
                        createBarrier(4L, 2),
                        createEndOfPartition(2));
        assertOutput(second);
        assertInflightData();
        Assert.assertThat(handler.triggeredCheckpoints, Matchers.contains(3L, 4L));
        Assert.assertEquals(0L, handler.getAbortedCheckpointCounter());
    }

    /**
     * Creates an unaligned checkpoint barrier for the given channel, stamped with the current
     * wall-clock time.
     *
     * @param checkpointId id carried by the barrier
     * @param channel index of the input channel (gate index is always 0)
     */
    private BufferOrEvent createBarrier(long checkpointId, int channel) {
        long now = System.currentTimeMillis();
        return createBarrier(checkpointId, channel, now);
    }

    /**
     * Creates an unaligned checkpoint barrier for the given channel and advances
     * {@code sizeCounter} by one.
     *
     * @param checkpointId id carried by the barrier
     * @param channel index of the input channel (gate index is always 0)
     * @param timestamp value passed as the barrier's second constructor argument
     */
    private BufferOrEvent createBarrier(long checkpointId, int channel, long timestamp) {
        sizeCounter += 1;
        CheckpointOptions options =
                CheckpointOptions.unaligned(
                        CheckpointType.CHECKPOINT,
                        CheckpointStorageLocationReference.getDefault());
        CheckpointBarrier barrier = new CheckpointBarrier(checkpointId, timestamp, options);
        return new BufferOrEvent(barrier, new InputChannelInfo(0, channel));
    }

    /**
     * Creates a cancellation marker for the given checkpoint on the given channel and
     * advances {@code sizeCounter} by one.
     */
    private BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
        sizeCounter += 1;
        CancelCheckpointMarker marker = new CancelCheckpointMarker(checkpointId);
        return new BufferOrEvent(marker, new InputChannelInfo(0, channel));
    }

    /**
     * Creates a data buffer of an explicit size for the given channel; {@code sizeCounter}
     * is left untouched.
     *
     * @param channel index of the input channel (gate index is always 0)
     * @param size size handed to {@code TestBufferFactory.createBuffer}
     */
    private BufferOrEvent createBuffer(int channel, int size) {
        InputChannelInfo channelInfo = new InputChannelInfo(0, channel);
        return new BufferOrEvent(TestBufferFactory.createBuffer(size), channelInfo);
    }

    /**
     * Creates a data buffer for the given channel whose size is the current
     * {@code sizeCounter}, then advances the counter.
     *
     * @param channel index of the input channel (gate index is always 0)
     */
    private BufferOrEvent createBuffer(int channel) {
        int size = sizeCounter++;
        return new BufferOrEvent(
                TestBufferFactory.createBuffer(size), new InputChannelInfo(0, channel));
    }

    /** Creates an end-of-partition event for the given channel of gate 0. */
    private static BufferOrEvent createEndOfPartition(int channel) {
        InputChannelInfo channelInfo = new InputChannelInfo(0, channel);
        return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, channelInfo);
    }

    /**
     * Convenience overload of {@code createInputGate(int, AbstractInvokable, boolean)} that
     * passes {@code true} for the boolean flag.
     */
    private CheckpointedInputGate createInputGate(int numberOfChannels, AbstractInvokable toNotify)
            throws IOException {
        final boolean barrierHandlerFlag = true;
        return createInputGate(numberOfChannels, toNotify, barrierHandlerFlag);
    }

    /**
     * Builds a single input gate with {@code numberOfChannels} remote channels that write
     * their state to {@code channelStateWriter}, resets the per-channel sequence numbers,
     * sets the gate up and wraps it into a {@code CheckpointedInputGate}.
     *
     * @param numberOfChannels number of remote input channels to create
     * @param toNotify invokable handed to the barrier handler
     * @param barrierHandlerFlag boolean forwarded to
     *     {@code createUnalignedCheckpointBarrierHandler}; its meaning is not visible here
     */
    private CheckpointedInputGate createInputGate(
            int numberOfChannels, AbstractInvokable toNotify, boolean barrierHandlerFlag)
            throws IOException {
        // NOTE(review): this environment is never closed by the test — confirm that is intended.
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate gate =
                new SingleInputGateBuilder()
                        .setNumberOfChannels(numberOfChannels)
                        .setupBufferPoolFactory(environment)
                        .build();

        InputChannel[] channels = new RemoteInputChannel[numberOfChannels];
        for (int index = 0; index < numberOfChannels; index++) {
            channels[index] =
                    InputChannelBuilder.newBuilder()
                            .setChannelIndex(index)
                            .setStateWriter(channelStateWriter)
                            .setupFromNettyShuffleEnvironment(environment)
                            .setConnectionManager(new TestingConnectionManager())
                            .buildRemoteChannel(gate);
        }
        gate.setInputChannels(channels);

        sequenceNumbers = new int[numberOfChannels];
        gate.setup();
        gate.requestPartitions();
        return createCheckpointedInputGate(gate, toNotify, barrierHandlerFlag);
    }

    /**
     * Feeds {@code sequence} into the gate, collecting everything the gate emits into a fresh
     * {@code output} list, and resets {@code sizeCounter} to 1 for the next sequence.
     *
     * @param gate gate that receives the sequence
     * @param sequence buffers and events to deliver, in order
     * @return the unmodified {@code sequence} array, for use in later assertions
     */
    private BufferOrEvent[] addSequence(CheckpointedInputGate gate, BufferOrEvent... sequence)
            throws Exception {
        // Typed list instead of a raw ArrayList: the field is a List<BufferOrEvent>.
        output = new ArrayList<>();
        addSequence(gate, output, sequenceNumbers, sequence);
        sizeCounter = 1;
        return sequence;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Delivers each element of {@code sequence} to its remote input channel and, after every
     * element, polls the gate until it yields nothing more, appending each polled element to
     * {@code output}.
     *
     * <p>Events are serialized into buffers before delivery; checkpoint barriers are
     * serialized with the second {@code EventSerializer.toBuffer} argument set to
     * {@code true}.
     *
     * @param inputGate gate whose channels receive the sequence
     * @param output list that receives everything polled from the gate
     * @param sequenceNumbers per-channel counters; the entry of the target channel is used as
     *     the sequence number and then incremented
     * @param sequence buffers and events to deliver, in order
     * @return the unmodified {@code sequence} array
     */
    public static BufferOrEvent[] addSequence(
            CheckpointedInputGate inputGate,
            List<BufferOrEvent> output,
            int[] sequenceNumbers,
            BufferOrEvent... sequence)
            throws Exception {
        for (BufferOrEvent element : sequence) {
            BufferOrEvent toSend = element;
            if (element.isEvent()) {
                boolean isBarrier = element.getEvent() instanceof CheckpointBarrier;
                toSend =
                        new BufferOrEvent(
                                EventSerializer.toBuffer(element.getEvent(), isBarrier),
                                element.getChannelInfo(),
                                element.moreAvailable(),
                                element.morePriorityEvents());
            }

            int channelIndex = toSend.getChannelInfo().getInputChannelIdx();
            // The explicit cast is harmless if getChannel() already returns the remote type
            // and needed if it returns the general InputChannel type.
            RemoteInputChannel channel = (RemoteInputChannel) inputGate.getChannel(channelIndex);
            channel.onBuffer(toSend.getBuffer(), sequenceNumbers[channelIndex]++, 0, 0);

            // Drain the gate: map() yields an empty Optional once pollNext() has nothing left.
            while (inputGate.pollNext().map(output::add).isPresent()) {
                // intentionally empty
            }
        }
        return sequence;
    }

    /**
     * Convenience overload of the three-argument {@code createCheckpointedInputGate} that
     * passes {@code true} for the boolean flag.
     */
    private CheckpointedInputGate createCheckpointedInputGate(
            IndexedInputGate gate, AbstractInvokable toNotify) {
        final boolean barrierHandlerFlag = true;
        return createCheckpointedInputGate(gate, toNotify, barrierHandlerFlag);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Wraps {@code gate} into a {@code CheckpointedInputGate} driven by an unaligned
     * checkpoint barrier handler named "Test", a coordinator backed by
     * {@code channelStateWriter}, and a synchronous mailbox executor.
     *
     * @param gate gate to wrap; it is also the handler's only checkpointable input
     * @param toNotify invokable handed to the barrier handler
     * @param barrierHandlerFlag boolean forwarded to
     *     {@code createUnalignedCheckpointBarrierHandler}; its meaning is not visible here
     */
    private CheckpointedInputGate createCheckpointedInputGate(
            IndexedInputGate gate, AbstractInvokable toNotify, boolean barrierHandlerFlag) {
        SingleCheckpointBarrierHandler barrierHandler =
                SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler(
                        new TestSubtaskCheckpointCoordinator(channelStateWriter),
                        "Test",
                        toNotify,
                        SystemClock.getInstance(),
                        barrierHandlerFlag,
                        new CheckpointableInput[] {gate});
        return new CheckpointedInputGate(gate, barrierHandler, new SyncMailboxExecutor());
    }

    /**
     * Asserts that the input recorded by the channel-state writer since the last check has
     * the same ids (see {@code getIds}) as {@code expected}, and resets the recorder.
     */
    private void assertInflightData(BufferOrEvent... expected) {
        Collection<BufferOrEvent> recorded = getAndResetInflightData();
        List<Object> expectedIds = getIds(Arrays.asList(expected));
        List<Object> recordedIds = getIds(recorded);
        Assert.assertEquals("Unexpected in-flight sequence: " + recorded, expectedIds, recordedIds);
    }

    /**
     * Converts every (channel, buffer) entry recorded by the channel-state writer into a
     * {@code BufferOrEvent} and then resets the writer.
     *
     * @return the recorded input in the order reported by the writer
     */
    private Collection<BufferOrEvent> getAndResetInflightData() {
        // Typed end-to-end; the raw List and unchecked cast were decompiler residue.
        List<BufferOrEvent> inflightData =
                channelStateWriter.getAddedInput().entries().stream()
                        .map(entry -> new BufferOrEvent(entry.getValue(), entry.getKey()))
                        .collect(Collectors.toList());
        channelStateWriter.reset();
        return inflightData;
    }

    /**
     * Asserts that the elements polled during the last {@code addSequence} call have the
     * same ids (see {@code getIds}) as {@code expected}.
     */
    private void assertOutput(BufferOrEvent... expected) {
        List<Object> expectedIds = getIds(Arrays.asList(expected));
        List<Object> actualIds = getIds(output);
        Assert.assertEquals("Unexpected output sequence", expectedIds, actualIds);
    }

    /**
     * Maps a sequence to comparable ids, dropping checkpoint barriers and cancellation
     * markers. A buffer is represented by {@code size - 1} as an {@code Integer}; any other
     * event is represented by the event object itself.
     *
     * @param elements buffers and events to convert
     * @return ids of all elements that are not barriers or cancellation markers, in order
     */
    private List<Object> getIds(Collection<BufferOrEvent> elements) {
        return elements.stream()
                .filter(
                        element -> {
                            if (!element.isEvent()) {
                                return true;
                            }
                            boolean isCheckpointControlEvent =
                                    element.getEvent() instanceof CheckpointBarrier
                                            || element.getEvent() instanceof CancelCheckpointMarker;
                            return !isCheckpointControlEvent;
                        })
                // Explicit type witness so the result is a List<Object> without a raw cast.
                .<Object>map(
                        element ->
                                element.isBuffer()
                                        ? Integer.valueOf(element.getSize() - 1)
                                        : element.getEvent())
                .collect(Collectors.toList());
    }

    /**
     * Builds an unaligned checkpoint barrier with the given id and a fixed second constructor
     * argument of 0.
     *
     * @param checkpointId id carried by the barrier
     */
    private CheckpointBarrier buildCheckpointBarrier(long checkpointId) {
        // The second argument is the slot where createBarrier() passes a wall-clock time, so
        // it is written as a literal 0 rather than via DEFAULT_CHECKPOINT_ID.
        final long timestamp = 0L;
        return new CheckpointBarrier(
                checkpointId,
                timestamp,
                CheckpointOptions.unaligned(
                        CheckpointType.CHECKPOINT,
                        CheckpointStorageLocationReference.getDefault()));
    }
}
