/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class InputBuffersMetricsTest
extends TestLogger {
    private CloseableRegistry closeableRegistry;

    @Before
    public void setup() {
        this.closeableRegistry = new CloseableRegistry();
    }

    @After
    public void tearDown() throws IOException {
        this.closeableRegistry.close();
    }

    @Test
    public void testCalculateTotalBuffersSize() throws Exception {
        int numberOfRemoteChannels = 2;
        int numberOfLocalChannels = 0;
        int numberOfBufferPerChannel = 2;
        int numberOfBuffersPerGate = 8;
        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setNetworkBuffersPerChannel(numberOfBufferPerChannel).setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate).build();
        this.closeableRegistry.registerCloseable(() -> ((NettyShuffleEnvironment)network).close());
        SingleInputGate inputGate1 = (SingleInputGate)this.buildInputGate((NettyShuffleEnvironment)network, (int)numberOfRemoteChannels, (int)numberOfLocalChannels).f0;
        this.closeableRegistry.registerCloseable(() -> ((SingleInputGate)inputGate1).close());
        inputGate1.setup();
        SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
        FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
        CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);
        this.closeableRegistry.registerCloseable(() -> ((NettyShuffleEnvironment)network).close());
        this.closeableRegistry.registerCloseable(() -> ((SingleInputGate)inputGate1).close());
        Assert.assertEquals((long)numberOfBuffersPerGate, (long)floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
        Assert.assertEquals((long)(numberOfRemoteChannels * numberOfBufferPerChannel), (long)exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
        Assert.assertEquals((long)(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate), (long)inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
    }

    @Test
    public void testExclusiveBuffersUsage() throws Exception {
        int numberOfRemoteChannelsGate1 = 2;
        int numberOfLocalChannelsGate1 = 0;
        int numberOfRemoteChannelsGate2 = 1;
        int numberOfLocalChannelsGate2 = 1;
        int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setNetworkBuffersPerChannel(buffersPerChannel).setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate).build();
        this.closeableRegistry.registerCloseable(() -> ((NettyShuffleEnvironment)network).close());
        Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = this.buildInputGate(network, numberOfRemoteChannelsGate1, numberOfLocalChannelsGate1);
        Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple2 = this.buildInputGate(network, numberOfRemoteChannelsGate2, numberOfLocalChannelsGate2);
        SingleInputGate inputGate1 = (SingleInputGate)tuple1.f0;
        SingleInputGate inputGate2 = (SingleInputGate)tuple2.f0;
        this.closeableRegistry.registerCloseable(() -> ((SingleInputGate)inputGate1).close());
        this.closeableRegistry.registerCloseable(() -> ((SingleInputGate)inputGate2).close());
        inputGate1.setup();
        inputGate2.setup();
        List remoteInputChannels = (List)tuple1.f1;
        SingleInputGate[] inputGates = new SingleInputGate[]{(SingleInputGate)tuple1.f0, (SingleInputGate)tuple2.f0};
        FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
        CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);
        Assert.assertEquals((double)0.0, (double)exclusiveBuffersUsageGauge.getValue().floatValue(), (double)0.0);
        Assert.assertEquals((double)0.0, (double)inputBuffersUsageGauge.getValue().floatValue(), (double)0.0);
        int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
        int channelIndex = 1;
        for (RemoteInputChannel channel : remoteInputChannels) {
            this.drainAndValidate(buffersPerChannel, buffersPerChannel * channelIndex++, channel, totalBuffers, buffersPerChannel * totalNumberOfRemoteChannels, exclusiveBuffersUsageGauge, inputBuffersUsageGauge, inputGate1);
        }
    }

    @Test
    public void testFloatingBuffersUsage() throws Exception {
        int numberOfRemoteChannelsGate1 = 2;
        int numberOfLocalChannelsGate1 = 0;
        int numberOfRemoteChannelsGate2 = 1;
        int numberOfLocalChannelsGate2 = 1;
        int totalNumberOfRemoteChannels = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
        int buffersPerChannel = 2;
        int extraNetworkBuffersPerGate = 8;
        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setNetworkBuffersPerChannel(buffersPerChannel).setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate).build();
        this.closeableRegistry.registerCloseable(() -> ((NettyShuffleEnvironment)network).close());
        Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = this.buildInputGate(network, numberOfRemoteChannelsGate1, numberOfLocalChannelsGate1);
        SingleInputGate inputGate2 = (SingleInputGate)this.buildInputGate((NettyShuffleEnvironment)network, (int)numberOfRemoteChannelsGate2, (int)numberOfLocalChannelsGate2).f0;
        SingleInputGate inputGate1 = (SingleInputGate)tuple1.f0;
        this.closeableRegistry.registerCloseable(() -> ((SingleInputGate)inputGate1).close());
        this.closeableRegistry.registerCloseable(() -> ((SingleInputGate)inputGate2).close());
        inputGate1.setup();
        inputGate2.setup();
        RemoteInputChannel remoteInputChannel1 = (RemoteInputChannel)((List)tuple1.f1).get(0);
        SingleInputGate[] inputGates = new SingleInputGate[]{(SingleInputGate)tuple1.f0, inputGate2};
        FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
        CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);
        Assert.assertEquals((double)0.0, (double)floatingBuffersUsageGauge.getValue().floatValue(), (double)0.0);
        Assert.assertEquals((double)0.0, (double)inputBuffersUsageGauge.getValue().floatValue(), (double)0.0);
        this.drainBuffer(buffersPerChannel, remoteInputChannel1);
        int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
        remoteInputChannel1.requestSubpartition();
        int backlog = 3;
        int totalRequestedBuffers = buffersPerChannel + backlog;
        remoteInputChannel1.onSenderBacklog(backlog);
        Assert.assertEquals((long)totalRequestedBuffers, (long)remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
        this.drainBuffer(totalRequestedBuffers, remoteInputChannel1);
        Assert.assertEquals((long)0L, (long)remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
        Assert.assertEquals((double)((double)(buffersPerChannel + totalRequestedBuffers) / (double)totalBuffers), (double)inputBuffersUsageGauge.getValue().floatValue(), (double)1.0E-4);
    }

    private void drainAndValidate(int numBuffersToRequest, int totalRequestedBuffers, RemoteInputChannel channel, int totalBuffers, int totalExclusiveBuffers, ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge, CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge, SingleInputGate inputGate) throws IOException {
        this.drainBuffer(numBuffersToRequest, channel);
        Assert.assertEquals((long)totalRequestedBuffers, (long)exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
        Assert.assertEquals((double)((double)totalRequestedBuffers / (double)totalExclusiveBuffers), (double)exclusiveBuffersUsageGauge.getValue().floatValue(), (double)1.0E-4);
        Assert.assertEquals((double)((double)totalRequestedBuffers / (double)totalBuffers), (double)inputBuffersUsageGauge.getValue().floatValue(), (double)1.0E-4);
    }

    private void drainBuffer(int boundary, RemoteInputChannel channel) throws IOException {
        Buffer buffer;
        for (int i = 0; i < boundary && (buffer = channel.requestBuffer()) != null; ++i) {
            this.closeableRegistry.registerCloseable(() -> ((Buffer)buffer).recycleBuffer());
        }
    }

    private Tuple2<SingleInputGate, List<RemoteInputChannel>> buildInputGate(NettyShuffleEnvironment network, int numberOfRemoteChannels, int numberOfLocalChannels) throws Exception {
        ResultPartition partition;
        int i;
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(numberOfRemoteChannels + numberOfLocalChannels).setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED).setupBufferPoolFactory(network).build();
        InputChannel[] inputChannels = new InputChannel[numberOfRemoteChannels + numberOfLocalChannels];
        Tuple2 res = Tuple2.of((Object)inputGate, new ArrayList());
        int channelIdx = 0;
        for (i = 0; i < numberOfRemoteChannels; ++i) {
            partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 1);
            this.closeableRegistry.registerCloseable(() -> ((ResultPartition)partition).close());
            partition.setup();
            RemoteInputChannel remoteChannel = this.buildRemoteChannel(channelIdx, inputGate, network, partition);
            inputChannels[i] = remoteChannel;
            ((List)res.f1).add(remoteChannel);
            ++channelIdx;
        }
        for (i = 0; i < numberOfLocalChannels; ++i) {
            partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 1);
            this.closeableRegistry.registerCloseable(() -> ((ResultPartition)partition).close());
            partition.setup();
            inputChannels[numberOfRemoteChannels + i] = this.buildLocalChannel(channelIdx, inputGate, network, partition);
        }
        inputGate.setInputChannels(inputChannels);
        return res;
    }

    private RemoteInputChannel buildRemoteChannel(int channelIndex, SingleInputGate inputGate, NettyShuffleEnvironment network, ResultPartition partition) {
        return new InputChannelBuilder().setPartitionId(partition.getPartitionId()).setChannelIndex(channelIndex).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildRemoteChannel(inputGate);
    }

    private LocalInputChannel buildLocalChannel(int channelIndex, SingleInputGate inputGate, NettyShuffleEnvironment network, ResultPartition partition) {
        return new InputChannelBuilder().setPartitionId(partition.getPartitionId()).setChannelIndex(channelIndex).setupFromNettyShuffleEnvironment(network).setConnectionManager(new TestingConnectionManager()).buildLocalChannel(inputGate);
    }
}

