/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import junit.framework.TestCase;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.RuntimeEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.TestBarrierHandlerFactory;
import org.apache.flink.streaming.runtime.io.checkpointing.ValidatingCheckpointHandler;
import org.apache.flink.streaming.util.TestCheckpointedInputGateBuilder;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class AlternatingCheckpointsTest {
    // Test clock that the tests advance explicitly; getTestBarrierHandlerFactory hands it to the
    // barrier handler factory both as the clock and as the action registration.
    private final ClockWithDelayedActions clock = new ClockWithDelayedActions();

    /**
     * Builds a barrier handler factory for {@code target} that uses this test's {@code clock}
     * both for action registration and as its clock.
     *
     * @param target handler the factory is created for
     * @return the configured factory
     */
    private TestBarrierHandlerFactory getTestBarrierHandlerFactory(ValidatingCheckpointHandler target) {
        return TestBarrierHandlerFactory.forTarget(target)
                .withActionRegistration(clock)
                .withClock(clock);
    }

    /**
     * Sends a canonical savepoint barrier (id 0) on channel 0, queues a data buffer on that
     * channel, then sends an unaligned checkpoint barrier (id 1) on channel 1 and expects the
     * recording channel state writer to have captured some input.
     */
    @Test
    public void testChannelResetOnNewBarrier() throws Exception {
        RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
        TestBarrierHandlerFactory handlerFactory =
                getTestBarrierHandlerFactory(new ValidatingCheckpointHandler());
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(2, handlerFactory)
                        .withChannelStateWriter(stateWriter)
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            SnapshotType canonicalSavepoint = SavepointType.savepoint(SavepointFormatType.CANONICAL);
            sendBarrier(0L, clock.relativeTimeMillis(), canonicalSavepoint, gate, 0);

            RemoteInputChannel firstChannel = (RemoteInputChannel) gate.getChannel(0);
            firstChannel.onBuffer(TestBufferFactory.createBuffer(1024), 1, 0, 0);

            CheckpointBarrier unalignedBarrier =
                    new CheckpointBarrier(
                            1L,
                            clock.relativeTimeMillis(),
                            CheckpointOptions.unaligned(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault()));
            send(EventSerializer.toBuffer(unalignedBarrier, true), 1, gate);

            Assert.assertFalse(stateWriter.getAddedInput().isEmpty());
        }
    }

    /**
     * Announces an aligned-with-timeout barrier on channel 0 (no checkpoint expected yet), then
     * delivers the same barrier converted to unaligned on channel 1 and expects exactly one
     * triggered checkpoint.
     */
    @Test
    public void testSwitchToUnalignedByUpstream() throws Exception {
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(target))
                        .build()) {
            CheckpointBarrier aligned =
                    new CheckpointBarrier(
                            1L,
                            clock.relativeTimeMillis(),
                            CheckpointOptions.alignedWithTimeout(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault(),
                                    (long) Integer.MAX_VALUE));

            // Only the announcement arrives on channel 0.
            EventAnnouncement announcement = new EventAnnouncement(aligned, 0);
            send(EventSerializer.toBuffer(announcement, true), 0, gate);
            Assert.assertEquals(0L, target.triggeredCheckpointCounter);

            // Channel 1 delivers the barrier already switched to unaligned.
            send(EventSerializer.toBuffer(aligned.asUnaligned(), true), 1, gate);
            Assert.assertEquals(1L, target.triggeredCheckpointCounter);
        }
    }

    /** Runs the shared {@code testBarrierHandling} scenario with the regular checkpoint type. */
    @Test
    public void testCheckpointHandling() throws Exception {
        SnapshotType checkpoint = CheckpointType.CHECKPOINT;
        testBarrierHandling(checkpoint);
    }

    /** Runs the shared {@code testBarrierHandling} scenario with a canonical-format savepoint. */
    @Test
    public void testSavepointHandling() throws Exception {
        SnapshotType canonicalSavepoint = SavepointType.savepoint(SavepointFormatType.CANONICAL);
        testBarrierHandling(canonicalSavepoint);
    }

    /**
     * Alternates checkpoint barriers (even ids) and canonical savepoint barriers (odd ids), sending
     * each barrier on every channel, and expects the handler to have triggered every id in order.
     */
    @Test
    public void testAlternation() throws Exception {
        int numBarriers = 123;
        int numChannels = 123;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .build()) {
            List<Long> barriers = new ArrayList<>();
            for (long barrierId = 0L; barrierId < numBarriers; ++barrierId) {
                barriers.add(barrierId);
                // Declared as SnapshotType: the two branches yield a CheckpointType and a
                // SavepointType, so the narrower CheckpointType declaration does not type-check.
                SnapshotType type =
                        barrierId % 2L == 0L
                                ? CheckpointType.CHECKPOINT
                                : SavepointType.savepoint(SavepointFormatType.CANONICAL);
                for (int channel = 0; channel < numChannels; ++channel) {
                    CheckpointOptions options =
                            CheckpointOptions.alignedNoTimeout(
                                    type, CheckpointStorageLocationReference.getDefault());
                    send(
                            barrier(barrierId, clock.relativeTimeMillis(), options).retainBuffer(),
                            channel,
                            gate);
                }
            }
            Assert.assertEquals(barriers, target.triggeredCheckpoints);
        }
    }

    /**
     * Processes two aligned-with-timeout checkpoints on a single remote channel. The clock is moved
     * past the timeout before the first barrier is read, and not at all for the second; the test
     * expects the triggered options to be unaligned for the first and aligned-with-timeout for the
     * second.
     */
    @Test
    public void testAlignedAfterTimedOut() throws Exception {
        final int numChannels = 1;
        final long alignedCheckpointTimeout = 100L;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            // First checkpoint: the timeout elapses between announcement and barrier.
            Buffer firstBarrier =
                    barrier(
                            1L,
                            clock.relativeTimeMillis(),
                            CheckpointOptions.alignedWithTimeout(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault(),
                                    alignedCheckpointTimeout));
            ((RemoteInputChannel) gate.getChannel(0)).onBuffer(firstBarrier.retainBuffer(), 0, 0, 0);
            assertAnnouncement(gate);
            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);
            assertBarrier(gate);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());

            // Second checkpoint: read without advancing the clock.
            Buffer secondBarrier =
                    barrier(
                            2L,
                            clock.relativeTimeMillis(),
                            CheckpointOptions.alignedWithTimeout(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault(),
                                    alignedCheckpointTimeout));
            ((RemoteInputChannel) gate.getChannel(0)).onBuffer(secondBarrier.retainBuffer(), 1, 0, 0);
            assertAnnouncement(gate);
            assertBarrier(gate);
            Assert.assertEquals(2L, target.getTriggeredCheckpointCounter());

            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions(),
                    Matchers.contains(
                            CheckpointOptions.unaligned(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault()),
                            CheckpointOptions.alignedWithTimeout(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault(),
                                    alignedCheckpointTimeout)));
        }
    }

    /**
     * Sends a barrier built with a timeout of {@code Integer.MAX_VALUE} on channel 0, then data on
     * channel 1 (no checkpoint expected), and finally the barrier on channel 1, after which one
     * checkpoint is expected.
     */
    @Test
    public void testAlignedNeverTimeoutableCheckpoint() throws Exception {
        final int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .build()) {
            Buffer barrierBuffer = withTimeout(Integer.MAX_VALUE);

            send(barrierBuffer, 0, gate);
            sendData(1000, 1, gate);
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

            send(barrierBuffer, 1, gate);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
        }
    }

    /**
     * Runs {@code testTimeoutBarrierOnTwoChannels} on a two-channel remote gate with an alignment
     * timeout of 10 ms.
     */
    @Test
    public void testTimeoutAlignment() throws Exception {
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            testTimeoutBarrierOnTwoChannels(target, gate, 10L);
        }
    }

    /**
     * First delivers an aligned-with-timeout barrier on channel 2 of a three-channel remote gate
     * (no checkpoint expected yet), then runs {@code testTimeoutBarrierOnTwoChannels} for
     * channels 0 and 1 with a timeout of {@code Integer.MAX_VALUE}.
     */
    @Test
    public void testTimeoutAlignmentAfterProcessingBarrier() throws Exception {
        final int numChannels = 3;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            CheckpointOptions options =
                    CheckpointOptions.alignedWithTimeout(
                            CheckpointType.CHECKPOINT,
                            CheckpointStorageLocationReference.getDefault(),
                            (long) Integer.MAX_VALUE);
            send(barrier(1L, clock.relativeTimeMillis(), options), 2, gate);
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

            testTimeoutBarrierOnTwoChannels(target, gate, Integer.MAX_VALUE);
        }
    }

    /**
     * Shared scenario: queues two data buffers plus the barrier on channel 0 and one data buffer
     * plus the barrier on channel 1. It then expects one announcement, advances the clock by twice
     * the timeout, and expects the second announcement, both barriers, exactly one triggered
     * checkpoint with unaligned options, and finally the three data buffers.
     *
     * @param target handler whose triggered checkpoints are inspected
     * @param gate gate whose channels 0 and 1 receive the buffers
     * @param alignedCheckpointTimeout timeout used to build the barrier and to advance the clock
     */
    private void testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler target, CheckpointedInputGate gate, long alignedCheckpointTimeout) throws Exception {
        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

        // Channel 0: data, data, barrier.
        getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0, 0);
        getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0, 0);
        getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 2, 0, 0);
        // Channel 1: data, barrier.
        getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0, 0);
        getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0, 0);
        Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

        assertAnnouncement(gate);
        clock.advanceTime(alignedCheckpointTimeout * 2L, TimeUnit.MILLISECONDS);
        assertAnnouncement(gate);
        for (int barriersSeen = 0; barriersSeen < 2; barriersSeen++) {
            assertBarrier(gate);
        }

        Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
        MatcherAssert.assertThat(
                target.getTriggeredCheckpointOptions(),
                Matchers.contains(
                        CheckpointOptions.unaligned(
                                CheckpointType.CHECKPOINT,
                                CheckpointStorageLocationReference.getDefault())));

        for (int dataSeen = 0; dataSeen < 3; dataSeen++) {
            assertData(gate);
        }
    }

    /**
     * Creates a test buffer via {@code TestBufferFactory.createBuffer(100)} and retains it once
     * before handing it out.
     *
     * @return the result of {@code retainBuffer()} on the new buffer
     */
    private Buffer dataBuffer() {
        Buffer fresh = TestBufferFactory.createBuffer(100);
        return fresh.retainBuffer();
    }

    /**
     * Queues the barrier on both remote channels, reads both announcements (no checkpoint expected
     * yet), advances the clock by four times the timeout, and expects the next barrier read to
     * trigger exactly one checkpoint.
     */
    @Test
    public void testTimeoutAlignmentOnFirstBarrier() throws Exception {
        int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);
            for (int i = 0; i < numChannels; ++i) {
                getChannel(gate, i).onBuffer(checkpointBarrier.retainBuffer(), 0, 0, 0);
            }
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

            for (int i = 0; i < numChannels; ++i) {
                assertAnnouncement(gate);
            }
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

            clock.advanceTime(alignedCheckpointTimeout * 4L, TimeUnit.MILLISECONDS);
            assertBarrier(gate);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
        }
    }

    /**
     * Builds the barrier, advances the clock past the alignment timeout before the barrier is
     * handed to test channel 0, and expects reading that barrier to trigger one checkpoint.
     */
    @Test
    public void testTimeoutAlignmentBeforeFirstBarrier() throws Exception {
        int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withTestChannels()
                        .withMailboxExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);

            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);
            ((TestInputChannel) gate.getChannel(0)).read(checkpointBarrier.retainBuffer());

            assertBarrier(gate);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
        }
    }

    /**
     * Uses a three-channel mixed gate where channel 0 is read as a {@code TestInputChannel}.
     * Checkpoint 1 is completed on all channels without advancing the clock. For checkpoint 2 the
     * barrier is read on channel 0 first, the clock is advanced past the timeout, and the barrier
     * is then queued on channel 1; the test expects a second checkpoint whose options are
     * unaligned.
     */
    @Test
    public void testTimeoutAlignmentWhenLocalBarrierFirst() throws Exception {
        int numChannels = 3;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withMixedChannels(0)
                        .withMailboxExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;

            // Checkpoint 1: barrier on every channel, no clock movement.
            Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout);
            ((TestInputChannel) gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
            getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 0, 0, 0);
            getChannel(gate, 2).onBuffer(checkpointBarrier.retainBuffer(), 0, 0, 0);
            assertAnnouncement(gate);
            assertAnnouncement(gate);
            assertBarrier(gate);
            assertBarrier(gate);
            assertBarrier(gate);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());

            // Checkpoint 2: barrier on channel 0 first.
            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);
            checkpointBarrier = withTimeout(2, alignedCheckpointTimeout);
            ((TestInputChannel) gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
            assertBarrier(gate);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());

            // The timeout elapses before channel 1 receives the barrier.
            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);
            getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0, 0);
            assertAnnouncement(gate);
            assertBarrier(gate);
            Assert.assertEquals(2L, target.getTriggeredCheckpointCounter());

            List<CheckpointOptions> checkpointOptions = target.getTriggeredCheckpointOptions();
            Assert.assertEquals(
                    (Object) CheckpointOptions.AlignmentType.UNALIGNED,
                    (Object) checkpointOptions.get(checkpointOptions.size() - 1).getAlignment());
        }
    }

    /**
     * On a two-channel mixed gate, reads the barrier and a data buffer on test channel 0, then
     * queues data and the barrier on channel 1. The clock is advanced past the timeout without
     * running callables, the announcement is read, the callables are executed, and the barrier is
     * read; the test expects one checkpoint with unaligned options followed by two data buffers.
     */
    @Test
    public void testActiveTimeoutAfterLocalBarrierPassiveTimeout() throws Exception {
        final int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withMixedChannels(0)
                        .withMailboxExecutor()
                        .build()) {
            final long alignedCheckpointTimeout = 10L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

            // Channel 0: barrier followed by data.
            ((TestInputChannel) gate.getChannel(0)).read(checkpointBarrier.retainBuffer());
            ((TestInputChannel) gate.getChannel(0)).read(dataBuffer());
            assertBarrier(gate);

            // Channel 1: data followed by barrier.
            getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0, 0);
            getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0, 0);
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

            // Time passes first; the registered callables run only after the announcement is read.
            clock.advanceTimeWithoutRunningCallables(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);
            assertAnnouncement(gate);
            clock.executeCallables();
            assertBarrier(gate);

            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions(),
                    Matchers.contains(
                            CheckpointOptions.unaligned(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault())));
            for (int dataSeen = 0; dataSeen < 2; dataSeen++) {
                assertData(gate);
            }
        }
    }

    /**
     * Completes a first checkpoint via {@code performFirstCheckpoint}, then queues data plus the
     * barrier of checkpoint 2 on both channels. After both announcements are read (counter still
     * 1) the clock is advanced by four times the timeout and the next barrier read is expected to
     * raise the counter to 2.
     */
    @Test
    public void testTimeoutAlignmentOnAnnouncementForSecondCheckpoint() throws Exception {
        int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            performFirstCheckpoint(numChannels, target, gate, alignedCheckpointTimeout);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());

            Buffer checkpointBarrier = withTimeout(2, alignedCheckpointTimeout);
            for (int i = 0; i < numChannels; ++i) {
                getChannel(gate, i).onBuffer(dataBuffer(), 1, 0, 0);
                getChannel(gate, i).onBuffer(checkpointBarrier.retainBuffer(), 2, 0, 0);
            }
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());

            for (int i = 0; i < numChannels; ++i) {
                assertAnnouncement(gate);
            }
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());

            clock.advanceTime(alignedCheckpointTimeout * 4L, TimeUnit.MILLISECONDS);
            assertBarrier(gate);
            Assert.assertEquals(2L, target.getTriggeredCheckpointCounter());
        }
    }

    /**
     * Queues the barrier of checkpoint 1 on channels {@code 0..numChannels-1}, asserts that nothing
     * has been triggered yet, then reads one announcement and one barrier per channel from the
     * gate.
     *
     * @param numChannels number of channels that receive the barrier
     * @param target handler whose triggered-checkpoint counter is checked
     * @param gate gate under test
     * @param alignedCheckpointTimeout timeout used to build the barrier
     */
    private void performFirstCheckpoint(int numChannels, ValidatingCheckpointHandler target, CheckpointedInputGate gate, long alignedCheckpointTimeout) throws IOException, InterruptedException {
        Buffer firstBarrier = withTimeout(1, alignedCheckpointTimeout);
        for (int channel = 0; channel < numChannels; channel++) {
            getChannel(gate, channel).onBuffer(firstBarrier.retainBuffer(), 0, 0, 0);
        }
        Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

        for (int announcements = 0; announcements < numChannels; announcements++) {
            assertAnnouncement(gate);
        }
        for (int barriers = 0; barriers < numChannels; barriers++) {
            assertBarrier(gate);
        }
    }

    /**
     * Reads announcement and barrier from channel 0, advances the clock by four times the timeout
     * without running callables, then queues the barrier on channel 1 and reads its announcement
     * and barrier; one triggered checkpoint is expected.
     */
    @Test
    public void testPassiveTimeoutAlignmentOnAnnouncement() throws Exception {
        int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

            getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 0, 0, 0);
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());
            assertAnnouncement(gate);
            assertBarrier(gate);

            clock.advanceTimeWithoutRunningCallables(alignedCheckpointTimeout * 4L, TimeUnit.MILLISECONDS);
            getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 0, 0, 0);
            assertAnnouncement(gate);
            assertBarrier(gate);
            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
        }
    }

    /**
     * Sends the barrier on channel 0 only, advances the clock past the alignment timeout, and
     * expects the triggered checkpoint options to be exactly one unaligned checkpoint.
     */
    @Test
    public void testActiveTimeoutAlignmentOnFirstBarrier() throws Exception {
        int numberOfChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numberOfChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withSyncExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

            send(checkpointBarrier, 0, gate);
            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);

            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions(),
                    Matchers.contains(
                            CheckpointOptions.unaligned(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault())));
        }
    }

    /**
     * For each of the two test channels: sends the announcement, marks the channel blocked, and
     * sends the barrier. The clock is advanced past the timeout between the two channels. The test
     * expects exactly one unaligned checkpoint and both channels to be unblocked at the end.
     */
    @Test
    public void testAllChannelsUnblockedAfteralignedCheckpointTimeout() throws Exception {
        int numberOfChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numberOfChannels, getTestBarrierHandlerFactory(target))
                        .withTestChannels()
                        .withSyncExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            CheckpointBarrier checkpointBarrier =
                    new CheckpointBarrier(
                            1L,
                            clock.relativeTimeMillis(),
                            CheckpointOptions.alignedWithTimeout(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault(),
                                    alignedCheckpointTimeout));
            Buffer checkpointBarrierBuffer = EventSerializer.toBuffer(checkpointBarrier, false);

            // Channel 0: announcement, block, barrier.
            send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 0, gate);
            ((TestInputChannel) gate.getChannel(0)).setBlocked(true);
            send(checkpointBarrierBuffer, 0, gate);

            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);

            // Channel 1: announcement, block, barrier.
            send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 1, gate);
            ((TestInputChannel) gate.getChannel(1)).setBlocked(true);
            send(checkpointBarrierBuffer, 1, gate);

            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions().size(), Matchers.equalTo(1));
            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions(),
                    Matchers.contains(
                            CheckpointOptions.unaligned(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault())));
            Assert.assertFalse(((TestInputChannel) gate.getChannel(0)).isBlocked());
            Assert.assertFalse(((TestInputChannel) gate.getChannel(1)).isBlocked());
        }
    }

    /**
     * Sends the barrier on both channels before advancing the clock past the timeout, and expects
     * the triggered checkpoint options not to be a single unaligned checkpoint.
     */
    @Test
    public void testNoActiveTimeoutAlignmentAfterLastBarrier() throws Exception {
        int numberOfChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numberOfChannels, getTestBarrierHandlerFactory(target))
                        .withTestChannels()
                        .withSyncExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

            send(checkpointBarrier, 0, gate);
            send(checkpointBarrier, 1, gate);
            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);

            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions(),
                    CoreMatchers.not(
                            Matchers.contains(
                                    CheckpointOptions.unaligned(
                                            CheckpointType.CHECKPOINT,
                                            CheckpointStorageLocationReference.getDefault()))));
        }
    }

    /**
     * Sends the barrier on channel 0, then a {@code CancelCheckpointMarker} for checkpoint 1 on
     * both channels, advances the clock past the timeout, and expects no checkpoint to have been
     * triggered.
     */
    @Test
    public void testNoActiveTimeoutAlignmentAfterAbort() throws Exception {
        int numberOfChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        // try-with-resources so the gate is closed even when an assertion fails, as in the other
        // tests of this class.
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numberOfChannels, getTestBarrierHandlerFactory(target))
                        .withTestChannels()
                        .withSyncExecutor()
                        .build()) {
            long alignedCheckpointTimeout = 100L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

            send(checkpointBarrier, 0, gate);
            send(EventSerializer.toBuffer(new CancelCheckpointMarker(1L), true), 0, gate);
            send(EventSerializer.toBuffer(new CancelCheckpointMarker(1L), true), 1, gate);
            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);

            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions().size(), Matchers.equalTo(0));
        }
    }

    /**
     * Uses a clock whose {@code registerTask} still registers the task but returns a no-op
     * cancellable. After sending the barrier on channel 0 the gate is closed explicitly, the clock
     * is advanced past the timeout, and no triggered checkpoint is expected.
     */
    @Test
    public void testNoActiveTimeoutAlignmentAfterClose() throws Exception {
        final int numberOfChannels = 2;
        ClockWithDelayedActions uncancellableClock = new ClockWithDelayedActions() {
            @Override
            public BarrierAlignmentUtil.Cancellable registerTask(Callable<?> callable, Duration delay) {
                // Register as usual but discard the real handle: cancelling the returned one
                // does nothing.
                super.registerTask(callable, delay);
                return () -> {};
            }
        };
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        TestBarrierHandlerFactory handlerFactory =
                TestBarrierHandlerFactory.forTarget(target)
                        .withActionRegistration(uncancellableClock)
                        .withClock(uncancellableClock);
        CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numberOfChannels, handlerFactory)
                        .withRemoteChannels()
                        .withSyncExecutor()
                        .build();
        final long alignedCheckpointTimeout = 100L;
        Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

        send(checkpointBarrier, 0, gate);
        gate.close();
        uncancellableClock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);

        MatcherAssert.assertThat(
                target.getTriggeredCheckpointOptions().size(), Matchers.equalTo(0));
    }

    /**
     * Queues data and the barrier on both remote channels, reads both announcements, then advances
     * the clock past the timeout. Both barriers are then read and the test expects one checkpoint
     * with unaligned options, followed by the three data buffers.
     */
    @Test
    public void testActiveTimeoutAlignmentOnAnnouncement() throws Exception {
        final int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            final long alignedCheckpointTimeout = 10L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

            // Channel 0: data, data, barrier.
            getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0, 0);
            getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0, 0);
            getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 2, 0, 0);
            // Channel 1: data, barrier.
            getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0, 0);
            getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0, 0);
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

            for (int announcements = 0; announcements < 2; announcements++) {
                assertAnnouncement(gate);
            }
            clock.advanceTime(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);
            for (int barriers = 0; barriers < 2; barriers++) {
                assertBarrier(gate);
            }

            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions(),
                    Matchers.contains(
                            CheckpointOptions.unaligned(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault())));
            for (int dataSeen = 0; dataSeen < 3; dataSeen++) {
                assertData(gate);
            }
        }
    }

    /**
     * Queues data and the barrier on both remote channels and reads the first announcement. The
     * clock is advanced past the timeout without running callables, the second announcement is
     * read, and only then are the callables executed. Both barriers are read and the test expects
     * one checkpoint with unaligned options, followed by the three data buffers.
     */
    @Test
    public void testActiveTimeoutAfterAnnouncementPassiveTimeout() throws Exception {
        final int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            final long alignedCheckpointTimeout = 10L;
            Buffer checkpointBarrier = withTimeout(alignedCheckpointTimeout);

            // Channel 0: data, data, barrier.
            getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0, 0);
            getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0, 0);
            getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 2, 0, 0);
            // Channel 1: data, barrier.
            getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0, 0);
            getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0, 0);
            Assert.assertEquals(0L, target.getTriggeredCheckpointCounter());

            assertAnnouncement(gate);
            // Time passes, but the registered callables are held back until after the second
            // announcement has been read.
            clock.advanceTimeWithoutRunningCallables(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);
            assertAnnouncement(gate);
            clock.executeCallables();
            for (int barriers = 0; barriers < 2; barriers++) {
                assertBarrier(gate);
            }

            Assert.assertEquals(1L, target.getTriggeredCheckpointCounter());
            MatcherAssert.assertThat(
                    target.getTriggeredCheckpointOptions(),
                    Matchers.contains(
                            CheckpointOptions.unaligned(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault())));
            for (int dataSeen = 0; dataSeen < 3; dataSeen++) {
                assertData(gate);
            }
        }
    }

    /**
     * Same setup as the "after announcement" variant, but the clock is moved past the aligned
     * timeout before the first announcement is polled. Expects one checkpoint triggered with
     * unaligned options.
     */
    @Test
    public void testActiveTimeoutBeforeFirstAnnouncementPassiveTimeout() throws Exception {
        int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            long timeoutMillis = 10L;
            Buffer barrierBuffer = withTimeout(timeoutMillis);
            RemoteInputChannel first = getChannel(gate, 0);
            RemoteInputChannel second = getChannel(gate, 1);

            // Each channel gets one data buffer followed by the barrier.
            first.onBuffer(dataBuffer(), 0, 0, 0);
            first.onBuffer(barrierBuffer.retainBuffer(), 1, 0, 0);
            second.onBuffer(dataBuffer(), 0, 0, 0);
            second.onBuffer(barrierBuffer.retainBuffer(), 1, 0, 0);
            Assert.assertEquals(0L, (long)target.getTriggeredCheckpointCounter());

            // Time passes before anything has been polled from the gate.
            clock.advanceTimeWithoutRunningCallables(timeoutMillis + 1L, TimeUnit.MILLISECONDS);
            assertAnnouncement(gate);
            clock.executeCallables();
            assertAnnouncement(gate);

            for (int polled = 0; polled < 2; polled++) {
                assertBarrier(gate);
            }
            Assert.assertEquals(1L, (long)target.getTriggeredCheckpointCounter());
            CheckpointOptions expectedOptions = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
            MatcherAssert.assertThat(target.getTriggeredCheckpointOptions(), (Matcher)Matchers.contains((Object[])new CheckpointOptions[]{expectedOptions}));

            for (int polled = 0; polled < 2; polled++) {
                assertData(gate);
            }
        }
    }

    /**
     * The first channel's data and barrier are consumed before the second channel receives
     * anything; the aligned timeout elapses in between. Expects one checkpoint triggered with
     * unaligned options once the second channel's barrier arrives.
     */
    @Test
    public void testActiveTimeoutAfterBarrierPassiveTimeout() throws Exception {
        int numChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            long timeoutMillis = 10L;
            Buffer barrierBuffer = withTimeout(timeoutMillis);
            RemoteInputChannel first = getChannel(gate, 0);
            RemoteInputChannel second = getChannel(gate, 1);

            // Only channel 0 has input at first: two data buffers and the barrier.
            first.onBuffer(dataBuffer(), 0, 0, 0);
            first.onBuffer(dataBuffer(), 1, 0, 0);
            first.onBuffer(barrierBuffer.retainBuffer(), 2, 0, 0);
            Assert.assertEquals(0L, (long)target.getTriggeredCheckpointCounter());

            assertAnnouncement(gate);
            for (int polled = 0; polled < 2; polled++) {
                assertData(gate);
            }

            // The timeout elapses before channel 0's barrier is polled; callables run after it.
            clock.advanceTimeWithoutRunningCallables(timeoutMillis + 1L, TimeUnit.MILLISECONDS);
            assertBarrier(gate);
            clock.executeCallables();

            // Channel 1 catches up afterwards.
            second.onBuffer(dataBuffer(), 0, 0, 0);
            second.onBuffer(barrierBuffer.retainBuffer(), 1, 0, 0);
            assertAnnouncement(gate);
            assertBarrier(gate);

            Assert.assertEquals(1L, (long)target.getTriggeredCheckpointCounter());
            CheckpointOptions expectedOptions = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
            MatcherAssert.assertThat(target.getTriggeredCheckpointOptions(), (Matcher)Matchers.contains((Object[])new CheckpointOptions[]{expectedOptions}));
            assertData(gate);
        }
    }

    /**
     * Channel 0 receives a timeoutable barrier (timeout {@code Integer.MAX_VALUE}); channel 1
     * then receives two data buffers followed by an already-unaligned barrier for the same
     * checkpoint id. Expects both data buffers of channel 1 to be recorded by the channel state
     * writer and exactly one checkpoint to be triggered.
     *
     * <p>The gate is opened in a try-with-resources block, like the other remote-channel tests in
     * this class, so that it is closed even when an assertion fails.
     */
    @Test
    public void testTimeoutAlignmentOnUnalignedCheckpoint() throws Exception {
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(3, getTestBarrierHandlerFactory(target))
                        .withChannelStateWriter((ChannelStateWriter)channelStateWriter)
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            getChannel(gate, 0).onBuffer(withTimeout(Integer.MAX_VALUE).retainBuffer(), 0, 0, 0);
            assertAnnouncement(gate);
            assertBarrier(gate);

            getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0, 0);
            getChannel(gate, 1).onBuffer(dataBuffer(), 1, 0, 0);
            // Barrier for checkpoint 1 that is unaligned from the start.
            CheckpointOptions unalignedOptions = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
            CheckpointBarrier unalignedBarrier = new CheckpointBarrier(1L, clock.relativeTimeMillis(), unalignedOptions);
            getChannel(gate, 1).onBuffer(EventSerializer.toBuffer((AbstractEvent)unalignedBarrier, true).retainBuffer(), 2, 0, 0);
            assertBarrier(gate);

            // The key is passed with its own static type instead of being cast to Object.
            Assert.assertEquals(2L, (long)channelStateWriter.getAddedInput().get(getChannel(gate, 1).getChannelInfo()).size());
            Assert.assertEquals(1L, (long)target.getTriggeredCheckpointCounter());
        }
    }

    /**
     * Channel 0 delivers a timeoutable barrier, the aligned timeout elapses, and the two remaining
     * channels each deliver one data buffer followed by end-of-partition. Expects one checkpoint
     * triggered with unaligned options.
     */
    @Test
    public void testTimeoutAlignmentAfterReceivedEndOfPartition() throws Exception {
        int numChannels = 3;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        long alignedCheckpointTimeout = 100L;
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            CheckpointOptions timeoutableOptions = CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), alignedCheckpointTimeout);
            getChannel(gate, 0).onBuffer(barrier(1L, clock.relativeTimeMillis(), timeoutableOptions), 0, 0, 0);
            assertAnnouncement(gate);
            assertBarrier(gate);

            clock.advanceTimeWithoutRunningCallables(alignedCheckpointTimeout + 1L, TimeUnit.MILLISECONDS);

            // Channels 1 and 2 finish one after the other: data buffer, then end-of-partition.
            for (int channelIndex = 1; channelIndex <= 2; channelIndex++) {
                getChannel(gate, channelIndex).onBuffer(dataBuffer(), 0, 0, 0);
                getChannel(gate, channelIndex).onBuffer(endOfPartition(), 1, 0, 0);
                assertData(gate);
                assertEvent(gate, EndOfPartitionEvent.class);
            }

            Assert.assertEquals(1L, (long)target.getTriggeredCheckpointCounter());
            CheckpointOptions expectedOptions = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
            MatcherAssert.assertThat(target.getTriggeredCheckpointOptions(), (Matcher)Matchers.contains((Object[])new CheckpointOptions[]{expectedOptions}));
        }
    }

    /**
     * Channel 0 delivers a timeoutable barrier and channel 1 ends before the barrier itself is
     * polled. Checks that the handler reports being in alignment after the end-of-partition event
     * and that checkpoint 1 is triggered once channel 2's barrier has been polled.
     */
    @Test
    public void testStartNewCheckpointViaAnnouncement() throws Exception {
        int numChannels = 3;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        long alignedCheckpointTimeout = 10000L;
        try (CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target))
                        .withRemoteChannels()
                        .withMailboxExecutor()
                        .build()) {
            CheckpointOptions firstOptions = CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), alignedCheckpointTimeout);
            getChannel(gate, 0).onBuffer(barrier(1L, clock.relativeTimeMillis(), firstOptions), 0, 0, 0);
            getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0, 0);

            assertAnnouncement(gate);
            assertEvent(gate, EndOfPartitionEvent.class);
            TestCase.assertTrue(gate.getCheckpointBarrierHandler().isDuringAlignment());
            assertBarrier(gate);

            // The last open channel delivers its barrier for the same checkpoint.
            CheckpointOptions lastOptions = CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), alignedCheckpointTimeout);
            getChannel(gate, 2).onBuffer(barrier(1L, clock.relativeTimeMillis(), lastOptions), 0, 0, 0);
            assertAnnouncement(gate);
            assertBarrier(gate);

            MatcherAssert.assertThat(target.triggeredCheckpoints, (Matcher)Matchers.contains((Object[])new Long[]{1L}));
        }
    }

    /**
     * Looks up the channel at {@code channelIndex} on the gate and casts it to
     * {@link RemoteInputChannel}.
     *
     * @param gate gate to read the channel from
     * @param channelIndex index of the channel within the gate
     * @return the channel, cast to {@link RemoteInputChannel}
     */
    private RemoteInputChannel getChannel(CheckpointedInputGate gate, int channelIndex) {
        InputChannel channel = gate.getChannel(channelIndex);
        return (RemoteInputChannel)channel;
    }

    /**
     * Runs three consecutive checkpoints through a two-channel gate (an aligned checkpoint, an
     * aligned canonical savepoint and an unaligned checkpoint) and verifies the alignment
     * duration, start delay and bytes-processed-during-alignment metrics after each step.
     */
    @Test
    public void testMetricsAlternation() throws Exception {
        int numChannels = 2;
        int bufferSize = 1000;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        CheckpointedInputGate gate = new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target)).build();

        // Checkpoint 1: aligned checkpoint, created 10 ms before "now".
        long startNanos = clock.relativeTimeNanos();
        long checkpoint1CreationTime = clock.relativeTimeMillis() - 10L;
        sendBarrier(1L, checkpoint1CreationTime, (SnapshotType)CheckpointType.CHECKPOINT, gate, 0);
        sendData(bufferSize, 0, gate);
        sendData(bufferSize, 1, gate);
        clock.advanceTime(6L, TimeUnit.MILLISECONDS);
        sendBarrier(1L, checkpoint1CreationTime, (SnapshotType)CheckpointType.CHECKPOINT, gate, 1);
        sendData(bufferSize, 0, gate);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 1L, startNanos, 6000000L, 10000000L, bufferSize * 2);

        // Checkpoint 2: canonical savepoint, created 5 ms before "now".
        startNanos = clock.relativeTimeNanos();
        long checkpoint2CreationTime = clock.relativeTimeMillis() - 5L;
        sendBarrier(2L, checkpoint2CreationTime, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), gate, 0);
        sendData(bufferSize, 1, gate);
        // Only the first barrier has arrived; the bytes value still equals checkpoint 1's.
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 2L, startNanos, 0L, 5000000L, bufferSize * 2);
        clock.advanceTime(5L, TimeUnit.MILLISECONDS);
        sendBarrier(2L, checkpoint2CreationTime, (SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), gate, 1);
        sendData(bufferSize, 0, gate);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 2L, startNanos, 5000000L, 5000000L, bufferSize);

        // Checkpoint 3: unaligned checkpoint, created 7 ms before "now".
        startNanos = clock.relativeTimeNanos();
        long checkpoint3CreationTime = clock.relativeTimeMillis() - 7L;
        send(barrier(3L, checkpoint3CreationTime, CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault())), 0, gate);
        sendData(bufferSize, 0, gate);
        sendData(bufferSize, 1, gate);
        // -1 is the fallback that assertMetrics passes to FutureUtils.getOrDefault.
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 3L, startNanos, 0L, 7000000L, -1L);
        clock.advanceTime(10L, TimeUnit.MILLISECONDS);
        // Both barriers of checkpoint 3 carry checkpoint 3's creation time; the original passed
        // checkpoint 2's timestamp here by mistake.
        send(barrier(3L, checkpoint3CreationTime, CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault())), 1, gate);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 3L, startNanos, 10000000L, 7000000L, bufferSize * 2);
    }

    /**
     * Sends an aligned checkpoint and then a canonical savepoint through a single-channel gate
     * and checks after each that no alignment time and no bytes during alignment are reported,
     * while the start delay is at least the barrier's age.
     */
    @Test
    public void testMetricsSingleChannel() throws Exception {
        int numChannels = 1;
        int payloadSize = 1000;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        CheckpointedInputGate gate = new TestCheckpointedInputGateBuilder(numChannels, getTestBarrierHandlerFactory(target)).build();

        // Checkpoint 1: created 10 ms before "now".
        long checkpoint1CreationTime = clock.relativeTimeMillis() - 10L;
        long startNanos = clock.relativeTimeNanos();
        sendData(payloadSize, 0, gate);
        sendBarrier(1L, checkpoint1CreationTime, (SnapshotType)CheckpointType.CHECKPOINT, gate, 0);
        sendData(payloadSize, 0, gate);
        clock.advanceTime(6L, TimeUnit.MILLISECONDS);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 1L, startNanos, 0L, 10000000L, 0L);

        // Checkpoint 2: canonical savepoint, created 5 ms before "now".
        long checkpoint2CreationTime = clock.relativeTimeMillis() - 5L;
        startNanos = clock.relativeTimeNanos();
        sendData(payloadSize, 0, gate);
        SnapshotType savepoint = SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL);
        sendBarrier(2L, checkpoint2CreationTime, savepoint, gate, 0);
        sendData(payloadSize, 0, gate);
        clock.advanceTime(5L, TimeUnit.MILLISECONDS);
        assertMetrics(target, gate.getCheckpointBarrierHandler(), 2L, startNanos, 0L, 5000000L, 0L);
    }

    /**
     * Asserts the checkpoint metrics exposed by the barrier handler and the checkpoint target.
     *
     * @param target handler whose last "bytes processed during alignment" future is inspected
     * @param checkpointBarrierHandler barrier handler under test
     * @param latestCheckpointId expected latest checkpoint id (exact match)
     * @param alignmentDurationStartNanos clock reading taken when the checkpoint started; the
     *     elapsed time since then is the upper bound for the alignment duration
     * @param alignmentDurationNanosMin lower bound for the alignment duration
     * @param startDelayNanos lower bound for the checkpoint start delay
     * @param bytesProcessedDuringAlignment expected bytes value (exact match); {@code -1} is the
     *     fallback handed to {@code FutureUtils.getOrDefault}
     */
    private void assertMetrics(ValidatingCheckpointHandler target, CheckpointBarrierHandler checkpointBarrierHandler, long latestCheckpointId, long alignmentDurationStartNanos, long alignmentDurationNanosMin, long startDelayNanos, long bytesProcessedDuringAlignment) {
        MatcherAssert.assertThat((Object)checkpointBarrierHandler.getLatestCheckpointId(), (Matcher)Matchers.equalTo((Object)latestCheckpointId));

        long alignmentDurationNanos = checkpointBarrierHandler.getAlignmentDurationNanos();
        long expectedAlignmentDurationNanosMax = clock.relativeTimeNanos() - alignmentDurationStartNanos;
        MatcherAssert.assertThat((Object)alignmentDurationNanos, (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(alignmentDurationNanosMin)));
        MatcherAssert.assertThat((Object)alignmentDurationNanos, (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(expectedAlignmentDurationNanosMax)));

        MatcherAssert.assertThat((Object)checkpointBarrierHandler.getCheckpointStartDelayNanos(), (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(startDelayNanos)));

        // A boxed Long is passed as the default: "(Object)-1L" would be parsed as the binary
        // expression "Object - 1L" and does not compile.
        Long fallbackBytes = Long.valueOf(-1L);
        Object reportedBytes = FutureUtils.getOrDefault(target.getLastBytesProcessedDuringAlignment(), fallbackBytes);
        MatcherAssert.assertThat(reportedBytes, (Matcher)Matchers.equalTo((Object)bytesProcessedDuringAlignment));
    }

    /**
     * Sends four barriers with increasing ids, alternating between a canonical savepoint on
     * channel 0 and a checkpoint on channel 1. After each barrier it checks which channels are
     * blocked, that a checkpoint is pending, and that the all-barriers-received future for the
     * new id is not done.
     */
    @Test
    public void testPreviousHandlerReset() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel[] channels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannels((InputChannel[])channels);
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        SingleCheckpointBarrierHandler barrierHandler = getTestBarrierHandlerFactory(target).create(inputGate);
        for (int i = 0; i < 4; ++i) {
            int channel = i % 2;
            // Declared as SnapshotType: the two branches yield different implementations
            // (SavepointType vs. CheckpointType), so neither concrete type fits both.
            SnapshotType type = channel == 0 ? SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL) : CheckpointType.CHECKPOINT;
            target.setNextExpectedCheckpointId(-1L);
            if (type.isSavepoint()) {
                channels[channel].setBlocked(true);
            }
            barrierHandler.processBarrier(new CheckpointBarrier((long)i, clock.relativeTimeMillis(), new CheckpointOptions(type, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, channel), false);
            if (type.isSavepoint()) {
                // Only the channel that delivered the savepoint barrier stays blocked.
                TestCase.assertTrue(channels[channel].isBlocked());
                Assert.assertFalse(channels[(channel + 1) % 2].isBlocked());
            } else {
                Assert.assertFalse(channels[0].isBlocked());
                Assert.assertFalse(channels[1].isBlocked());
            }
            TestCase.assertTrue(barrierHandler.isCheckpointPending());
            Assert.assertFalse(barrierHandler.getAllBarriersReceivedFuture((long)i).isDone());
            // Unblock both channels before the next iteration.
            channels[0].setBlocked(false);
            channels[1].setBlocked(false);
        }
    }

    /**
     * Processes a single checkpoint barrier on channel 0 of a two-channel gate and checks that
     * the all-barriers-received future for that checkpoint id is not yet done.
     */
    @Test
    public void testHasInflightDataBeforeProcessBarrier() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel channel0 = new TestInputChannel(inputGate, 0);
        TestInputChannel channel1 = new TestInputChannel(inputGate, 1);
        inputGate.setInputChannels(new InputChannel[]{channel0, channel1});
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        SingleCheckpointBarrierHandler barrierHandler = getTestBarrierHandlerFactory(target).create(inputGate);

        long id = 1L;
        CheckpointOptions options = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(id, clock.relativeTimeMillis(), options);
        barrierHandler.processBarrier(checkpointBarrier, new InputChannelInfo(0, 0), false);

        Assert.assertFalse(barrierHandler.getAllBarriersReceivedFuture(id).isDone());
    }

    /**
     * Processes checkpoint barrier 10 on channel 0 and then a savepoint barrier with the lower id
     * 5 on the (blocked) channel 1. Expects the latest checkpoint id to stay at 10 and channel 1
     * to be unblocked afterwards.
     */
    @Test
    public void testOutOfOrderBarrier() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel firstChannel = new TestInputChannel(inputGate, 0);
        TestInputChannel secondChannel = new TestInputChannel(inputGate, 1);
        inputGate.setInputChannels(new InputChannel[]{firstChannel, secondChannel});
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        SingleCheckpointBarrierHandler barrierHandler = getTestBarrierHandlerFactory(target).create(inputGate);
        long checkpointId = 10L;
        long outOfOrderSavepointId = 5L;

        CheckpointOptions checkpointOptions = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        CheckpointBarrier currentBarrier = new CheckpointBarrier(checkpointId, clock.relativeTimeMillis(), checkpointOptions);
        barrierHandler.processBarrier(currentBarrier, new InputChannelInfo(0, 0), false);

        secondChannel.setBlocked(true);
        CheckpointOptions savepointOptions = new CheckpointOptions((SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault());
        CheckpointBarrier staleBarrier = new CheckpointBarrier(outOfOrderSavepointId, clock.relativeTimeMillis(), savepointOptions);
        barrierHandler.processBarrier(staleBarrier, new InputChannelInfo(0, 1), false);

        Assert.assertEquals(checkpointId, (long)barrierHandler.getLatestCheckpointId());
        Assert.assertFalse(secondChannel.isBlocked());
    }

    /**
     * Channel 0 delivers the barriers of checkpoints 1 and 2; channel 1 then delivers a
     * cancellation marker for checkpoint 1 followed by the barrier of checkpoint 2, with one
     * second of clock time between each step. Expects the last reported alignment duration to be
     * two seconds.
     */
    @Test
    public void testNextFirstCheckpointBarrierOvertakesCancellationBarrier() throws Exception {
        int numberOfChannels = 2;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        CheckpointedInputGate gate =
                new TestCheckpointedInputGateBuilder(numberOfChannels, getTestBarrierHandlerFactory(target))
                        .withTestChannels()
                        .withSyncExecutor()
                        .build();
        long alignedCheckpointTimeout = 10000L;
        Buffer firstBarrier = withTimeout(alignedCheckpointTimeout);

        send(firstBarrier, 0, gate);
        clock.advanceTime(Duration.ofSeconds(1L));

        send(withTimeout(2, alignedCheckpointTimeout), 0, gate);
        clock.advanceTime(Duration.ofSeconds(1L));

        Buffer cancellation = EventSerializer.toBuffer((AbstractEvent)new CancelCheckpointMarker(1L), true);
        send(cancellation, 1, gate);
        clock.advanceTime(Duration.ofSeconds(1L));

        send(withTimeout(2, alignedCheckpointTimeout), 1, gate);
        clock.advanceTime(Duration.ofSeconds(1L));

        long expectedAlignmentNanos = Duration.ofSeconds(2L).toNanos();
        Assert.assertEquals(expectedAlignmentNanos, (long)target.lastAlignmentDurationNanos.get());
    }

    /**
     * Shared scenario for barrier-handling tests: sends barrier 123 to a "fast" and then a "slow"
     * test channel and checks when the checkpoint is triggered.
     *
     * <p>For savepoints both channels start out blocked, aligned-no-timeout options are used, the
     * checkpoint must not be triggered after the first barrier, and every channel must be
     * unblocked at the end. Otherwise unaligned checkpoint options are used and the checkpoint
     * must already be triggered after the first barrier.
     *
     * @param checkpointType snapshot type that selects the savepoint or checkpoint variant
     */
    private void testBarrierHandling(SnapshotType checkpointType) throws Exception {
        long barrierId = 123L;
        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
        SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel fast = new TestInputChannel(gate, 0, false, true);
        TestInputChannel slow = new TestInputChannel(gate, 1, false, true);
        gate.setInputChannels(new InputChannel[]{fast, slow});
        SingleCheckpointBarrierHandler barrierHandler = getTestBarrierHandlerFactory(target).create(gate);
        CheckpointedInputGate checkpointedGate = new CheckpointedInputGate((InputGate)gate, (CheckpointBarrierHandler)barrierHandler, (MailboxExecutor)new SyncMailboxExecutor());

        if (checkpointType.isSavepoint()) {
            fast.setBlocked(true);
            slow.setBlocked(true);
        }

        CheckpointOptions options;
        if (checkpointType.isSavepoint()) {
            options = CheckpointOptions.alignedNoTimeout(checkpointType, CheckpointStorageLocationReference.getDefault());
        } else {
            options = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        }
        Buffer barrierBuffer = barrier(barrierId, 1L, options);

        send(barrierBuffer.retainBuffer(), (InputChannel)fast, checkpointedGate);
        // Nothing may be triggered yet for a savepoint; otherwise the checkpoint is already triggered.
        Assert.assertEquals((Object)checkpointType.isSavepoint(), (Object)target.triggeredCheckpoints.isEmpty());

        send(barrierBuffer.retainBuffer(), (InputChannel)slow, checkpointedGate);
        Assert.assertEquals(Collections.singletonList(barrierId), target.triggeredCheckpoints);

        if (!checkpointType.isSavepoint()) {
            return;
        }
        for (InputChannel channel : gate.inputChannels()) {
            String message = String.format("channel %d should be resumed", channel.getChannelIndex());
            Assert.assertFalse(message, ((TestInputChannel)channel).isBlocked());
        }
    }

    /**
     * Builds a barrier with aligned-no-timeout options for the given snapshot type and sends it
     * to the channel with index {@code channelId}.
     *
     * @param barrierId checkpoint id carried by the barrier
     * @param barrierCreationTime timestamp carried by the barrier
     * @param type snapshot type used for the checkpoint options
     * @param gate gate that owns the channel and is drained afterwards
     * @param channelId index of the receiving channel
     */
    private void sendBarrier(long barrierId, long barrierCreationTime, SnapshotType type, CheckpointedInputGate gate, int channelId) throws Exception {
        CheckpointOptions alignedOptions = CheckpointOptions.alignedNoTimeout(type, CheckpointStorageLocationReference.getDefault());
        Buffer barrierBuffer = barrier(barrierId, barrierCreationTime, alignedOptions);
        send(barrierBuffer, channelId, gate);
    }

    /**
     * Creates a test buffer of {@code dataSize} via {@code TestBufferFactory} and sends it to the
     * channel with index {@code channelId}.
     */
    private void sendData(int dataSize, int channelId, CheckpointedInputGate gate) throws Exception {
        Buffer payload = TestBufferFactory.createBuffer(dataSize);
        send(payload, channelId, gate);
    }

    /**
     * Retains {@code buffer}, resolves the channel with index {@code channelId} on the gate and
     * delegates to the channel-based {@code send} overload.
     */
    private void send(Buffer buffer, int channelId, CheckpointedInputGate gate) throws Exception {
        // Retain first, then look up the channel, matching the original evaluation order.
        Buffer retained = buffer.retainBuffer();
        InputChannel channel = gate.getChannel(channelId);
        send(retained, channel, gate);
    }

    /**
     * Hands {@code buffer} to the given channel and then polls the gate until it returns an empty
     * result.
     *
     * <p>A {@code TestInputChannel} receives the buffer through {@code read}, a
     * {@link RemoteInputChannel} through {@code onBuffer}.
     *
     * @throws IllegalArgumentException if the channel is of neither supported type
     */
    private void send(Buffer buffer, InputChannel channel, CheckpointedInputGate checkpointedGate) throws IOException, InterruptedException {
        boolean isTestChannel = channel instanceof TestInputChannel;
        if (!isTestChannel && !(channel instanceof RemoteInputChannel)) {
            throw new IllegalArgumentException("Unknown channel type: " + channel);
        }
        if (isTestChannel) {
            ((TestInputChannel)channel).read(buffer, buffer.getDataType());
        } else {
            ((RemoteInputChannel)channel).onBuffer(buffer, 0, 0, 0);
        }

        // Drain everything the gate currently has to offer.
        boolean hasMore;
        do {
            hasMore = checkpointedGate.pollNext().isPresent();
        } while (hasMore);
    }

    /**
     * Creates a timeoutable checkpoint barrier buffer for checkpoint id 1.
     *
     * @param alignedCheckpointTimeout aligned-checkpoint timeout passed to the checkpoint options
     */
    private Buffer withTimeout(long alignedCheckpointTimeout) throws IOException {
        int defaultCheckpointId = 1;
        return withTimeout(defaultCheckpointId, alignedCheckpointTimeout);
    }

    /**
     * Creates a checkpoint barrier buffer stamped with the test clock's current relative time and
     * carrying aligned-with-timeout checkpoint options.
     *
     * @param checkpointId id of the checkpoint the barrier belongs to
     * @param alignedCheckpointTimeout aligned-checkpoint timeout passed to the checkpoint options
     */
    private Buffer withTimeout(int checkpointId, long alignedCheckpointTimeout) throws IOException {
        long createdAtMillis = clock.relativeTimeMillis();
        CheckpointOptions timeoutableOptions = CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), alignedCheckpointTimeout);
        return barrier(checkpointId, createdAtMillis, timeoutableOptions);
    }

    /**
     * Serializes a {@link CheckpointBarrier} into a buffer. The boolean flag handed to
     * {@code EventSerializer.toBuffer} is set when the barrier's options are unaligned.
     *
     * @param barrierId checkpoint id of the barrier
     * @param barrierTimestamp timestamp of the barrier
     * @param options checkpoint options of the barrier
     */
    private Buffer barrier(long barrierId, long barrierTimestamp, CheckpointOptions options) throws IOException {
        CheckpointBarrier event = new CheckpointBarrier(barrierId, barrierTimestamp, options);
        boolean unaligned = event.getCheckpointOptions().isUnalignedCheckpoint();
        return EventSerializer.toBuffer((AbstractEvent)event, unaligned);
    }

    /**
     * Serializes the {@link EndOfPartitionEvent} singleton into a buffer, with the boolean flag
     * of {@code EventSerializer.toBuffer} set to {@code false}.
     */
    private Buffer endOfPartition() throws IOException {
        AbstractEvent event = EndOfPartitionEvent.INSTANCE;
        return EventSerializer.toBuffer(event, false);
    }

    /** Asserts that the next element polled from the gate is an {@link EventAnnouncement}. */
    private static void assertAnnouncement(CheckpointedInputGate gate) throws IOException, InterruptedException {
        Class<EventAnnouncement> expected = EventAnnouncement.class;
        assertEvent(gate, expected);
    }

    /** Asserts that the next element polled from the gate is a {@link CheckpointBarrier}. */
    private static void assertBarrier(CheckpointedInputGate gate) throws IOException, InterruptedException {
        Class<CheckpointBarrier> expected = CheckpointBarrier.class;
        assertEvent(gate, expected);
    }

    /**
     * Polls the gate once and asserts that the result is an event whose class is exactly
     * {@code clazz}.
     *
     * @param gate gate to poll
     * @param clazz expected runtime class of the polled event
     */
    private static <T extends RuntimeEvent> void assertEvent(CheckpointedInputGate gate, Class<T> clazz) throws IOException, InterruptedException {
        BufferOrEvent polled = assertPoll(gate).get();
        String notAnEvent = "expected event, got data buffer on " + polled.getChannelInfo();
        TestCase.assertTrue(notAnEvent, polled.isEvent());
        Assert.assertEquals(clazz, polled.getEvent().getClass());
    }

    /**
     * Polls the gate once and asserts that the result is a data buffer rather than an event.
     *
     * @param gate gate to poll
     */
    private static <T extends RuntimeEvent> void assertData(CheckpointedInputGate gate) throws IOException, InterruptedException {
        BufferOrEvent polled = assertPoll(gate).get();
        String notABuffer = "expected data, got " + polled.getEvent() + "  on " + polled.getChannelInfo();
        TestCase.assertTrue(notABuffer, polled.isBuffer());
    }

    /**
     * Polls the gate once and asserts that it returned an element.
     *
     * @param gate gate to poll
     * @return the non-empty poll result
     */
    private static Optional<BufferOrEvent> assertPoll(CheckpointedInputGate gate) throws IOException, InterruptedException {
        // Typed instead of a raw Optional, so the return needs no unchecked conversion.
        Optional<BufferOrEvent> polled = gate.pollNext();
        TestCase.assertTrue("empty gate", polled.isPresent());
        return polled;
    }

    /**
     * Test clock that delegates all time queries to a {@link ManualClock} and also acts as a
     * {@code BarrierAlignmentUtil.DelayableTimer}. Registered callables are queued by due
     * timestamp and run only from {@link #executeCallables()}, which the {@code advanceTime}
     * overloads call after moving the clock.
     */
    private static class ClockWithDelayedActions
    extends Clock
    implements BarrierAlignmentUtil.DelayableTimer {
        /** Underlying manually advanced clock, created with an initial value of 100000000. */
        private final ManualClock clock = new ManualClock(100000000L);
        /** Pending callables, ordered by ascending due timestamp (nanoseconds). */
        private final PriorityQueue<CallableWithTimestamp> queue = new PriorityQueue<CallableWithTimestamp>(Comparator.comparingLong(CallableWithTimestamp::getTimestamp));

        private ClockWithDelayedActions() {
        }

        /**
         * Queues {@code callable} to become due {@code delay} after the current relative time.
         *
         * @return a handle that removes the queued entry again
         */
        public BarrierAlignmentUtil.Cancellable registerTask(Callable<?> callable, Duration delay) {
            long dueAtNanos = clock.relativeTimeNanos() + delay.toNanos();
            CallableWithTimestamp entry = new CallableWithTimestamp(dueAtNanos, callable);
            queue.add(entry);
            return () -> queue.remove(entry);
        }

        /** Advances the clock and then runs every callable that has become due. */
        public void advanceTime(long duration, TimeUnit timeUnit) throws Exception {
            clock.advanceTime(duration, timeUnit);
            executeCallables();
        }

        /** Advances the clock and then runs every callable that has become due. */
        public void advanceTime(Duration duration) throws Exception {
            clock.advanceTime(duration);
            executeCallables();
        }

        /** Returns the underlying manual clock. */
        public ManualClock getClock() {
            return clock;
        }

        /** Advances the clock only; due callables stay queued. */
        public void advanceTimeWithoutRunningCallables(long duration, TimeUnit timeUnit) {
            clock.advanceTime(duration, timeUnit);
        }

        /**
         * Runs, in timestamp order, every queued callable whose due timestamp is not after the
         * clock reading taken on entry. Each entry is removed from the queue before it is called.
         */
        public void executeCallables() throws Exception {
            long now = clock.relativeTimeNanos();
            CallableWithTimestamp head = queue.peek();
            while (head != null && head.getTimestamp() <= now) {
                queue.poll().getCallable().call();
                head = queue.peek();
            }
        }

        public long absoluteTimeMillis() {
            return clock.absoluteTimeMillis();
        }

        public long relativeTimeMillis() {
            return clock.relativeTimeMillis();
        }

        public long relativeTimeNanos() {
            return clock.relativeTimeNanos();
        }
    }

    /** Immutable pair of a callable and the timestamp it was queued with. */
    private static class CallableWithTimestamp {
        /** Timestamp the entry is ordered by in the clock's queue. */
        private final long timestamp;
        /** Action to invoke. */
        private final Callable<?> callable;

        private CallableWithTimestamp(long timestamp, @Nonnull Callable<?> callable) {
            this.callable = callable;
            this.timestamp = timestamp;
        }

        /** Returns the timestamp given at construction. */
        public long getTimestamp() {
            return timestamp;
        }

        /** Returns the callable given at construction. */
        public Callable<?> getCallable() {
            return callable;
        }
    }
}

