/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.PartitionSortedBufferTest;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SortMergeResultPartition;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/**
 * Tests for {@link SortMergeResultPartition}.
 *
 * <p>Each test creates a partition backed by a local buffer pool taken from a shared
 * {@link NetworkBufferPool}, writes records through the partition API and then checks buffer
 * accounting, the produced files, or the data read back through subpartition views.
 */
public class SortMergeResultPartitionTest extends TestLogger {
    /** Size in bytes of every network buffer and read buffer used by the tests. */
    private static final int bufferSize = 1024;
    /** Number of buffers in the global network buffer pool. */
    private static final int totalBuffers = 1000;
    /** Total number of bytes handed to the batch shuffle read buffer pool (32 MiB). */
    private static final int totalBytes = 32 * 1024 * 1024;
    /** Number of threads of the executor passed to the partition for reading. */
    private static final int numThreads = 4;

    private final TestBufferAvailabilityListener listener = new TestBufferAvailabilityListener();
    private FileChannelManager fileChannelManager;
    private NetworkBufferPool globalPool;
    private BatchShuffleReadBufferPool readBufferPool;
    private ExecutorService readIOExecutor;

    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    @Rule
    public Timeout timeout = new Timeout(60L, TimeUnit.SECONDS);

    /** Creates the file channel manager, buffer pools and read executor shared by a test. */
    @Before
    public void setUp() {
        fileChannelManager =
                new FileChannelManagerImpl(new String[] {tmpFolder.getRoot().getPath()}, "testing");
        globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
        readBufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
        readIOExecutor = Executors.newFixedThreadPool(numThreads);
    }

    /**
     * Releases everything created in {@link #setUp()}.
     *
     * @throws Exception if closing the file channel manager fails
     */
    @After
    public void shutdown() throws Exception {
        try {
            fileChannelManager.close();
        } finally {
            // Still release the pools and the executor threads when close() throws, so a failure
            // here does not leak resources into subsequent tests.
            try {
                globalPool.destroy();
                readBufferPool.destroy();
            } finally {
                readIOExecutor.shutdown();
            }
        }
    }

    /**
     * Writes random records (randomly broadcast or sent to a single subpartition), reads all
     * subpartitions back and compares the result with what was written.
     */
    @Test
    public void testWriteAndRead() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 100;
        int numRecords = 1000;
        Random random = new Random();

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        // Generic arrays cannot be created directly, hence the unchecked raw array creation.
        @SuppressWarnings("unchecked")
        Queue<PartitionSortedBufferTest.DataAndType>[] dataWritten = new Queue[numSubpartitions];
        @SuppressWarnings("unchecked")
        Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
        for (int i = 0; i < numSubpartitions; ++i) {
            dataWritten[i] = new ArrayDeque<>();
            buffersRead[i] = new ArrayDeque<>();
        }

        // Freshly allocated int arrays are already zero-filled.
        int[] numBytesWritten = new int[numSubpartitions];
        int[] numBytesRead = new int[numSubpartitions];

        for (int i = 0; i < numRecords; ++i) {
            ByteBuffer record = generateRandomData(random.nextInt(2 * bufferSize) + 1, random);
            boolean isBroadCast = random.nextBoolean();

            if (isBroadCast) {
                partition.broadcastRecord(record);
                for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                    recordDataWritten(
                            record,
                            dataWritten,
                            subpartition,
                            numBytesWritten,
                            Buffer.DataType.DATA_BUFFER);
                }
            } else {
                int subpartition = random.nextInt(numSubpartitions);
                partition.emitRecord(record, subpartition);
                recordDataWritten(
                        record,
                        dataWritten,
                        subpartition,
                        numBytesWritten,
                        Buffer.DataType.DATA_BUFFER);
            }
        }

        partition.finish();
        partition.close();

        // Every subpartition is expected to end with a serialized end-of-partition event.
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            ByteBuffer event = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
            recordDataWritten(
                    event, dataWritten, subpartition, numBytesWritten, Buffer.DataType.EVENT_BUFFER);
        }

        ResultSubpartitionView[] views = createSubpartitionViews(partition, numSubpartitions);
        readData(
                views,
                bufferWithChannel -> {
                    Buffer buffer = bufferWithChannel.getBuffer();
                    int subpartition = bufferWithChannel.getChannelIndex();
                    int numBytes = buffer.readableBytes();
                    numBytesRead[subpartition] += numBytes;

                    // Copy the content into an unpooled segment because readData recycles the
                    // original buffer right after this callback returns.
                    MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
                    segment.put(0, buffer.getNioBufferReadable(), numBytes);
                    buffersRead[subpartition].add(
                            new NetworkBuffer(segment, buf -> {}, buffer.getDataType(), numBytes));
                });

        PartitionSortedBufferTest.checkWriteReadResult(
                numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    /**
     * Rewinds {@code record}, appends it to the expected data of {@code subpartition} and adds
     * its remaining bytes to that subpartition's written-byte counter.
     */
    private void recordDataWritten(
            ByteBuffer record,
            Queue<PartitionSortedBufferTest.DataAndType>[] dataWritten,
            int subpartition,
            int[] numBytesWritten,
            Buffer.DataType dataType) {
        // Writing the record to the partition may have advanced its position.
        record.rewind();
        dataWritten[subpartition].add(new PartitionSortedBufferTest.DataAndType(record, dataType));
        numBytesWritten[subpartition] += record.remaining();
    }

    /** Returns a heap {@link ByteBuffer} wrapping {@code dataSize} random bytes. */
    private ByteBuffer generateRandomData(int dataSize, Random random) {
        byte[] dataWritten = new byte[dataSize];
        random.nextBytes(dataWritten);
        return ByteBuffer.wrap(dataWritten);
    }

    /**
     * Drains all given views until each of them has delivered a non-data buffer (treated as the
     * end-of-partition event), passing every buffer to {@code bufferProcessor} before recycling
     * it.
     *
     * @param views the subpartition views to read; the array index is used as channel index
     * @param bufferProcessor callback invoked for every buffer read, events included
     * @return the total number of readable bytes over all buffers read
     */
    private long readData(ResultSubpartitionView[] views, Consumer<BufferWithChannel> bufferProcessor)
            throws Exception {
        // Accumulate as long: the method returns long and the sum should not wrap at 2 GiB.
        long dataSize = 0L;
        int numEndOfPartitionEvents = 0;

        while (numEndOfPartitionEvents < views.length) {
            listener.waitForData();
            for (int subpartition = 0; subpartition < views.length; ++subpartition) {
                ResultSubpartitionView view = views[subpartition];
                ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
                while (bufferAndBacklog != null) {
                    Buffer buffer = bufferAndBacklog.buffer();
                    bufferProcessor.accept(new BufferWithChannel(buffer, subpartition));
                    dataSize += buffer.readableBytes();

                    // Query the buffer before recycling it; it must not be touched afterwards.
                    boolean isEvent = !buffer.isBuffer();
                    buffer.recycleBuffer();

                    if (isEvent) {
                        ++numEndOfPartitionEvents;
                        Assert.assertFalse(
                                view.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
                        view.releaseAllResources();
                    }
                    bufferAndBacklog = view.getNextBuffer();
                }
            }
        }
        return dataSize;
    }

    /** Creates one view per subpartition, all registered with the shared test listener. */
    private ResultSubpartitionView[] createSubpartitionViews(
            SortMergeResultPartition partition, int numSubpartitions) throws Exception {
        ResultSubpartitionView[] views = new ResultSubpartitionView[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            views[subpartition] = partition.createSubpartitionView(subpartition, listener);
        }
        return views;
    }

    /**
     * Returns the number of entries in the first directory managed by the file channel manager.
     */
    private int numFilesInDataDirectory() {
        // File.list() returns null on I/O error; fail loudly instead of with a bare NPE on length.
        return Preconditions.checkNotNull(fileChannelManager.getPaths()[0].list()).length;
    }

    /**
     * Writes a single record as large as the whole local buffer pool and verifies that the same
     * bytes are read back.
     */
    @Test
    public void testWriteLargeRecord() throws Exception {
        int numBuffers = 100;
        int numWriteBuffers = numBuffers / 2;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);

        ByteBuffer recordWritten = generateRandomData(bufferSize * numBuffers, new Random());
        partition.emitRecord(recordWritten, 0);
        Assert.assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.finish();
        partition.close();

        ResultSubpartitionView view = partition.createSubpartitionView(0, listener);
        ByteBuffer recordRead = ByteBuffer.allocate(bufferSize * numBuffers);
        readData(
                new ResultSubpartitionView[] {view},
                bufferWithChannel -> {
                    Buffer buffer = bufferWithChannel.getBuffer();
                    // Only data buffers belong to the record; skip events.
                    if (buffer.isBuffer()) {
                        recordRead.put(buffer.getNioBufferReadable());
                    }
                });

        recordWritten.rewind();
        recordRead.flip();
        Assert.assertEquals(recordWritten, recordRead);
    }

    /**
     * Broadcasts records to all subpartitions and verifies the files on disk and the total
     * number of bytes read from all subpartitions.
     */
    @Test
    public void testDataBroadcast() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 100;
        int numRecords = 10000;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        // One generator for all records instead of a new instance per record.
        Random random = new Random();
        for (int i = 0; i < numRecords; ++i) {
            partition.broadcastRecord(generateRandomData(bufferSize, random));
        }
        partition.finish();
        partition.close();

        int eventSize = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE).remaining();
        // Widen to long before multiplying so the expected sizes cannot overflow int.
        long payloadBytes = (long) numSubpartitions * numRecords * bufferSize;
        long dataSize = payloadBytes + (long) numSubpartitions * eventSize;

        Assert.assertNotNull(partition.getResultFile());
        Assert.assertEquals(2, numFilesInDataDirectory());
        for (File file : Preconditions.checkNotNull(fileChannelManager.getPaths()[0].listFiles())) {
            if (file.getName().endsWith(".shuffle.data")) {
                // The data file must be smaller than one full copy of the payload per
                // subpartition.
                Assert.assertTrue(file.length() < payloadBytes);
            }
        }

        ResultSubpartitionView[] views = createSubpartitionViews(partition, numSubpartitions);
        long dataRead = readData(views, ignored -> {});
        Assert.assertEquals(dataSize, dataRead);
    }

    /**
     * Verifies the number of used buffers around {@code flush} and {@code flushAll}, and that the
     * result file only becomes available after {@code finish}.
     */
    @Test
    public void testFlush() throws Exception {
        int numBuffers = 10;
        int numWriteBuffers = numBuffers / 2;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
        Assert.assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
        partition.emitRecord(ByteBuffer.allocate(bufferSize), 1);
        Assert.assertEquals(3 + numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.flush(0);
        Assert.assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
        partition.emitRecord(ByteBuffer.allocate(bufferSize), 3);
        Assert.assertEquals(3 + numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.flushAll();
        Assert.assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        Assert.assertNull(partition.getResultFile());
        partition.finish();
        Assert.assertEquals(3, partition.getResultFile().getNumRegions());

        partition.close();
    }

    /**
     * Releases the partition while it is still being written and expects the next write to fail
     * with an {@link IllegalStateException} after all files have been removed.
     */
    @Test(expected = IllegalStateException.class)
    public void testReleaseWhileWriting() throws Exception {
        int numBuffers = 10;
        int numWriteBuffers = numBuffers / 2;
        int numBuffersForSort = numBuffers - numWriteBuffers;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
        Assert.assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffersForSort - 1)), 0);
        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffersForSort - 1)), 1);
        partition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
        Assert.assertNull(partition.getResultFile());
        Assert.assertEquals(2, numFilesInDataDirectory());

        partition.release();
        try {
            partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 2);
        } catch (IllegalStateException exception) {
            // Check the cleanup before handing the expected exception back to JUnit.
            Assert.assertEquals(0, numFilesInDataDirectory());
            throw exception;
        }
    }

    /**
     * Releases a finished partition that still has an open view and verifies that the view gets
     * released, the result file is dropped and all files are deleted.
     */
    @Test
    public void testRelease() throws Exception {
        int numBuffers = 10;
        int numWriteBuffers = numBuffers / 2;
        int numBuffersForSort = numBuffers - numWriteBuffers;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
        Assert.assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffersForSort - 1)), 0);
        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffersForSort - 1)), 1);
        partition.finish();
        partition.close();

        Assert.assertEquals(3, partition.getResultFile().getNumRegions());
        Assert.assertEquals(2, numFilesInDataDirectory());

        ResultSubpartitionView view = partition.createSubpartitionView(0, listener);
        partition.release();

        // Keep draining and recycling buffers until the view reports that it has been released.
        while (!view.isReleased()) {
            ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
            if (bufferAndBacklog != null) {
                bufferAndBacklog.buffer().recycleBuffer();
            }
        }

        // Poll until the result file is gone; the test timeout rule bounds this loop.
        while (partition.getResultFile() != null) {
            Thread.sleep(100L);
        }

        Assert.assertEquals(0, numFilesInDataDirectory());
    }

    /**
     * Verifies that closing the partition destroys the local buffer pool and returns all of its
     * segments to the global pool.
     */
    @Test
    public void testCloseReleasesAllBuffers() throws Exception {
        int numBuffers = 100;
        int numWriteBuffers = numBuffers / 2;
        int numBuffersForSort = numBuffers - numWriteBuffers;

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
        Assert.assertEquals(numWriteBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffersForSort - 1)), 5);
        Assert.assertEquals(numBuffers, bufferPool.bestEffortGetNumOfUsedBuffers());

        partition.close();
        Assert.assertTrue(bufferPool.isDestroyed());
        Assert.assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments());
    }

    /** Creating a view on a partition that has not been finished must fail. */
    @Test(expected = IllegalStateException.class)
    public void testReadUnfinishedPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(10, 10);
        try {
            SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
            partition.createSubpartitionView(0, listener);
        } finally {
            bufferPool.lazyDestroy();
        }
    }

    /** Creating a view on a partition that has been finished and then released must fail. */
    @Test(expected = IllegalStateException.class)
    public void testReadReleasedPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(10, 10);
        try {
            SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
            partition.finish();
            partition.release();
            partition.createSubpartitionView(0, listener);
        } finally {
            bufferPool.lazyDestroy();
        }
    }

    /** Creates and sets up a partition that uses the shared read buffer pool of this test. */
    private SortMergeResultPartition createSortMergedPartition(
            int numSubpartitions, BufferPool bufferPool) throws IOException {
        return createSortMergedPartition(numSubpartitions, bufferPool, this.readBufferPool);
    }

    /**
     * Creates a blocking {@link SortMergeResultPartition} writing to a fresh channel path of the
     * file channel manager and calls {@code setup()} on it.
     *
     * @param numSubpartitions number of subpartitions (also passed as the following constructor
     *     argument)
     * @param bufferPool the local buffer pool supplied to the partition
     * @param readBufferPool the read buffer pool handed to the partition
     */
    private SortMergeResultPartition createSortMergedPartition(
            int numSubpartitions, BufferPool bufferPool, BatchShuffleReadBufferPool readBufferPool)
            throws IOException {
        SortMergeResultPartition sortMergedResultPartition =
                new SortMergeResultPartition(
                        "SortMergedResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.BLOCKING,
                        numSubpartitions,
                        numSubpartitions,
                        readBufferPool,
                        readIOExecutor,
                        new ResultPartitionManager(),
                        fileChannelManager.createChannel().getPath(),
                        null,
                        () -> bufferPool);
        sortMergedResultPartition.setup();
        return sortMergedResultPartition;
    }

    /**
     * Listener that lets the test thread block until a subpartition view signals available data.
     */
    private static final class TestBufferAvailabilityListener implements BufferAvailabilityListener {
        /** Notifications received since the last successful {@link #waitForData()}. */
        private int numNotifications;

        private TestBufferAvailabilityListener() {
        }

        /** Counts the notification and wakes up waiters if it is the first pending one. */
        public synchronized void notifyDataAvailable() {
            if (numNotifications == 0) {
                notifyAll();
            }
            ++numNotifications;
        }

        /**
         * Blocks until at least one notification is pending, then clears the pending count.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized void waitForData() throws InterruptedException {
            // Re-check the condition in a loop: Object.wait() may return spuriously without a
            // matching notify.
            while (numNotifications == 0) {
                wait();
            }
            numNotifications = 0;
        }
    }
}

