/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class NettyPartitionRequestClientTest {
    @Parameter
    private boolean connectionReuseEnabled;

    NettyPartitionRequestClientTest() {
    }

    @Parameters(name="connection reuse enabled = {0}")
    private static Object[] parameters() {
        return new Object[][]{{true}, {false}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testPartitionRequestClientReuse() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = NettyPartitionRequestClientTest.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, true);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            client.close(inputChannel);
            Assertions.assertThat((boolean)client.canBeDisposed()).isFalse();
            handler.notifyAllChannelsOfErrorAndClose((Throwable)new RuntimeException());
            Assertions.assertThat((boolean)client.canBeDisposed()).isTrue();
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testRetriggerPartitionRequest() throws Exception {
        long deadline = System.currentTimeMillis() + 30000L;
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = NettyPartitionRequestClientTest.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        int numExclusiveBuffers = 2;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelBuilder.newBuilder().setConnectionManager(InputChannelTestUtils.mockConnectionManagerWithPartitionRequestClient((PartitionRequestClient)client)).setPartitionRequestListenerTimeout(1).setMaxBackoff(2).buildRemoteChannel(inputGate);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();
            Assertions.assertThat((boolean)channel.isWritable()).isTrue();
            Object readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            Assertions.assertThat((Comparable)((NettyMessage.PartitionRequest)readFromOutbound).receiverId).isEqualTo((Object)inputChannel.getInputChannelId());
            Assertions.assertThat((int)((NettyMessage.PartitionRequest)readFromOutbound).credit).isEqualTo(2);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId(), inputChannel.getChannelInfo());
            NettyPartitionRequestClientTest.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            Assertions.assertThat((Comparable)((NettyMessage.PartitionRequest)readFromOutbound).receiverId).isEqualTo((Object)inputChannel.getInputChannelId());
            Assertions.assertThat((int)((NettyMessage.PartitionRequest)readFromOutbound).credit).isEqualTo(2);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId(), inputChannel.getChannelInfo());
            NettyPartitionRequestClientTest.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            Assertions.assertThat((Comparable)((NettyMessage.PartitionRequest)readFromOutbound).receiverId).isEqualTo((Object)inputChannel.getInputChannelId());
            Assertions.assertThat((int)((NettyMessage.PartitionRequest)readFromOutbound).credit).isEqualTo(2);
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testDoublePartitionRequest() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = NettyPartitionRequestClientTest.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        int numExclusiveBuffers = 2;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();
            Assertions.assertThat((boolean)channel.isWritable()).isTrue();
            Object readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            Assertions.assertThat((Comparable)((NettyMessage.PartitionRequest)readFromOutbound).receiverId).isEqualTo((Object)inputChannel.getInputChannelId());
            Assertions.assertThat((int)((NettyMessage.PartitionRequest)readFromOutbound).credit).isEqualTo(2);
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testResumeConsumption() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = NettyPartitionRequestClientTest.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();
            inputChannel.resumeConsumption();
            channel.runPendingTasks();
            Object readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.ResumeConsumption.class);
            Assertions.assertThat((Comparable)((NettyMessage.ResumeConsumption)readFromOutbound).receiverId).isEqualTo((Object)inputChannel.getInputChannelId());
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testAcknowledgeAllRecordsProcessed() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = NettyPartitionRequestClientTest.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler, this.connectionReuseEnabled);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();
            inputChannel.acknowledgeAllRecordsProcessed();
            channel.runPendingTasks();
            Object readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.PartitionRequest.class);
            readFromOutbound = channel.readOutbound();
            Assertions.assertThat((Object)readFromOutbound).isInstanceOf(NettyMessage.AckAllUserRecordsProcessed.class);
            Assertions.assertThat((Comparable)((NettyMessage.AckAllUserRecordsProcessed)readFromOutbound).receiverId).isEqualTo((Object)inputChannel.getInputChannelId());
            Assertions.assertThat((Object)channel.readOutbound()).isNull();
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    private static NettyPartitionRequestClient createPartitionRequestClient(Channel tcpChannel, NetworkClientHandler clientHandler, boolean connectionReuseEnabled) throws Exception {
        ConnectionID connectionID = new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0);
        NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1, new Configuration());
        NettyClient nettyClient = new NettyClient(config);
        PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
        return new NettyPartitionRequestClient(tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
    }

    private static void runAllScheduledPendingTasks(EmbeddedChannel channel, long deadline) throws InterruptedException {
        while (channel.runScheduledPendingTasks() != -1L && System.currentTimeMillis() < deadline) {
            Thread.sleep(1L);
        }
    }
}

