package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.class */
/**
 * Tests for the input buffer usage gauges ({@link FloatingBuffersUsageGauge}, {@link
 * ExclusiveBuffersUsageGauge} and {@link CreditBasedInputBuffersUsageGauge}) computed over
 * {@link SingleInputGate}s.
 *
 * <p>Every resource created by a test (shuffle environment, gates, partitions, requested
 * buffers) is registered with a per-test {@link CloseableRegistry} that is closed in {@link
 * #tearDown()}.
 */
class InputBuffersMetricsTest {

    /** Number of network buffers configured per channel in the shuffle environment. */
    private static final int BUFFERS_PER_CHANNEL = 2;

    /** Number of floating network buffers configured per gate in the shuffle environment. */
    private static final int FLOATING_BUFFERS_PER_GATE = 8;

    /** Absolute tolerance used when comparing buffer usage ratios. */
    private static final double USAGE_TOLERANCE = 1.0E-4d;

    private CloseableRegistry closeableRegistry;

    @BeforeEach
    void setup() {
        closeableRegistry = new CloseableRegistry();
    }

    @AfterEach
    void tearDown() throws IOException {
        closeableRegistry.close();
    }

    /** Checks the total buffer counts reported by each gauge for a single gate. */
    @Test
    void testCalculateTotalBuffersSize() throws Exception {
        int numberOfRemoteChannels = 2;
        int numberOfLocalChannels = 0;

        NettyShuffleEnvironment network = createShuffleEnvironment();

        SingleInputGate inputGate =
                buildInputGate(network, numberOfRemoteChannels, numberOfLocalChannels).f0;
        // Each resource is registered exactly once so that it is closed exactly once.
        closeableRegistry.registerCloseable(inputGate::close);
        inputGate.setup();

        SingleInputGate[] inputGates = {inputGate};
        FloatingBuffersUsageGauge floatingBuffersUsageGauge =
                new FloatingBuffersUsageGauge(inputGates);
        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge =
                new ExclusiveBuffersUsageGauge(inputGates);
        CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge =
                new CreditBasedInputBuffersUsageGauge(
                        floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);

        int expectedExclusiveBuffers = BUFFERS_PER_CHANNEL * numberOfRemoteChannels;

        Assertions.assertThat(floatingBuffersUsageGauge.calculateTotalBuffers(inputGate))
                .isEqualTo(FLOATING_BUFFERS_PER_GATE);
        Assertions.assertThat(exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate))
                .isEqualTo(expectedExclusiveBuffers);
        Assertions.assertThat(inputBuffersUsageGauge.calculateTotalBuffers(inputGate))
                .isEqualTo(expectedExclusiveBuffers + FLOATING_BUFFERS_PER_GATE);
    }

    /**
     * Requests buffers from each remote channel of the first gate in turn and checks the exclusive
     * and combined usage ratios after every step.
     */
    @Test
    void testExclusiveBuffersUsage() throws Exception {
        int remoteChannelsGate1 = 2;
        int localChannelsGate1 = 0;
        int remoteChannelsGate2 = 1;
        int localChannelsGate2 = 1;
        int totalRemoteChannels = remoteChannelsGate1 + remoteChannelsGate2;

        NettyShuffleEnvironment network = createShuffleEnvironment();

        Tuple2<SingleInputGate, List<RemoteInputChannel>> gate1AndChannels =
                buildInputGate(network, remoteChannelsGate1, localChannelsGate1);
        Tuple2<SingleInputGate, List<RemoteInputChannel>> gate2AndChannels =
                buildInputGate(network, remoteChannelsGate2, localChannelsGate2);
        SingleInputGate inputGate1 = gate1AndChannels.f0;
        SingleInputGate inputGate2 = gate2AndChannels.f0;
        closeableRegistry.registerCloseable(inputGate1::close);
        closeableRegistry.registerCloseable(inputGate2::close);
        inputGate1.setup();
        inputGate2.setup();

        List<RemoteInputChannel> remoteChannelsOfGate1 = gate1AndChannels.f1;

        SingleInputGate[] inputGates = {inputGate1, inputGate2};
        FloatingBuffersUsageGauge floatingBuffersUsageGauge =
                new FloatingBuffersUsageGauge(inputGates);
        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge =
                new ExclusiveBuffersUsageGauge(inputGates);
        CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge =
                new CreditBasedInputBuffersUsageGauge(
                        floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);

        // Nothing has been requested yet, so both ratios must be exactly zero.
        Assertions.assertThat(exclusiveBuffersUsageGauge.getValue())
                .isEqualTo(0.0f, Assertions.offset(0.0f));
        Assertions.assertThat(inputBuffersUsageGauge.getValue())
                .isEqualTo(0.0f, Assertions.offset(0.0f));

        int totalExclusiveBuffers = BUFFERS_PER_CHANNEL * totalRemoteChannels;
        int totalBuffers = FLOATING_BUFFERS_PER_GATE * inputGates.length + totalExclusiveBuffers;

        int expectedUsedBuffers = 0;
        for (RemoteInputChannel channel : remoteChannelsOfGate1) {
            expectedUsedBuffers += BUFFERS_PER_CHANNEL;
            drainAndValidate(
                    BUFFERS_PER_CHANNEL,
                    expectedUsedBuffers,
                    channel,
                    totalBuffers,
                    totalExclusiveBuffers,
                    exclusiveBuffersUsageGauge,
                    inputBuffersUsageGauge,
                    inputGate1);
        }
    }

    /**
     * Requests the per-channel buffers of one remote channel, announces a sender backlog, requests
     * all floating buffers the channel then reports as available, and checks the combined usage
     * ratio.
     */
    @Test
    void testFloatingBuffersUsage() throws Exception {
        int remoteChannelsGate1 = 2;
        int localChannelsGate1 = 0;
        int remoteChannelsGate2 = 1;
        int localChannelsGate2 = 1;
        int totalRemoteChannels = remoteChannelsGate1 + remoteChannelsGate2;
        int backlog = 3;

        NettyShuffleEnvironment network = createShuffleEnvironment();

        Tuple2<SingleInputGate, List<RemoteInputChannel>> gate1AndChannels =
                buildInputGate(network, remoteChannelsGate1, localChannelsGate1);
        SingleInputGate inputGate2 =
                buildInputGate(network, remoteChannelsGate2, localChannelsGate2).f0;
        SingleInputGate inputGate1 = gate1AndChannels.f0;
        closeableRegistry.registerCloseable(inputGate1::close);
        closeableRegistry.registerCloseable(inputGate2::close);
        inputGate1.setup();
        inputGate2.setup();

        RemoteInputChannel channel = gate1AndChannels.f1.get(0);

        SingleInputGate[] inputGates = {inputGate1, inputGate2};
        FloatingBuffersUsageGauge floatingBuffersUsageGauge =
                new FloatingBuffersUsageGauge(inputGates);
        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge =
                new ExclusiveBuffersUsageGauge(inputGates);
        CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge =
                new CreditBasedInputBuffersUsageGauge(
                        floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);

        // Nothing has been requested yet, so both ratios must be exactly zero.
        Assertions.assertThat(floatingBuffersUsageGauge.getValue())
                .isEqualTo(0.0f, Assertions.offset(0.0f));
        Assertions.assertThat(inputBuffersUsageGauge.getValue())
                .isEqualTo(0.0f, Assertions.offset(0.0f));

        // Take the per-channel buffers first.
        drainBuffer(BUFFERS_PER_CHANNEL, channel);

        int totalBuffers =
                FLOATING_BUFFERS_PER_GATE * inputGates.length
                        + BUFFERS_PER_CHANNEL * totalRemoteChannels;

        channel.requestSubpartitions();

        int expectedFloatingBuffers = BUFFERS_PER_CHANNEL + backlog;
        channel.onSenderBacklog(backlog);
        Assertions.assertThat(channel.unsynchronizedGetFloatingBuffersAvailable())
                .isEqualTo(expectedFloatingBuffers);

        drainBuffer(expectedFloatingBuffers, channel);
        Assertions.assertThat(channel.unsynchronizedGetFloatingBuffersAvailable()).isZero();

        // The ratio must be computed in floating point; integer division would truncate it to 0.
        Assertions.assertThat(inputBuffersUsageGauge.getValue().doubleValue())
                .isEqualTo(
                        (double) (BUFFERS_PER_CHANNEL + expectedFloatingBuffers) / totalBuffers,
                        Assertions.offset(USAGE_TOLERANCE));
    }

    /**
     * Requests buffers from {@code channel} and verifies the resulting usage figures.
     *
     * @param numBuffersToRequest maximum number of buffers to request from the channel
     * @param totalRequestedBuffers expected number of used buffers reported for {@code inputGate}
     * @param channel channel to request the buffers from
     * @param totalBuffers denominator of the expected combined usage ratio
     * @param totalExclusiveBuffers denominator of the expected exclusive usage ratio
     * @param exclusiveBuffersUsageGauge gauge whose used-buffer count and ratio are verified
     * @param inputBuffersUsageGauge gauge whose combined ratio is verified
     * @param inputGate gate passed to {@code calculateUsedBuffers}
     * @throws IOException if a requested buffer cannot be registered for cleanup
     */
    private void drainAndValidate(
            int numBuffersToRequest,
            int totalRequestedBuffers,
            RemoteInputChannel channel,
            int totalBuffers,
            int totalExclusiveBuffers,
            ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
            CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
            SingleInputGate inputGate)
            throws IOException {
        drainBuffer(numBuffersToRequest, channel);

        Assertions.assertThat(exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate))
                .isEqualTo(totalRequestedBuffers);

        // Cast before dividing: int / int would truncate every ratio below 1 to 0.
        Assertions.assertThat(exclusiveBuffersUsageGauge.getValue().doubleValue())
                .isEqualTo(
                        (double) totalRequestedBuffers / totalExclusiveBuffers,
                        Assertions.offset(USAGE_TOLERANCE));
        Assertions.assertThat(inputBuffersUsageGauge.getValue().doubleValue())
                .isEqualTo(
                        (double) totalRequestedBuffers / totalBuffers,
                        Assertions.offset(USAGE_TOLERANCE));
    }

    /**
     * Requests up to {@code numBuffersToRequest} buffers from {@code channel}, stopping early as
     * soon as the channel returns {@code null}. Every obtained buffer is registered so that it is
     * recycled when the registry is closed.
     *
     * @param numBuffersToRequest maximum number of buffers to request
     * @param channel channel to request the buffers from
     * @throws IOException if a buffer cannot be registered for cleanup
     */
    private void drainBuffer(int numBuffersToRequest, RemoteInputChannel channel)
            throws IOException {
        for (int requested = 0; requested < numBuffersToRequest; requested++) {
            Buffer buffer = channel.requestBuffer();
            if (buffer == null) {
                break;
            }
            closeableRegistry.registerCloseable(buffer::recycleBuffer);
        }
    }

    /**
     * Builds a {@link SingleInputGate} whose channel array holds the remote channels first,
     * followed by the local channels. A dedicated result partition is created, registered for
     * cleanup and set up for every channel.
     *
     * @param network shuffle environment used for the buffer pools, partitions and channels
     * @param numberOfRemoteChannels number of remote channels to create
     * @param numberOfLocalChannels number of local channels to create
     * @return the gate together with the list of its remote channels, in creation order
     * @throws Exception if creating or setting up a partition fails
     */
    private Tuple2<SingleInputGate, List<RemoteInputChannel>> buildInputGate(
            NettyShuffleEnvironment network, int numberOfRemoteChannels, int numberOfLocalChannels)
            throws Exception {
        int totalChannels = numberOfRemoteChannels + numberOfLocalChannels;

        SingleInputGate inputGate =
                new SingleInputGateBuilder()
                        .setNumberOfChannels(totalChannels)
                        .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
                        .setupBufferPoolFactory(network)
                        .build();

        InputChannel[] inputChannels = new InputChannel[totalChannels];
        List<RemoteInputChannel> remoteChannels = new ArrayList<>();

        // The channel index always equals the array slot, so each channel gets a unique index
        // even when more than one local channel is requested.
        for (int channelIndex = 0; channelIndex < totalChannels; channelIndex++) {
            ResultPartition partition = createRegisteredPartition(network);
            if (channelIndex < numberOfRemoteChannels) {
                RemoteInputChannel remoteChannel =
                        buildRemoteChannel(channelIndex, inputGate, network, partition);
                inputChannels[channelIndex] = remoteChannel;
                remoteChannels.add(remoteChannel);
            } else {
                inputChannels[channelIndex] =
                        buildLocalChannel(channelIndex, inputGate, network, partition);
            }
        }

        inputGate.setInputChannels(inputChannels);
        return Tuple2.of(inputGate, remoteChannels);
    }

    /** Builds a remote channel with the given index that reads from {@code partition}. */
    private RemoteInputChannel buildRemoteChannel(
            int channelIndex,
            SingleInputGate inputGate,
            NettyShuffleEnvironment network,
            ResultPartition partition) {
        return new InputChannelBuilder()
                .setPartitionId(partition.getPartitionId())
                .setChannelIndex(channelIndex)
                .setupFromNettyShuffleEnvironment(network)
                .setConnectionManager(new TestingConnectionManager())
                .buildRemoteChannel(inputGate);
    }

    /** Builds a local channel with the given index that reads from {@code partition}. */
    private LocalInputChannel buildLocalChannel(
            int channelIndex,
            SingleInputGate inputGate,
            NettyShuffleEnvironment network,
            ResultPartition partition) {
        return new InputChannelBuilder()
                .setPartitionId(partition.getPartitionId())
                .setChannelIndex(channelIndex)
                .setupFromNettyShuffleEnvironment(network)
                .setConnectionManager(new TestingConnectionManager())
                .buildLocalChannel(inputGate);
    }

    /**
     * Creates the shuffle environment shared by all tests and registers it for cleanup.
     *
     * @return an environment configured with {@link #BUFFERS_PER_CHANNEL} and {@link
     *     #FLOATING_BUFFERS_PER_GATE}
     * @throws Exception if the environment cannot be built or registered
     */
    private NettyShuffleEnvironment createShuffleEnvironment() throws Exception {
        NettyShuffleEnvironment network =
                new NettyShuffleEnvironmentBuilder()
                        .setNetworkBuffersPerChannel(BUFFERS_PER_CHANNEL)
                        .setFloatingNetworkBuffersPerGate(FLOATING_BUFFERS_PER_GATE)
                        .build();
        closeableRegistry.registerCloseable(network::close);
        return network;
    }

    /**
     * Creates a {@code PIPELINED_BOUNDED} partition, registers it for cleanup and then sets it up.
     *
     * @param network shuffle environment the partition is created in
     * @return the partition after {@code setup()} has been called
     * @throws Exception if the partition cannot be created, registered or set up
     */
    private ResultPartition createRegisteredPartition(NettyShuffleEnvironment network)
            throws Exception {
        ResultPartition partition =
                PartitionTestUtils.createPartition(
                        network, ResultPartitionType.PIPELINED_BOUNDED, 1);
        closeableRegistry.registerCloseable(partition::close);
        partition.setup();
        return partition;
    }
}
