/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.shuffle;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link NettyShuffleUtils}.
 *
 * <p>The test compares the number of network buffers announced by {@link
 * NettyShuffleUtils#computeNetworkBuffersForAnnouncing} with the numbers reported by the input
 * gates and result partitions that a {@link NettyShuffleEnvironment} actually creates.
 */
public class NettyShuffleUtilsTest extends TestLogger {

    /**
     * Builds a shuffle environment sized with the announced buffer count, creates two input gates
     * and three result partitions in it, and asserts that the sum of their measured buffer
     * consumption equals the announced count.
     *
     * <p>All gates, partitions and the environment itself are managed by try-with-resources so
     * they are released even when a setup step or the assertion fails.
     */
    @Test
    public void testComputeRequiredNetworkBuffers() throws Exception {
        int numBuffersPerChannel = 5;
        int numBuffersPerGate = 8;
        int sortShuffleMinParallelism = 8;
        int numSortShuffleMinBuffers = 12;

        int numChannels1 = 3;
        int numChannels2 = 4;
        // Two input gates are created below (inputGate1 and inputGate2).
        int numInputGates = 2;

        IntermediateDataSetID ds1 = new IntermediateDataSetID();
        IntermediateDataSetID ds2 = new IntermediateDataSetID();
        IntermediateDataSetID ds3 = new IntermediateDataSetID();

        int numSubs1 = 5;
        int numSubs2 = 6;
        int numSubs3 = 10;
        Map<IntermediateDataSetID, Integer> subpartitionNums =
                ImmutableMap.of(ds1, numSubs1, ds2, numSubs2, ds3, numSubs3);
        Map<IntermediateDataSetID, ResultPartitionType> partitionTypes =
                ImmutableMap.of(
                        ds1, ResultPartitionType.PIPELINED_BOUNDED,
                        ds2, ResultPartitionType.BLOCKING,
                        ds3, ResultPartitionType.BLOCKING);

        int numTotalBuffers =
                NettyShuffleUtils.computeNetworkBuffersForAnnouncing(
                        numBuffersPerChannel,
                        numBuffersPerGate,
                        sortShuffleMinParallelism,
                        numSortShuffleMinBuffers,
                        numChannels1 + numChannels2,
                        numInputGates,
                        subpartitionNums,
                        partitionTypes);

        // Resources are closed in reverse declaration order, so the environment is closed last,
        // after every gate and partition created from it.
        try (NettyShuffleEnvironment sEnv =
                        new NettyShuffleEnvironmentBuilder()
                                .setNumNetworkBuffers(numTotalBuffers)
                                .setNetworkBuffersPerChannel(numBuffersPerChannel)
                                .setSortShuffleMinBuffers(numSortShuffleMinBuffers)
                                .setSortShuffleMinParallelism(sortShuffleMinParallelism)
                                .build();
                SingleInputGate inputGate1 =
                        createInputGate(
                                sEnv, ResultPartitionType.PIPELINED_BOUNDED, numChannels1);
                SingleInputGate inputGate2 =
                        createInputGate(sEnv, ResultPartitionType.BLOCKING, numChannels2);
                ResultPartition resultPartition1 =
                        createResultPartition(
                                sEnv, ResultPartitionType.PIPELINED_BOUNDED, numSubs1);
                ResultPartition resultPartition2 =
                        createResultPartition(sEnv, ResultPartitionType.BLOCKING, numSubs2);
                ResultPartition resultPartition3 =
                        createResultPartition(sEnv, ResultPartitionType.BLOCKING, numSubs3)) {

            // Same setup order as before: gates first, then partitions.
            inputGate1.setup();
            inputGate2.setup();
            resultPartition1.setup();
            resultPartition2.setup();
            resultPartition3.setup();

            int expected =
                    calculateBuffersConsumption(inputGate1)
                            + calculateBuffersConsumption(inputGate2)
                            + calculateBuffersConsumption(resultPartition1)
                            + calculateBuffersConsumption(resultPartition2)
                            + calculateBuffersConsumption(resultPartition3);

            Assert.assertEquals(expected, numTotalBuffers);
        }
    }

    /**
     * Creates a single input gate in the given environment.
     *
     * @param network the shuffle environment that creates the gate
     * @param resultPartitionType the partition type recorded in the gate's deployment descriptor
     * @param numInputChannels the number of shuffle descriptors (one per channel) to deploy
     * @return the only gate created for the single deployment descriptor
     * @throws IOException if the environment fails to create the gate
     */
    private SingleInputGate createInputGate(
            NettyShuffleEnvironment network,
            ResultPartitionType resultPartitionType,
            int numInputChannels)
            throws IOException {

        // Each descriptor gets a freshly generated ResourceID as its location.
        NettyShuffleDescriptor[] shuffleDescriptors = new NettyShuffleDescriptor[numInputChannels];
        for (int i = 0; i < numInputChannels; i++) {
            shuffleDescriptors[i] =
                    NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                            new IntermediateResultPartitionID(), ResourceID.generate());
        }

        InputGateDeploymentDescriptor inputGateDeploymentDescriptor =
                new InputGateDeploymentDescriptor(
                        new IntermediateDataSetID(), resultPartitionType, 0, shuffleDescriptors);

        ExecutionAttemptID consumerID = new ExecutionAttemptID();
        List<SingleInputGate> inputGates =
                network.createInputGates(
                        network.createShuffleIOOwnerContext(
                                "", consumerID, new UnregisteredMetricsGroup()),
                        SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
                        Collections.singletonList(inputGateDeploymentDescriptor));

        return inputGates.iterator().next();
    }

    /**
     * Creates a single result partition in the given environment.
     *
     * @param network the shuffle environment that creates the partition
     * @param resultPartitionType the type of the partition to create
     * @param numSubpartitions the number of subpartitions of the partition
     * @return the only partition created for the single deployment descriptor
     */
    private ResultPartition createResultPartition(
            NettyShuffleEnvironment network,
            ResultPartitionType resultPartitionType,
            int numSubpartitions) {

        NettyShuffleDescriptor shuffleDescriptor =
                NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                        new IntermediateResultPartitionID(), ResourceID.generate());

        PartitionDescriptor partitionDescriptor =
                new PartitionDescriptor(
                        new IntermediateDataSetID(),
                        2,
                        shuffleDescriptor.getResultPartitionID().getPartitionId(),
                        resultPartitionType,
                        numSubpartitions,
                        0);
        ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor =
                new ResultPartitionDeploymentDescriptor(
                        partitionDescriptor, shuffleDescriptor, 1, true);

        ExecutionAttemptID consumerID = new ExecutionAttemptID();
        List<ResultPartition> resultPartitions =
                network.createResultPartitionWriters(
                        network.createShuffleIOOwnerContext(
                                "", consumerID, new UnregisteredMetricsGroup()),
                        Collections.singletonList(resultPartitionDeploymentDescriptor));

        return resultPartitions.iterator().next();
    }

    /**
     * Measures the buffers attributed to an input gate: the available buffers reported by each of
     * its channels plus the maximum number of memory segments of the gate's buffer pool.
     *
     * @param inputGate a gate on which {@code setup()} has already been called
     * @return the summed buffer count
     * @throws Exception if finishing the recovered state or polling the gate fails
     */
    private int calculateBuffersConsumption(SingleInputGate inputGate) throws Exception {
        inputGate.setChannelStateWriter(ChannelStateWriter.NO_OP);
        inputGate.finishReadRecoveredState();
        // Keep polling until the gate reports that recovered state has been consumed.
        while (!inputGate.getStateConsumedFuture().isDone()) {
            inputGate.pollNext();
        }
        inputGate.convertRecoveredInputChannels();

        int channelBuffers = 0;
        for (InputChannel ch : inputGate.getInputChannels().values()) {
            // Every channel is expected to be remote here; the cast fails the test otherwise.
            RemoteInputChannel rChannel = (RemoteInputChannel) ch;
            channelBuffers += rChannel.getNumberOfAvailableBuffers();
        }
        return channelBuffers + inputGate.getBufferPool().getMaxNumberOfMemorySegments();
    }

    /**
     * Measures the buffers attributed to a result partition: the required number of memory
     * segments of its buffer pool for blocking partitions, and the maximum number otherwise.
     *
     * @param partition a partition on which {@code setup()} has already been called
     * @return the buffer count taken from the partition's buffer pool
     */
    private int calculateBuffersConsumption(ResultPartition partition) {
        if (partition.getPartitionType().isBlocking()) {
            return partition.getBufferPool().getNumberOfRequiredMemorySegments();
        }
        return partition.getBufferPool().getMaxNumberOfMemorySegments();
    }
}

