/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.lang.reflect.Field;
import java.net.InetAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;

public class NettyConnectionManagerTest {
    @Test
    public void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception {
        int numberOfSlots = 2;
        NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), NetUtils.getAvailablePort(), 1024, numberOfSlots, new Configuration());
        NettyConnectionManager connectionManager = this.createNettyConnectionManager(config);
        connectionManager.start();
        Assert.assertEquals((long)numberOfSlots, (long)connectionManager.getBufferPool().getNumberOfArenas());
        Bootstrap boostrap = connectionManager.getClient().getBootstrap();
        EventLoopGroup group = boostrap.group();
        Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
        f.setAccessible(true);
        Object[] eventExecutors = (Object[])f.get(group);
        Assert.assertEquals((long)numberOfSlots, (long)eventExecutors.length);
        ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap();
        group = bootstrap.group();
        f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
        f.setAccessible(true);
        eventExecutors = (Object[])f.get(group);
        Assert.assertEquals((long)numberOfSlots, (long)eventExecutors.length);
        bootstrap = connectionManager.getServer().getBootstrap();
        group = bootstrap.childGroup();
        f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
        f.setAccessible(true);
        eventExecutors = (Object[])f.get(group);
        Assert.assertEquals((long)numberOfSlots, (long)eventExecutors.length);
    }

    @Test
    public void testManualConfiguration() throws Exception {
        int numberOfArenas = 1;
        int numberOfClientThreads = 3;
        int numberOfServerThreads = 4;
        Configuration flinkConfig = new Configuration();
        flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS, numberOfArenas);
        flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_CLIENT, 3);
        flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, 4);
        NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), NetUtils.getAvailablePort(), 1024, 1337, flinkConfig);
        NettyConnectionManager connectionManager = this.createNettyConnectionManager(config);
        connectionManager.start();
        Assert.assertEquals((long)numberOfArenas, (long)connectionManager.getBufferPool().getNumberOfArenas());
        Bootstrap boostrap = connectionManager.getClient().getBootstrap();
        EventLoopGroup group = boostrap.group();
        Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
        f.setAccessible(true);
        Object[] eventExecutors = (Object[])f.get(group);
        Assert.assertEquals((long)numberOfClientThreads, (long)eventExecutors.length);
        ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap();
        group = bootstrap.group();
        f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
        f.setAccessible(true);
        eventExecutors = (Object[])f.get(group);
        Assert.assertEquals((long)numberOfServerThreads, (long)eventExecutors.length);
        bootstrap = connectionManager.getServer().getBootstrap();
        group = bootstrap.childGroup();
        f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children");
        f.setAccessible(true);
        eventExecutors = (Object[])f.get(group);
        Assert.assertEquals((long)numberOfServerThreads, (long)eventExecutors.length);
    }

    private NettyConnectionManager createNettyConnectionManager(NettyConfig config) {
        return new NettyConnectionManager((ResultPartitionProvider)new ResultPartitionManager(), (TaskEventPublisher)new TaskEventDispatcher(), config);
    }
}

