/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class TieredStorageMemoryManagerImplTest {
    private static final int NETWORK_BUFFER_SIZE = 1024;
    private static final int NUM_TOTAL_BUFFERS = 1000;
    private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
    private NetworkBufferPool globalPool;
    private List<BufferBuilder> requestedBuffers;
    private CompletableFuture<Void> hasReclaimBufferFinished;
    private int reclaimBufferCounter;

    TieredStorageMemoryManagerImplTest() {
    }

    @BeforeEach
    void before() {
        this.globalPool = new NetworkBufferPool(1000, 1024);
        this.requestedBuffers = new ArrayList<BufferBuilder>();
        this.hasReclaimBufferFinished = new CompletableFuture();
        this.reclaimBufferCounter = 0;
    }

    @AfterEach
    void after() {
        this.globalPool.destroy();
    }

    @Test
    void testRequestAndRecycleBuffers() throws IOException {
        int numBuffers = 1;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers, numBuffers);
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(bufferPool, Collections.singletonList(new TieredStorageMemorySpec((Object)this, 0)));
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
        BufferBuilder builder = storageMemoryManager.requestBufferBlocking((Object)this);
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(1);
        TieredStorageMemoryManagerImplTest.recycleBufferBuilder(builder);
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(1);
        storageMemoryManager.release();
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
    }

    @Test
    void testRecycleBuffersAfterPoolSizeDecreased() throws IOException {
        int i;
        int numBuffers = 10;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, 1, numBuffers);
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(bufferPool, Collections.singletonList(new TieredStorageMemorySpec((Object)this, 0)));
        for (i = 0; i < numBuffers; ++i) {
            BufferBuilder builder = storageMemoryManager.requestBufferBlocking((Object)this);
            this.requestedBuffers.add(builder);
        }
        bufferPool.setNumBuffers(numBuffers / 2);
        for (i = 0; i < numBuffers; ++i) {
            TieredStorageMemoryManagerImplTest.recycleBufferBuilder(this.requestedBuffers.get(i));
            Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(Math.max(numBuffers / 2, numBuffers - (i + 1)));
        }
    }

    @Test
    void testRecycleBuffersAfterReleased() throws IOException {
        int i;
        int numBuffers = 10;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers, numBuffers);
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(bufferPool, Collections.singletonList(new TieredStorageMemorySpec((Object)this, 0)));
        for (i = 0; i < numBuffers; ++i) {
            BufferBuilder builder = storageMemoryManager.requestBufferBlocking((Object)this);
            this.requestedBuffers.add(builder);
        }
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffers);
        storageMemoryManager.release();
        for (i = 0; i < numBuffers; ++i) {
            TieredStorageMemoryManagerImplTest.recycleBufferBuilder(this.requestedBuffers.get(i));
        }
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
    }

    @Test
    void testGetMaxNonReclaimableBuffers() throws IOException {
        int numBuffers = 10;
        int numExclusive = 5;
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(numBuffers, Collections.singletonList(new TieredStorageMemorySpec((Object)this, numExclusive)));
        ArrayList<BufferBuilder> requestedBuffers = new ArrayList<BufferBuilder>();
        for (int i = 1; i <= numBuffers; ++i) {
            requestedBuffers.add(storageMemoryManager.requestBufferBlocking((Object)this));
            Assertions.assertThat((int)storageMemoryManager.getMaxNonReclaimableBuffers((Object)this)).isEqualTo(numBuffers);
            int numExpectedAvailable = numBuffers - i;
            Assertions.assertThat((int)(storageMemoryManager.getMaxNonReclaimableBuffers((Object)this) - storageMemoryManager.numOwnerRequestedBuffer((Object)this))).isEqualTo(numExpectedAvailable);
        }
        requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
        storageMemoryManager.release();
    }

    @Test
    void testNumMaxNonReclaimableWhenOtherUseLessThanGuaranteed() throws IOException {
        int numBuffers = 10;
        int numExclusive = 4;
        ArrayList<TieredStorageMemorySpec> storageMemorySpecs = new ArrayList<TieredStorageMemorySpec>();
        Object otherUser = new Object();
        storageMemorySpecs.add(new TieredStorageMemorySpec((Object)this, 0));
        storageMemorySpecs.add(new TieredStorageMemorySpec(otherUser, numExclusive));
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(numBuffers, storageMemorySpecs);
        ArrayList<BufferBuilder> requestedBuffers = new ArrayList<BufferBuilder>();
        Assertions.assertThat((int)storageMemoryManager.getMaxNonReclaimableBuffers((Object)this)).isEqualTo(numBuffers - numExclusive);
        for (int i = 1; i <= numBuffers; ++i) {
            requestedBuffers.add(storageMemoryManager.requestBufferBlocking((Object)this));
            Assertions.assertThat((int)storageMemoryManager.getMaxNonReclaimableBuffers((Object)this)).isEqualTo(numBuffers - numExclusive);
            int numExpectedAvailable = numBuffers - i - numExclusive;
            Assertions.assertThat((int)(storageMemoryManager.getMaxNonReclaimableBuffers((Object)this) - storageMemoryManager.numOwnerRequestedBuffer((Object)this))).isEqualTo(numExpectedAvailable);
        }
        requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
        storageMemoryManager.release();
    }

    @Test
    void testNumMaxNonReclaimableWhenOtherUseMoreThanGuaranteed() throws IOException {
        int i;
        int numBuffers = 10;
        int numExclusive = 4;
        ArrayList<TieredStorageMemorySpec> storageMemorySpecs = new ArrayList<TieredStorageMemorySpec>();
        Object otherUser = new Object();
        storageMemorySpecs.add(new TieredStorageMemorySpec((Object)this, 0));
        storageMemorySpecs.add(new TieredStorageMemorySpec(otherUser, numExclusive));
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(numBuffers, storageMemorySpecs);
        int numRequestedByOtherUser = numExclusive + 1;
        for (i = 0; i < numRequestedByOtherUser; ++i) {
            this.requestedBuffers.add(storageMemoryManager.requestBufferBlocking(otherUser));
        }
        Assertions.assertThat((int)storageMemoryManager.getMaxNonReclaimableBuffers((Object)this)).isEqualTo(numBuffers - numRequestedByOtherUser);
        for (i = 1; i <= numBuffers - numRequestedByOtherUser; ++i) {
            this.requestedBuffers.add(storageMemoryManager.requestBufferBlocking((Object)this));
            Assertions.assertThat((int)storageMemoryManager.getMaxNonReclaimableBuffers((Object)this)).isEqualTo(numBuffers - numRequestedByOtherUser);
            int numExpectedAvailable = numBuffers - i - numRequestedByOtherUser;
            Assertions.assertThat((int)(storageMemoryManager.getMaxNonReclaimableBuffers((Object)this) - storageMemoryManager.numOwnerRequestedBuffer((Object)this))).isEqualTo(numExpectedAvailable);
        }
        Assertions.assertThat((int)storageMemoryManager.numOwnerRequestedBuffer((Object)this)).isEqualTo(numBuffers - numRequestedByOtherUser);
        Assertions.assertThat((int)storageMemoryManager.numOwnerRequestedBuffer(otherUser)).isEqualTo(numRequestedByOtherUser);
        this.requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
        storageMemoryManager.release();
    }

    @Test
    @Timeout(value=60L)
    void testTriggerReclaimBuffers() throws IOException {
        int numBuffers = 5;
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(numBuffers, Collections.singletonList(new TieredStorageMemorySpec((Object)this, 0)));
        storageMemoryManager.listenBufferReclaimRequest(this::onBufferReclaimRequest);
        int numBuffersBeforeTriggerReclaim = (int)((float)numBuffers * 0.6f);
        for (int i = 0; i < numBuffersBeforeTriggerReclaim; ++i) {
            this.requestedBuffers.add(storageMemoryManager.requestBufferBlocking((Object)this));
        }
        Assertions.assertThat((int)this.reclaimBufferCounter).isEqualTo(0);
        Assertions.assertThat((int)this.requestedBuffers.size()).isEqualTo(numBuffersBeforeTriggerReclaim);
        this.requestedBuffers.add(storageMemoryManager.requestBufferBlocking((Object)this));
        FlinkAssertions.assertThatFuture(this.hasReclaimBufferFinished).eventuallySucceeds();
        Assertions.assertThat((int)this.reclaimBufferCounter).isEqualTo(1);
        Assertions.assertThat((int)this.requestedBuffers.size()).isEqualTo(1);
        this.recycleRequestedBuffers();
        storageMemoryManager.release();
    }

    @Test
    void testTransferBufferOwnership() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager = this.createStorageMemoryManager(1, Collections.singletonList(new TieredStorageMemorySpec((Object)this, 0)));
        BufferBuilder bufferBuilder = memoryManager.requestBufferBlocking((Object)this);
        Assertions.assertThat((int)memoryManager.numOwnerRequestedBuffer((Object)this)).isEqualTo(1);
        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning();
        Buffer buffer = bufferConsumer.build();
        bufferBuilder.close();
        bufferConsumer.close();
        Object newOwner = new Object();
        memoryManager.transferBufferOwnership((Object)this, newOwner, buffer);
        Assertions.assertThat((int)memoryManager.numOwnerRequestedBuffer((Object)this)).isEqualTo(0);
        Assertions.assertThat((int)memoryManager.numOwnerRequestedBuffer(newOwner)).isEqualTo(1);
        buffer.recycleBuffer();
        Assertions.assertThat((int)memoryManager.numOwnerRequestedBuffer(newOwner)).isEqualTo(0);
    }

    @Test
    void testCanNotTransferOwnershipForEvent() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager = this.createStorageMemoryManager(1, Collections.singletonList(new TieredStorageMemorySpec((Object)this, 0)));
        BufferConsumer bufferConsumer = BufferBuilderTestUtils.createEventBufferConsumer(1, Buffer.DataType.EVENT_BUFFER);
        Buffer buffer = bufferConsumer.build();
        bufferConsumer.close();
        AssertionsForClassTypes.assertThatThrownBy(() -> memoryManager.transferBufferOwnership((Object)this, new Object(), buffer)).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testEnsureCapacity() throws IOException {
        int numBuffers = 5;
        int guaranteedReclaimableBuffers = 3;
        BufferPool bufferPool = this.globalPool.createBufferPool(5, 5, 5);
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(bufferPool, Arrays.asList(new TieredStorageMemorySpec(new Object(), 3, true), new TieredStorageMemorySpec((Object)this, 0, false)));
        Assertions.assertThat((boolean)storageMemoryManager.ensureCapacity(0)).isTrue();
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(3);
        Assertions.assertThat((boolean)storageMemoryManager.ensureCapacity(2)).isTrue();
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(5);
        Assertions.assertThat((boolean)storageMemoryManager.ensureCapacity(3)).isFalse();
        storageMemoryManager.release();
    }

    @Test
    void testRelease() throws IOException {
        int numBuffers = 5;
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffers, numBuffers, numBuffers);
        TieredStorageMemoryManagerImpl storageMemoryManager = this.createStorageMemoryManager(bufferPool, Collections.singletonList(new TieredStorageMemorySpec((Object)this, 0)));
        this.requestedBuffers.add(storageMemoryManager.requestBufferBlocking((Object)this));
        Assertions.assertThat((int)storageMemoryManager.numOwnerRequestedBuffer((Object)this)).isOne();
        this.recycleRequestedBuffers();
        storageMemoryManager.release();
        Assertions.assertThat((int)storageMemoryManager.numOwnerRequestedBuffer((Object)this)).isZero();
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isZero();
    }

    public void onBufferReclaimRequest() {
        ++this.reclaimBufferCounter;
        this.recycleRequestedBuffers();
        this.hasReclaimBufferFinished.complete(null);
    }

    private void recycleRequestedBuffers() {
        this.requestedBuffers.forEach(builder -> {
            BufferConsumer bufferConsumer = builder.createBufferConsumer();
            Buffer buffer = bufferConsumer.build();
            buffer.getRecycler().recycle(buffer.getMemorySegment());
        });
        this.requestedBuffers.clear();
    }

    private TieredStorageMemoryManagerImpl createStorageMemoryManager(int numBuffersInBufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) throws IOException {
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffersInBufferPool, numBuffersInBufferPool, numBuffersInBufferPool);
        return this.createStorageMemoryManager(bufferPool, storageMemorySpecs);
    }

    private TieredStorageMemoryManagerImpl createStorageMemoryManager(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) {
        TieredStorageMemoryManagerImpl storageProducerMemoryManager = new TieredStorageMemoryManagerImpl(0.6f, true);
        storageProducerMemoryManager.setup(bufferPool, storageMemorySpecs);
        return storageProducerMemoryManager;
    }

    private static void recycleBufferBuilder(BufferBuilder bufferBuilder) {
        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
        Buffer buffer = bufferConsumer.build();
        NetworkBuffer networkBuffer = new NetworkBuffer(buffer.getMemorySegment(), buffer.getRecycler(), buffer.getDataType());
        networkBuffer.recycleBuffer();
    }
}

