package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest.class */
class PartitionRequestServerHandlerTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandlerTest$TestViewReader.class */
    private static class TestViewReader extends CreditBasedSequenceNumberingViewReader {
        private boolean consumptionResumed;
        private int bufferSize;

        TestViewReader(InputChannelID inputChannelID, int i, PartitionRequestQueue partitionRequestQueue) {
            super(inputChannelID, i, partitionRequestQueue);
            this.consumptionResumed = false;
            this.bufferSize = -1;
        }

        public void resumeConsumption() {
            this.consumptionResumed = true;
        }

        public void notifyNewBufferSize(int i) {
            this.bufferSize = i;
        }
    }

    PartitionRequestServerHandlerTest() {
    }

    @Test
    void testResumeConsumption() {
        InputChannelID inputChannelID = new InputChannelID();
        PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        TestViewReader testViewReader = new TestViewReader(inputChannelID, 2, partitionRequestQueue);
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{new PartitionRequestServerHandler(new ResultPartitionManager(), new TaskEventDispatcher(), partitionRequestQueue)});
        partitionRequestQueue.notifyReaderCreated(testViewReader);
        embeddedChannel.writeInbound(new Object[]{new NettyMessage.ResumeConsumption(inputChannelID)});
        embeddedChannel.runPendingTasks();
        Assertions.assertThat(testViewReader.consumptionResumed).isTrue();
    }

    @Test
    void testAcknowledgeAllRecordsProcessed() throws IOException {
        InputChannelID inputChannelID = new InputChannelID();
        ResultPartition createPartition = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ChannelHandler partitionRequestQueue = new PartitionRequestQueue();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{new PartitionRequestServerHandler(new ResultPartitionManager(), new TaskEventDispatcher(), partitionRequestQueue), partitionRequestQueue});
        CreditBasedSequenceNumberingViewReader creditBasedSequenceNumberingViewReader = new CreditBasedSequenceNumberingViewReader(inputChannelID, 2, partitionRequestQueue);
        creditBasedSequenceNumberingViewReader.notifySubpartitionsCreated(createPartition, new ResultSubpartitionIndexSet(0));
        partitionRequestQueue.notifyReaderCreated(creditBasedSequenceNumberingViewReader);
        createPartition.notifyEndOfData(StopMode.DRAIN);
        CompletableFuture allDataProcessedFuture = createPartition.getAllDataProcessedFuture();
        Assertions.assertThat(allDataProcessedFuture).isNotDone();
        embeddedChannel.writeInbound(new Object[]{new NettyMessage.AckAllUserRecordsProcessed(inputChannelID)});
        embeddedChannel.runPendingTasks();
        Assertions.assertThat(allDataProcessedFuture).isDone().isNotCompletedExceptionally();
    }

    @Test
    public void testNewBufferSize() {
        InputChannelID inputChannelID = new InputChannelID();
        PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        TestViewReader testViewReader = new TestViewReader(inputChannelID, 2, partitionRequestQueue);
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{new PartitionRequestServerHandler(new ResultPartitionManager(), new TaskEventDispatcher(), partitionRequestQueue)});
        partitionRequestQueue.notifyReaderCreated(testViewReader);
        embeddedChannel.writeInbound(new Object[]{new NettyMessage.NewBufferSize(666, inputChannelID)});
        embeddedChannel.runPendingTasks();
        Assertions.assertThat(testViewReader.bufferSize).isEqualTo(666);
    }

    @Test
    void testReceivingNewBufferSizeBeforeReaderIsCreated() {
        InputChannelID inputChannelID = new InputChannelID();
        PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        TestViewReader testViewReader = new TestViewReader(inputChannelID, 2, partitionRequestQueue);
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelHandler[]{new PartitionRequestServerHandler(new ResultPartitionManager(), new TaskEventDispatcher(), partitionRequestQueue)});
        embeddedChannel.writeInbound(new Object[]{new NettyMessage.NewBufferSize(666, inputChannelID)});
        embeddedChannel.runPendingTasks();
        Assertions.assertThat(embeddedChannel.outboundMessages()).withFailMessage(embeddedChannel.outboundMessages().toString(), new Object[0]).isEmpty();
        Assertions.assertThat(testViewReader.bufferSize).isEqualTo(-1);
    }
}
