/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link HashBufferAccumulator}.
 *
 * <p>Every test draws its buffers from a local buffer pool created on top of a fresh
 * {@link NetworkBufferPool} that is built before and destroyed after each test.
 */
class HashBufferAccumulatorTest {
    /** Number of segments held by the global network buffer pool. */
    public static final int NUM_TOTAL_BUFFERS = 1000;

    /** Size in bytes of a single network buffer; also the buffer size given to the accumulator. */
    public static final int NETWORK_BUFFER_SIZE = 1024;

    /** Flush-trigger ratio handed to the {@link TieredStorageMemoryManagerImpl} constructor. */
    private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;

    private NetworkBufferPool globalPool;

    @BeforeEach
    void before() {
        this.globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
    }

    @AfterEach
    void after() {
        this.globalPool.destroy();
    }

    /** Runs the accumulate-and-count scenario with partial records allowed. */
    @Test
    void testAccumulateRecordsAndGenerateFinishedBuffers() throws IOException {
        this.testAccumulateRecordsAndGenerateFinishedBuffers(true);
    }

    /** Runs the accumulate-and-count scenario with partial records disallowed. */
    @Test
    void testAccumulateRecordsAndGenerateFinishedBuffersWithPartialRecordUnallowed() throws IOException {
        this.testAccumulateRecordsAndGenerateFinishedBuffers(false);
    }

    /**
     * Feeds a random mix of data records and events into a single-subpartition accumulator and
     * checks that the number of finished buffers handed to the flusher equals the number this
     * test computes independently.
     *
     * @param isPartialRecordAllowed flag passed to the accumulator; it also selects how the
     *     expected buffer count is computed below
     * @throws IOException if creating the memory manager or feeding the accumulator fails
     */
    private void testAccumulateRecordsAndGenerateFinishedBuffers(boolean isPartialRecordAllowed) throws IOException {
        int numBuffers = 10;
        int numRecords = 1000;
        TieredStorageSubpartitionId subpartitionId = new TieredStorageSubpartitionId(0);
        Random random = new Random();
        TieredStorageMemoryManagerImpl tieredStorageMemoryManager = this.createStorageMemoryManager(numBuffers);
        try (HashBufferAccumulator bufferAccumulator = new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE, tieredStorageMemoryManager, isPartialRecordAllowed)) {
            AtomicInteger numReceivedFinishedBuffer = new AtomicInteger(0);
            bufferAccumulator.setup((subpartition, buffer, numRemainingBuffers) -> {
                numReceivedFinishedBuffer.incrementAndGet();
                buffer.recycleBuffer();
            });
            int numRecordBytesSinceLastEvent = 0;
            int numExpectBuffers = 0;
            for (int i = 0; i < numRecords; ++i) {
                ByteBuffer record;
                // The last record is always an event, so the expected count below always
                // accounts for the data bytes accumulated before it.
                boolean isBuffer = random.nextBoolean() && i != numRecords - 1;
                Buffer.DataType dataType = isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
                if (isBuffer) {
                    // Record sizes range from 1 byte up to two full network buffers.
                    int numBytes = random.nextInt(2 * NETWORK_BUFFER_SIZE) + 1;
                    // Without partial records, a record that does not fit into the remaining
                    // space is expected to close the currently pending buffer first.
                    if (!isPartialRecordAllowed && numRecordBytesSinceLastEvent + numBytes > NETWORK_BUFFER_SIZE && numRecordBytesSinceLastEvent > 0) {
                        ++numExpectBuffers;
                        numRecordBytesSinceLastEvent = 0;
                    }
                    if (!isPartialRecordAllowed && numBytes > NETWORK_BUFFER_SIZE) {
                        // An oversized record is counted on its own, in whole buffers.
                        numExpectBuffers += numBuffersFor(numBytes);
                    } else {
                        numRecordBytesSinceLastEvent += numBytes;
                    }
                    record = TieredStorageTestUtils.generateRandomData(numBytes, random);
                } else {
                    // An event is expected to finish all pending data buffers and then
                    // contribute one buffer of its own.
                    numExpectBuffers += numBuffersFor(numRecordBytesSinceLastEvent);
                    record = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
                    ++numExpectBuffers;
                    numRecordBytesSinceLastEvent = 0;
                }
                bufferAccumulator.receive(record, subpartitionId, dataType, false);
            }
            Assertions.assertThat(numReceivedFinishedBuffer.get()).isEqualTo(numExpectBuffers);
        }
    }

    /** Verifies that receiving an event leaves the accumulator's requested-buffer count at zero. */
    @Test
    void testEventShouldNotRequestBufferFromMemoryManager() throws IOException {
        int numBuffers = 10;
        TieredStorageMemoryManagerImpl tieredStorageMemoryManager = this.createStorageMemoryManager(numBuffers);
        try (HashBufferAccumulator bufferAccumulator = new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE, tieredStorageMemoryManager, true)) {
            bufferAccumulator.setup((subpartition, buffer, numRemainingBuffers) -> buffer.recycleBuffer());
            ByteBuffer endEvent = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
            bufferAccumulator.receive(endEvent, new TieredStorageSubpartitionId(0), Buffer.DataType.EVENT_BUFFER, false);
            Assertions.assertThat(tieredStorageMemoryManager.numOwnerRequestedBuffer(bufferAccumulator)).isZero();
        }
    }

    /**
     * Verifies that closing an accumulator which still holds an unfinished buffer brings the
     * accumulator's requested-buffer count back to zero.
     */
    @Test
    void testCloseWithUnFinishedBuffers() throws IOException {
        int numBuffers = 10;
        TieredStorageMemoryManagerImpl tieredStorageMemoryManager = this.createStorageMemoryManager(numBuffers);
        HashBufferAccumulator bufferAccumulator = new HashBufferAccumulator(1, NETWORK_BUFFER_SIZE, tieredStorageMemoryManager, true);
        bufferAccumulator.setup((subpartition, buffer, numRemainingBuffers) -> buffer.recycleBuffer());
        bufferAccumulator.receive(TieredStorageTestUtils.generateRandomData(1, new Random()), new TieredStorageSubpartitionId(0), Buffer.DataType.DATA_BUFFER, false);
        Assertions.assertThat(tieredStorageMemoryManager.numOwnerRequestedBuffer(bufferAccumulator)).isEqualTo(1);
        bufferAccumulator.close();
        // Query the same owner as before close(); asking for the test instance would always
        // yield zero, because the test itself never requests a buffer.
        Assertions.assertThat(tieredStorageMemoryManager.numOwnerRequestedBuffer(bufferAccumulator)).isZero();
    }

    /**
     * Creates a memory manager backed by a local buffer pool of the given size.
     *
     * @param numBuffersInBufferPool value used for all three sizing arguments of
     *     {@code createBufferPool}
     * @return a memory manager that has been set up with the new pool and a single memory spec
     *     registered for this test instance
     * @throws IOException if the local buffer pool cannot be created
     */
    private TieredStorageMemoryManagerImpl createStorageMemoryManager(int numBuffersInBufferPool) throws IOException {
        BufferPool bufferPool = this.globalPool.createBufferPool(numBuffersInBufferPool, numBuffersInBufferPool, numBuffersInBufferPool);
        TieredStorageMemoryManagerImpl storageMemoryManager = new TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
        storageMemoryManager.setup(bufferPool, Collections.singletonList(new TieredStorageMemorySpec(this, 1)));
        return storageMemoryManager;
    }

    /** Returns how many network buffers are needed to hold {@code numBytes} bytes (rounded up). */
    private static int numBuffersFor(int numBytes) {
        return numBytes / NETWORK_BUFFER_SIZE + (numBytes % NETWORK_BUFFER_SIZE == 0 ? 0 : 1);
    }
}

