package org.apache.flink.runtime.io.network.netty;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.TestingResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.testutils.TestingUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestRegistrationTest.class */
class PartitionRequestRegistrationTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestRegistrationTest$NoOpTaskEventPublisher.class */
    private static class NoOpTaskEventPublisher implements TaskEventPublisher {
        private NoOpTaskEventPublisher() {
        }

        public boolean publish(ResultPartitionID resultPartitionID, TaskEvent taskEvent) {
            return true;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestRegistrationTest$TestRemoteInputChannelForPartitionNotFound.class */
    private static class TestRemoteInputChannelForPartitionNotFound extends RemoteInputChannel {
        private final CountDownLatch latch;

        TestRemoteInputChannelForPartitionNotFound(CountDownLatch countDownLatch) {
            super(new SingleInputGateBuilder().setNumberOfChannels(1).build(), 0, new ResultPartitionID(), new ResultSubpartitionIndexSet(0), InputChannelBuilder.STUB_CONNECTION_ID, new TestingConnectionManager(), 0, 100, 10000, 2, new SimpleCounter(), new SimpleCounter(), ChannelStateWriter.NO_OP);
            this.latch = countDownLatch;
        }

        public void onFailedPartitionRequest() {
            this.latch.countDown();
        }
    }

    PartitionRequestRegistrationTest() {
    }

    @Test
    void testRegisterResultPartitionBeforeRequest() throws Exception {
        TestPooledBufferProvider testPooledBufferProvider = new TestPooledBufferProvider(16);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CancelPartitionRequestTest.InfiniteSubpartitionView infiniteSubpartitionView = new CancelPartitionRequestTest.InfiniteSubpartitionView(testPooledBufferProvider, countDownLatch);
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        TestingResultPartition build = TestingResultPartition.newBuilder().setResultPartitionManager(resultPartitionManager).setCreateSubpartitionViewFunction((i, bufferAvailabilityListener) -> {
            return infiniteSubpartitionView;
        }).build();
        resultPartitionManager.registerResultPartition(build);
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = null;
        try {
            nettyServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol(resultPartitionManager, new NoOpTaskEventPublisher()));
            NettyTestUtil.connect(nettyServerAndClient).writeAndFlush(new NettyMessage.PartitionRequest(build.getPartitionId(), new ResultSubpartitionIndexSet(0), new InputChannelID(), Integer.MAX_VALUE)).await();
            if (!countDownLatch.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
                Assertions.fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION.toMillis() + " ms to be notified about cancelled partition.");
            }
            NettyTestUtil.shutdown(nettyServerAndClient);
        } catch (Throwable th) {
            NettyTestUtil.shutdown(nettyServerAndClient);
            throw th;
        }
    }

    @Test
    void testRegisterResultPartitionAfterRequest() throws Exception {
        TestPooledBufferProvider testPooledBufferProvider = new TestPooledBufferProvider(16);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CancelPartitionRequestTest.InfiniteSubpartitionView infiniteSubpartitionView = new CancelPartitionRequestTest.InfiniteSubpartitionView(testPooledBufferProvider, countDownLatch);
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        TestingResultPartition build = TestingResultPartition.newBuilder().setResultPartitionManager(resultPartitionManager).setCreateSubpartitionViewFunction((i, bufferAvailabilityListener) -> {
            return infiniteSubpartitionView;
        }).build();
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = null;
        try {
            nettyServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol(resultPartitionManager, new NoOpTaskEventPublisher()));
            NettyTestUtil.connect(nettyServerAndClient).writeAndFlush(new NettyMessage.PartitionRequest(build.getPartitionId(), new ResultSubpartitionIndexSet(0), new InputChannelID(), Integer.MAX_VALUE)).await();
            resultPartitionManager.registerResultPartition(build);
            if (!countDownLatch.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
                Assertions.fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION.toMillis() + " ms to be notified about cancelled partition.");
            }
            NettyTestUtil.shutdown(nettyServerAndClient);
        } catch (Throwable th) {
            NettyTestUtil.shutdown(nettyServerAndClient);
            throw th;
        }
    }

    @Test
    void testPartitionRequestNotifierTimeout() throws Exception {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = null;
        try {
            nettyServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol(new ResultPartitionProvider() { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestRegistrationTest.1
                public ResultSubpartitionView createSubpartitionView(ResultPartitionID resultPartitionID2, ResultSubpartitionIndexSet resultSubpartitionIndexSet, BufferAvailabilityListener bufferAvailabilityListener) {
                    return null;
                }

                public Optional<ResultSubpartitionView> createSubpartitionViewOrRegisterListener(ResultPartitionID resultPartitionID2, ResultSubpartitionIndexSet resultSubpartitionIndexSet, BufferAvailabilityListener bufferAvailabilityListener, PartitionRequestListener partitionRequestListener) {
                    partitionRequestListener.notifyPartitionCreatedTimeout();
                    return Optional.empty();
                }

                public void releasePartitionRequestListener(PartitionRequestListener partitionRequestListener) {
                }
            }, new NoOpTaskEventPublisher()));
            Channel connect = NettyTestUtil.connect(nettyServerAndClient);
            NetworkClientHandler networkClientHandler = connect.pipeline().get(NetworkClientHandler.class);
            TestRemoteInputChannelForPartitionNotFound testRemoteInputChannelForPartitionNotFound = new TestRemoteInputChannelForPartitionNotFound(countDownLatch);
            networkClientHandler.addInputChannel(testRemoteInputChannelForPartitionNotFound);
            connect.writeAndFlush(new NettyMessage.PartitionRequest(resultPartitionID, new ResultSubpartitionIndexSet(0), testRemoteInputChannelForPartitionNotFound.getInputChannelId(), Integer.MAX_VALUE)).await();
            if (!countDownLatch.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
                Assertions.fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION.toMillis() + " ms to be notified about cancelled partition.");
            }
            NettyTestUtil.shutdown(nettyServerAndClient);
        } catch (Throwable th) {
            NettyTestUtil.shutdown(nettyServerAndClient);
            throw th;
        }
    }
}
