/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManager;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumerInternalOperations;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingSubpartitionConsumerInternalOperation;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class HsMemoryDataManagerTest {
    private static final int NUM_BUFFERS = 10;
    private static final int NUM_SUBPARTITIONS = 3;
    private int poolSize = 10;
    private int bufferSize = 4;
    private Path dataFilePath;
    private Path indexFilePath;

    HsMemoryDataManagerTest() {
    }

    @BeforeEach
    void before(@TempDir Path tempDir) {
        this.dataFilePath = tempDir.resolve(".data");
        this.indexFilePath = tempDir.resolve(".index");
    }

    @Test
    void testAppendMarkBufferFinished() throws Exception {
        AtomicInteger finishedBuffers = new AtomicInteger(0);
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().setOnBufferFinishedFunction((numTotalUnSpillBuffers, currentPoolSize) -> {
            finishedBuffers.incrementAndGet();
            return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
        }).build();
        this.bufferSize = 12;
        HsMemoryDataManager memoryDataManager = this.createMemoryDataManager(spillingStrategy);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(0), 0, Buffer.DataType.DATA_BUFFER);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(1), 0, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat((AtomicInteger)finishedBuffers).hasValue(0);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(2), 0, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat((AtomicInteger)finishedBuffers).hasValue(1);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(3), 0, Buffer.DataType.DATA_BUFFER);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(4), 0, Buffer.DataType.EVENT_BUFFER);
        Assertions.assertThat((AtomicInteger)finishedBuffers).hasValue(3);
    }

    @Test
    void testAppendRequestBuffer() throws Exception {
        this.poolSize = 3;
        ArrayList numFinishedBufferAndPoolSize = new ArrayList();
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().setOnMemoryUsageChangedFunction((finishedBuffer, poolSize) -> {
            numFinishedBufferAndPoolSize.add(Tuple2.of((Object)finishedBuffer, (Object)poolSize));
            return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
        }).build();
        HsMemoryDataManager memoryDataManager = this.createMemoryDataManager(spillingStrategy);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(0), 0, Buffer.DataType.DATA_BUFFER);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(1), 1, Buffer.DataType.DATA_BUFFER);
        memoryDataManager.append(HsMemoryDataManagerTest.createRecord(2), 2, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat((int)memoryDataManager.getNumTotalRequestedBuffers()).isEqualTo(3);
        List<Tuple2> expectedFinishedBufferAndPoolSize = Arrays.asList(Tuple2.of((Object)1, (Object)3), Tuple2.of((Object)2, (Object)3), Tuple2.of((Object)3, (Object)3));
        Assertions.assertThat(numFinishedBufferAndPoolSize).isEqualTo(expectedFinishedBufferAndPoolSize);
    }

    @Test
    void testHandleDecision() throws Exception {
        boolean targetSubpartition = false;
        int numFinishedBufferToTriggerDecision = 4;
        List<BufferIndexAndChannel> toSpill = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1, 2);
        List<BufferIndexAndChannel> toRelease = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 2, 3);
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().setOnBufferFinishedFunction((numFinishedBuffers, poolSize) -> {
            if (numFinishedBuffers < 4) {
                return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
            }
            return Optional.of(HsSpillingStrategy.Decision.builder().addBufferToSpill(0, toSpill).addBufferToRelease(0, toRelease).build());
        }).build();
        CompletableFuture spilledFuture = new CompletableFuture();
        CompletableFuture readableFuture = new CompletableFuture();
        TestingFileDataIndex dataIndex = TestingFileDataIndex.builder().setAddBuffersConsumer(spilledFuture::complete).setMarkBufferReadableConsumer((subpartitionId, bufferIndex) -> readableFuture.complete(bufferIndex)).build();
        HsMemoryDataManager memoryDataManager = this.createMemoryDataManager((HsSpillingStrategy)spillingStrategy, dataIndex);
        for (int i = 0; i < 4; ++i) {
            memoryDataManager.append(HsMemoryDataManagerTest.createRecord(i), 0, Buffer.DataType.DATA_BUFFER);
        }
        FlinkAssertions.assertThatFuture(spilledFuture).eventuallySucceeds();
        FlinkAssertions.assertThatFuture(readableFuture).eventuallySucceeds();
        Assertions.assertThat(readableFuture).isCompletedWithValue((Object)2);
        Assertions.assertThat((int)memoryDataManager.getNumTotalUnSpillBuffers()).isEqualTo(1);
    }

    @Test
    void testHandleEmptyDecision() throws Exception {
        CompletableFuture globalDecisionFuture = new CompletableFuture();
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().setOnBufferFinishedFunction((finishedBuffer, poolSize) -> Optional.empty()).setDecideActionWithGlobalInfoFunction(provider -> {
            globalDecisionFuture.complete(null);
            return HsSpillingStrategy.Decision.NO_ACTION;
        }).build();
        HsMemoryDataManager memoryDataManager = this.createMemoryDataManager(spillingStrategy);
        memoryDataManager.onBufferFinished();
        Assertions.assertThat(globalDecisionFuture).isCompleted();
    }

    @Test
    void testResultPartitionClosed() throws Exception {
        CompletableFuture resultPartitionReleaseFuture = new CompletableFuture();
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().setOnResultPartitionClosedFunction(ignore -> {
            resultPartitionReleaseFuture.complete(null);
            return HsSpillingStrategy.Decision.NO_ACTION;
        }).build();
        HsMemoryDataManager memoryDataManager = this.createMemoryDataManager(spillingStrategy);
        memoryDataManager.close();
        Assertions.assertThat(resultPartitionReleaseFuture).isCompleted();
    }

    @Test
    void testSubpartitionConsumerRelease() throws Exception {
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().build();
        HsMemoryDataManager memoryDataManager = this.createMemoryDataManager(spillingStrategy);
        memoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, (HsSubpartitionConsumerInternalOperations)new TestingSubpartitionConsumerInternalOperation());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> memoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, (HsSubpartitionConsumerInternalOperations)new TestingSubpartitionConsumerInternalOperation())).isInstanceOf(IllegalStateException.class)).hasMessageContaining("Each subpartition view should have unique consumerId.");
        memoryDataManager.onConsumerReleased(0, HsConsumerId.DEFAULT);
        memoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, (HsSubpartitionConsumerInternalOperations)new TestingSubpartitionConsumerInternalOperation());
    }

    @Test
    void testPoolSizeCheck() throws Exception {
        int requiredBuffers = 10;
        int maxBuffers = 100;
        CompletableFuture triggerGlobalDecision = new CompletableFuture();
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(100, this.bufferSize);
        BufferPool bufferPool = networkBufferPool.createBufferPool(10, 10, 100);
        Assertions.assertThat((int)bufferPool.getNumBuffers()).isEqualTo(100);
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().setDecideActionWithGlobalInfoFunction(spillingInfoProvider -> {
            Assertions.assertThat((int)spillingInfoProvider.getPoolSize()).isEqualTo(10);
            triggerGlobalDecision.complete(null);
            return HsSpillingStrategy.Decision.NO_ACTION;
        }).build();
        this.createMemoryDataManager((HsSpillingStrategy)spillingStrategy, bufferPool);
        networkBufferPool.createBufferPool(90, 90, 100);
        Assertions.assertThat((int)bufferPool.getNumBuffers()).isEqualTo(10);
        FlinkAssertions.assertThatFuture(triggerGlobalDecision).eventuallySucceeds();
    }

    private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy) throws Exception {
        return this.createMemoryDataManager(spillStrategy, (HsFileDataIndex)new HsFileDataIndexImpl(3, this.indexFilePath, 256, Long.MAX_VALUE));
    }

    private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, this.bufferSize);
        BufferPool bufferPool = networkBufferPool.createBufferPool(this.poolSize, this.poolSize, this.poolSize);
        return this.createMemoryDataManager(bufferPool, spillStrategy, fileDataIndex);
    }

    private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy spillingStrategy, BufferPool bufferPool) throws Exception {
        return this.createMemoryDataManager(bufferPool, spillingStrategy, (HsFileDataIndex)new HsFileDataIndexImpl(3, this.indexFilePath, 256, Long.MAX_VALUE));
    }

    private HsMemoryDataManager createMemoryDataManager(BufferPool bufferPool, HsSpillingStrategy spillStrategy, HsFileDataIndex fileDataIndex) throws Exception {
        HsMemoryDataManager memoryDataManager = new HsMemoryDataManager(3, this.bufferSize, bufferPool, spillStrategy, fileDataIndex, this.dataFilePath, null, 1000L);
        memoryDataManager.setOutputMetrics(HybridShuffleTestUtils.createTestingOutputMetrics());
        return memoryDataManager;
    }

    private static ByteBuffer createRecord(int value) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
        byteBuffer.putInt(value);
        byteBuffer.flip();
        return byteBuffer;
    }
}

