/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferWithSubpartition;
import org.apache.flink.runtime.io.network.partition.DataBuffer;
import org.apache.flink.runtime.io.network.partition.HashBasedDataBuffer;
import org.apache.flink.runtime.io.network.partition.SortBasedDataBuffer;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class DataBufferTest {
    private final boolean useHashBuffer;

    @Parameters(name="UseHashBuffer = {0}")
    private static List<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    public DataBufferTest(boolean useHashBuffer) {
        this.useHashBuffer = useHashBuffer;
    }

    @TestTemplate
    void testWriteAndReadDataBuffer() throws Exception {
        int numSubpartitions = 10;
        int bufferSize = 1024;
        int bufferPoolSize = 512;
        Random random = new Random(1111L);
        Queue[] dataWritten = new Queue[numSubpartitions];
        Queue[] buffersRead = new Queue[numSubpartitions];
        for (int i = 0; i < numSubpartitions; ++i) {
            dataWritten[i] = new ArrayDeque();
            buffersRead[i] = new ArrayDeque();
        }
        int[] numBytesWritten = new int[numSubpartitions];
        int[] numBytesRead = new int[numSubpartitions];
        Arrays.fill(numBytesWritten, 0);
        Arrays.fill(numBytesRead, 0);
        int totalBytesWritten = 0;
        int[] subpartitionReadOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions, subpartitionReadOrder);
        int numDataBuffers = 5;
        while (numDataBuffers > 0) {
            BufferWithSubpartition buffer;
            int recordSize = random.nextInt(bufferSize * 4 - 1) + 1;
            byte[] bytes = new byte[recordSize];
            random.nextBytes(bytes);
            ByteBuffer record = ByteBuffer.wrap(bytes);
            int subpartition = random.nextInt(numSubpartitions);
            boolean isBuffer = random.nextBoolean();
            Buffer.DataType dataType = isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
            boolean isFull = dataBuffer.append(record, subpartition, dataType);
            record.flip();
            if (record.hasRemaining()) {
                dataWritten[subpartition].add(new DataAndType(record, dataType));
                int n = subpartition;
                numBytesWritten[n] = numBytesWritten[n] + record.remaining();
                totalBytesWritten += record.remaining();
            }
            if (!isFull) continue;
            dataBuffer.finish();
            --numDataBuffers;
            while (dataBuffer.hasRemaining() && (buffer = this.copyIntoSegment(bufferSize, dataBuffer)) != null) {
                this.addBufferRead(buffer, buffersRead, numBytesRead);
            }
            dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions, subpartitionReadOrder);
        }
        if (dataBuffer.hasRemaining()) {
            Assertions.assertThat((Object)dataBuffer).isInstanceOf(HashBasedDataBuffer.class);
            dataBuffer.finish();
            while (dataBuffer.hasRemaining()) {
                this.addBufferRead(this.copyIntoSegment(bufferSize, dataBuffer), buffersRead, numBytesRead);
            }
        }
        Assertions.assertThat((long)dataBuffer.numTotalBytes()).isZero();
        DataBufferTest.checkWriteReadResult(numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    private BufferWithSubpartition copyIntoSegment(int bufferSize, DataBuffer dataBuffer) {
        if (this.useHashBuffer) {
            BufferWithSubpartition buffer = dataBuffer.getNextBuffer(null);
            if (buffer == null || !buffer.getBuffer().isBuffer()) {
                return buffer;
            }
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
            int numBytes = buffer.getBuffer().readableBytes();
            segment.put(0, buffer.getBuffer().getNioBufferReadable(), numBytes);
            buffer.getBuffer().recycleBuffer();
            return new BufferWithSubpartition((Buffer)new NetworkBuffer(segment, MemorySegment::free, Buffer.DataType.DATA_BUFFER, numBytes), buffer.getSubpartitionIndex());
        }
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
        return dataBuffer.getNextBuffer(segment);
    }

    private void addBufferRead(BufferWithSubpartition buffer, Queue<Buffer>[] buffersRead, int[] numBytesRead) {
        int subpartition = buffer.getSubpartitionIndex();
        buffersRead[subpartition].add(buffer.getBuffer());
        int n = subpartition;
        numBytesRead[n] = numBytesRead[n] + buffer.getBuffer().readableBytes();
    }

    /*
     * WARNING - void declaration
     */
    public static void checkWriteReadResult(int numSubpartitions, int[] numBytesWritten, int[] numBytesRead, Queue<DataAndType>[] dataWritten, Queue<Buffer>[] buffersRead) {
        for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; ++subpartitionIndex) {
            void var10_10;
            Assertions.assertThat((int)numBytesRead[subpartitionIndex]).isEqualTo(numBytesWritten[subpartitionIndex]);
            ArrayList<DataAndType> eventsWritten = new ArrayList<DataAndType>();
            ArrayList<Buffer> eventsRead = new ArrayList<Buffer>();
            ByteBuffer subpartitionDataWritten = ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
            for (DataAndType dataAndType : dataWritten[subpartitionIndex]) {
                subpartitionDataWritten.put(dataAndType.data);
                dataAndType.data.rewind();
                if (!dataAndType.dataType.isEvent()) continue;
                eventsWritten.add(dataAndType);
            }
            ByteBuffer subpartitionDataRead = ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
            for (Buffer buffer : buffersRead[subpartitionIndex]) {
                subpartitionDataRead.put(buffer.getNioBufferReadable());
                if (buffer.isBuffer()) continue;
                eventsRead.add(buffer);
            }
            subpartitionDataWritten.flip();
            subpartitionDataRead.flip();
            Assertions.assertThat((Comparable)subpartitionDataRead).isEqualTo((Object)subpartitionDataWritten);
            Assertions.assertThat(eventsRead).hasSameSizeAs(eventsWritten);
            boolean bl = false;
            while (var10_10 < eventsWritten.size()) {
                Assertions.assertThat((Comparable)((Buffer)eventsRead.get((int)var10_10)).getDataType()).isEqualTo((Object)((DataAndType)eventsWritten.get((int)var10_10)).dataType);
                Assertions.assertThat((Comparable)((Buffer)eventsRead.get((int)var10_10)).getNioBufferReadable()).isEqualTo((Object)((DataAndType)eventsWritten.get((int)var10_10)).data);
                ++var10_10;
            }
        }
    }

    @TestTemplate
    public void testWriteReadWithEmptySubpartition() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        int numSubpartitions = 5;
        ByteBuffer[] subpartitionRecords = new ByteBuffer[]{ByteBuffer.allocate(128), null, ByteBuffer.allocate(1536), null, ByteBuffer.allocate(1024)};
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions);
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            ByteBuffer record = subpartitionRecords[subpartition];
            if (record == null) continue;
            dataBuffer.append(record, subpartition, Buffer.DataType.DATA_BUFFER);
            record.rewind();
        }
        dataBuffer.finish();
        this.checkReadResult(dataBuffer, subpartitionRecords[0], 0, bufferSize);
        ByteBuffer expected1 = subpartitionRecords[2].duplicate();
        expected1.limit(bufferSize);
        this.checkReadResult(dataBuffer, expected1.slice(), 2, bufferSize);
        ByteBuffer expected2 = subpartitionRecords[2].duplicate();
        expected2.position(bufferSize);
        this.checkReadResult(dataBuffer, expected2.slice(), 2, bufferSize);
        this.checkReadResult(dataBuffer, subpartitionRecords[4], 4, bufferSize);
    }

    private void checkReadResult(DataBuffer dataBuffer, ByteBuffer expectedBuffer, int expectedSubpartition, int bufferSize) {
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
        BufferWithSubpartition bufferWithSubpartition = dataBuffer.getNextBuffer(segment);
        Assertions.assertThat((int)bufferWithSubpartition.getSubpartitionIndex()).isEqualTo(expectedSubpartition);
        Assertions.assertThat((Comparable)bufferWithSubpartition.getBuffer().getNioBufferReadable()).isEqualTo((Object)expectedBuffer);
    }

    @TestTemplate
    void testWriteEmptyData() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        ByteBuffer record = ByteBuffer.allocate(1);
        record.position(1);
        Assertions.assertThatThrownBy(() -> dataBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER)).isInstanceOf(IllegalArgumentException.class);
    }

    @TestTemplate
    void testWriteFinishedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.finish();
        Assertions.assertThatThrownBy(() -> dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER)).isInstanceOf(IllegalStateException.class);
    }

    @TestTemplate
    void testWriteReleasedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.release();
        Assertions.assertThatThrownBy(() -> dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER)).isInstanceOf(IllegalStateException.class);
    }

    @TestTemplate
    void testWriteMoreDataThanCapacity() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, 1);
        for (int i = 1; i < bufferPoolSize; ++i) {
            this.appendAndCheckResult(dataBuffer, bufferSize, false, bufferSize * i, i, true);
        }
        int numRecords = bufferPoolSize - 1;
        long numBytes = bufferSize * numRecords;
        this.appendAndCheckResult(dataBuffer, bufferSize + 1, true, numBytes, numRecords, true);
    }

    @TestTemplate
    void testWriteLargeRecord() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, 1);
        this.appendAndCheckResult(dataBuffer, bufferPoolSize * bufferSize + 1, true, 0L, 0L, false);
    }

    private void appendAndCheckResult(DataBuffer dataBuffer, int recordSize, boolean isFull, long numBytes, long numRecords, boolean hasRemaining) throws IOException {
        ByteBuffer largeRecord = ByteBuffer.allocate(recordSize);
        Assertions.assertThat((boolean)dataBuffer.append(largeRecord, 0, Buffer.DataType.DATA_BUFFER)).isEqualTo(isFull);
        Assertions.assertThat((long)dataBuffer.numTotalBytes()).isEqualTo(numBytes);
        Assertions.assertThat((long)dataBuffer.numTotalRecords()).isEqualTo(numRecords);
        Assertions.assertThat((boolean)dataBuffer.hasRemaining()).isEqualTo(hasRemaining);
    }

    @TestTemplate
    void testReadUnfinishedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat((boolean)dataBuffer.hasRemaining()).isTrue();
        Assertions.assertThatThrownBy(() -> dataBuffer.getNextBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize))).isInstanceOf(IllegalStateException.class);
    }

    @TestTemplate
    void testReadReleasedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        dataBuffer.finish();
        Assertions.assertThat((boolean)dataBuffer.hasRemaining()).isTrue();
        dataBuffer.release();
        Assertions.assertThat((boolean)dataBuffer.hasRemaining()).isTrue();
        Assertions.assertThatThrownBy(() -> dataBuffer.getNextBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize))).isInstanceOf(IllegalStateException.class);
    }

    @TestTemplate
    void testReadEmptyDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.finish();
        Assertions.assertThat((boolean)dataBuffer.hasRemaining()).isFalse();
        Assertions.assertThat((Object)dataBuffer.getNextBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize))).isNull();
    }

    @TestTemplate
    void testReleaseDataBuffer() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        int recordSize = (bufferPoolSize - 1) * bufferSize;
        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, bufferSize);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize, bufferPoolSize);
        LinkedList<MemorySegment> segments = new LinkedList<MemorySegment>();
        for (int i = 0; i < bufferPoolSize; ++i) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }
        SortBasedDataBuffer dataBuffer = new SortBasedDataBuffer(segments, (BufferRecycler)bufferPool, 1, bufferSize, bufferPoolSize, null);
        dataBuffer.append(ByteBuffer.allocate(recordSize), 0, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(bufferPoolSize);
        Assertions.assertThat((boolean)dataBuffer.hasRemaining()).isTrue();
        Assertions.assertThat((long)dataBuffer.numTotalRecords()).isOne();
        Assertions.assertThat((long)dataBuffer.numTotalBytes()).isEqualTo((long)recordSize);
        dataBuffer.release();
        Assertions.assertThat((int)bufferPool.bestEffortGetNumOfUsedBuffers()).isZero();
        Assertions.assertThat((boolean)dataBuffer.hasRemaining()).isTrue();
        Assertions.assertThat((long)dataBuffer.numTotalRecords()).isOne();
        Assertions.assertThat((long)dataBuffer.numTotalBytes()).isEqualTo((long)recordSize);
    }

    private DataBuffer createDataBuffer(int bufferPoolSize, int bufferSize, int numSubpartitions) throws Exception {
        return this.createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions, null);
    }

    private DataBuffer createDataBuffer(int bufferPoolSize, int bufferSize, int numSubpartitions, int[] customReadOrder) throws Exception {
        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, bufferSize);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize, bufferPoolSize);
        LinkedList<MemorySegment> segments = new LinkedList<MemorySegment>();
        for (int i = 0; i < bufferPoolSize; ++i) {
            segments.add(bufferPool.requestMemorySegmentBlocking());
        }
        if (this.useHashBuffer) {
            return new HashBasedDataBuffer(segments, (BufferRecycler)bufferPool, numSubpartitions, bufferSize, bufferPoolSize, customReadOrder);
        }
        return new SortBasedDataBuffer(segments, (BufferRecycler)bufferPool, numSubpartitions, bufferSize, bufferPoolSize, customReadOrder);
    }

    public static int[] getRandomSubpartitionOrder(int numSubpartitions) {
        Random random = new Random(1111L);
        int[] subpartitionReadOrder = new int[numSubpartitions];
        int shift = random.nextInt(numSubpartitions);
        for (int i = 0; i < numSubpartitions; ++i) {
            subpartitionReadOrder[i] = (i + shift) % numSubpartitions;
        }
        return subpartitionReadOrder;
    }

    public static class DataAndType {
        private final ByteBuffer data;
        private final Buffer.DataType dataType;

        DataAndType(ByteBuffer data, Buffer.DataType dataType) {
            this.data = data;
            this.dataType = dataType;
        }
    }
}

