/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ChannelStatePersister}, covering how {@code checkForBarrier},
 * {@code startPersisting}, {@code stopPersisting} and {@code maybePersist} interact when barriers
 * and persist requests for different checkpoint ids arrive in varying orders.
 */
class ChannelStatePersisterTest {
    ChannelStatePersisterTest() {
    }

    /**
     * Sees the barrier of checkpoint 1, starts persisting for it with one buffer, then sees the
     * barrier of checkpoint 2 before persisting for checkpoint 1 is stopped.
     *
     * <p>Expects that only the buffer handed to {@code startPersisting} is ever recorded and that
     * the persister still reports a received barrier after {@code stopPersisting(1)}.
     */
    @Test
    void testNewBarrierNotOverwrittenByStopPersisting() throws Exception {
        RecordingChannelStateWriter recorder = new RecordingChannelStateWriter();
        InputChannelInfo channel = new InputChannelInfo(0, 0);
        ChannelStatePersister persister = new ChannelStatePersister(recorder, channel);

        final long firstId = 1L;
        final long secondId = firstId + 1L;
        recorder.start(firstId, unalignedCheckpointOptions());

        // Barrier first, then the persist request: only the buffer passed in here is recorded.
        persister.checkForBarrier(barrier(firstId));
        persister.startPersisting(firstId, Arrays.asList(BufferBuilderTestUtils.buildSomeBuffer()));
        assertRecordedBufferCount(recorder, channel, 1);

        persistSomeBuffer(persister);
        assertRecordedBufferCount(recorder, channel, 1);

        // The barrier of the next checkpoint shows up before the first one is stopped.
        persister.checkForBarrier(barrier(secondId));
        persistSomeBuffer(persister);
        persister.stopPersisting(firstId);
        persistSomeBuffer(persister);

        assertRecordedBufferCount(recorder, channel, 1);
        Assertions.assertThat(persister.hasBarrierReceived()).isTrue();
    }

    /**
     * Starts persisting for checkpoint 1 and then for checkpoint 2 (both without buffers), and
     * afterwards feeds the barrier of checkpoint 1.
     *
     * <p>Expects {@code checkForBarrier} to return an empty {@code OptionalLong} and the persister
     * to report that no barrier has been received.
     */
    @Test
    void testNewBarrierNotOverwrittenByCheckForBarrier() throws Exception {
        ChannelStatePersister persister =
                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));

        persister.startPersisting(1L, Collections.emptyList());
        persister.startPersisting(2L, Collections.emptyList());

        Assertions.assertThat(persister.checkForBarrier(barrier(1L))).isNotPresent();
        Assertions.assertThat(persister.hasBarrierReceived()).isFalse();
    }

    /** Late-barrier scenario where checkpoint 1 was both started and stopped beforehand. */
    @Test
    void testLateBarrierOnStartedAndCancelledCheckpoint() throws Exception {
        testLateBarrier(true, true);
    }

    /** Late-barrier scenario where checkpoint 1 was stopped without having been started. */
    @Test
    void testLateBarrierOnCancelledCheckpoint() throws Exception {
        testLateBarrier(false, true);
    }

    /** Late-barrier scenario where checkpoint 1 was neither started nor stopped beforehand. */
    @Test
    void testLateBarrierOnNotYetCancelledCheckpoint() throws Exception {
        testLateBarrier(false, false);
    }

    /**
     * Shared scenario: the barrier of checkpoint 1 is fed to the persister, and afterwards
     * checkpoint 2 is run (start persisting with one buffer, persist one more, feed barrier 2,
     * attempt to persist a third).
     *
     * <p>Expects the persister to report a received barrier and exactly two buffers to be
     * recorded for the channel.
     *
     * @param startCheckpointOnLateBarrier whether {@code startPersisting(1, ...)} is called before
     *     the barrier of checkpoint 1 is fed
     * @param cancelCheckpointBeforeLateBarrier whether {@code stopPersisting(1)} is called before
     *     the barrier of checkpoint 1 is fed
     */
    private void testLateBarrier(boolean startCheckpointOnLateBarrier, boolean cancelCheckpointBeforeLateBarrier) throws Exception {
        final long lateId = 1L;
        final long currentId = 2L;

        RecordingChannelStateWriter recorder = new RecordingChannelStateWriter();
        InputChannelInfo channel = new InputChannelInfo(0, 0);
        ChannelStatePersister persister = new ChannelStatePersister(recorder, channel);

        if (startCheckpointOnLateBarrier) {
            persister.startPersisting(lateId, Collections.emptyList());
        }
        if (cancelCheckpointBeforeLateBarrier) {
            persister.stopPersisting(lateId);
        }
        persister.checkForBarrier(barrier(lateId));

        recorder.start(currentId, unalignedCheckpointOptions());
        persister.startPersisting(currentId, Arrays.asList(BufferBuilderTestUtils.buildSomeBuffer()));
        persistSomeBuffer(persister);
        persister.checkForBarrier(barrier(currentId));
        persistSomeBuffer(persister);

        Assertions.assertThat(persister.hasBarrierReceived()).isTrue();
        assertRecordedBufferCount(recorder, channel, 2);
    }

    /**
     * Feeds the barrier of checkpoint 2 first and then tries to start persisting for checkpoint 1.
     *
     * <p>Expects {@code startPersisting} to throw a {@link CheckpointException}.
     */
    @Test
    void testLateBarrierTriggeringCheckpoint() throws Exception {
        final long lateId = 1L;
        final long currentId = 2L;
        ChannelStatePersister persister =
                new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));

        persister.checkForBarrier(barrier(currentId));

        Assertions.assertThatThrownBy(() -> persister.startPersisting(lateId, Collections.emptyList()))
                .isInstanceOf(CheckpointException.class);
    }

    /** Builds unaligned checkpoint options of type {@code CHECKPOINT} at the default location. */
    private static CheckpointOptions unalignedCheckpointOptions() {
        return CheckpointOptions.unaligned(
                CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
    }

    /** Offers one freshly built buffer to {@code maybePersist} of the given persister. */
    private static void persistSomeBuffer(ChannelStatePersister persister) throws Exception {
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
    }

    /** Asserts how many input buffers the recording writer holds for the given channel. */
    private static void assertRecordedBufferCount(
            RecordingChannelStateWriter recorder, InputChannelInfo channel, int expected) {
        Assertions.assertThat(recorder.getAddedInput().get(channel)).hasSize(expected);
    }

    /**
     * Serializes a {@link CheckpointBarrier} for the given checkpoint id into a {@link Buffer}.
     *
     * <p>The barrier is created with {@code 1L} as its second constructor argument and with
     * {@code CheckpointOptions.forCheckpointWithDefaultLocation()}; {@code true} is passed as the
     * second argument of {@code EventSerializer.toBuffer}.
     *
     * @param id checkpoint id carried by the barrier
     * @return the serialized barrier
     * @throws IOException if serializing the event fails
     */
    private static Buffer barrier(long id) throws IOException {
        CheckpointBarrier event =
                new CheckpointBarrier(id, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());
        return EventSerializer.toBuffer(event, true);
    }
}

