/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.concurrent.FutureConsumerWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ResultPartitionTest {
    // Temporary directory obtained from EnvironmentInformation; handed to the file channel manager in setUp().
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    // Shared by all tests of this class: created in setUp() and closed in shutdown().
    private static FileChannelManager fileChannelManager;
    // Buffer size in bytes; equals the 1024 passed to setBufferSize(...) by the tests in this class.
    private final int bufferSize = 1024;

    /** Creates the file channel manager, rooted at {@code tempDir}, that the tests share. */
    @BeforeClass
    public static void setUp() {
        final String[] tempDirs = {tempDir};
        fileChannelManager = new FileChannelManagerImpl(tempDirs, "testing");
    }

    /**
     * Closes the shared file channel manager after all tests have run.
     *
     * <p>JUnit runs {@code @AfterClass} methods even when a {@code @BeforeClass} method threw,
     * so the manager may never have been created. The null check keeps a failure in
     * {@link #setUp()} from being followed by an unrelated {@link NullPointerException} here.
     *
     * @throws Exception if closing the file channel manager fails
     */
    @AfterClass
    public static void shutdown() throws Exception {
        if (fileChannelManager != null) {
            fileChannelManager.close();
        }
    }

    /**
     * Builds two partitions with three subpartitions each and checks that every subpartition's
     * info carries the index of its partition and its own position in the subpartition array.
     */
    @Test
    public void testResultSubpartitionInfo() {
        final int numPartitions = 2;
        final int numSubpartitions = 3;

        for (int partitionIdx = 0; partitionIdx < numPartitions; partitionIdx++) {
            PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
                    .setResultPartitionIndex(partitionIdx)
                    .setNumberOfSubpartitions(numSubpartitions)
                    .build();

            int subpartitionIdx = 0;
            for (ResultSubpartition subpartition : partition.getAllPartitions()) {
                ResultSubpartitionInfo info = subpartition.getSubpartitionInfo();
                Assert.assertEquals(partitionIdx, info.getPartitionIdx());
                Assert.assertEquals(subpartitionIdx, info.getSubPartitionIdx());
                subpartitionIdx++;
            }
        }
    }

    /**
     * Runs the notification scenario once for each writer operation: {@code finish},
     * {@code emitRecord}, {@code broadcastEvent} and {@code broadcastRecord}.
     */
    @Test
    public void testNotifyPartitionDataAvailable() throws Exception {
        // The array has a raw element type, so every lambda receives its argument as Object
        // and casts it back to ResultPartitionWriter.
        final FutureConsumerWithException[] notificationCalls = new FutureConsumerWithException[] {
            writer -> ((ResultPartitionWriter) writer).finish(),
            writer -> ((ResultPartitionWriter) writer).emitRecord(ByteBuffer.allocate(1024), 0),
            writer -> ((ResultPartitionWriter) writer).broadcastEvent((AbstractEvent) EndOfPartitionEvent.INSTANCE, false),
            writer -> ((ResultPartitionWriter) writer).broadcastRecord(ByteBuffer.allocate(1024))
        };

        for (int i = 0; i < notificationCalls.length; i++) {
            testNotifyPartitionDataAvailable(
                    (FutureConsumerWithException<ResultPartitionWriter, Exception>) notificationCalls[i]);
        }
    }

    /**
     * Applies {@code notificationCall} to a freshly decorated writer of each partition type and
     * checks what the notifier recorded.
     *
     * <p>For the {@code PIPELINED} writer exactly one notification carrying the job id, the
     * writer's partition id and the task actions is expected. For the {@code BLOCKING} writer no
     * notification is expected.
     *
     * @param notificationCall the writer operation under test
     * @throws Exception if creating one of the writers fails
     */
    private void testNotifyPartitionDataAvailable(FutureConsumerWithException<ResultPartitionWriter, Exception> notificationCall) throws Exception {
        final JobID jobId = new JobID();
        final NoOpTaskActions taskActions = new NoOpTaskActions();

        // Pipelined case: the operation must trigger a single notification.
        TestResultPartitionConsumableNotifier pipelinedNotifier = new TestResultPartitionConsumableNotifier();
        ResultPartitionWriter pipelinedWriter = createConsumableNotifyingResultPartitionWriter(
                ResultPartitionType.PIPELINED, taskActions, jobId, pipelinedNotifier);
        // The consumer is typed on ResultPartitionWriter, so the writer is passed with its
        // static type; an argument widened to Object would not type-check.
        notificationCall.accept(pipelinedWriter);
        pipelinedNotifier.check(jobId, pipelinedWriter.getPartitionId(), taskActions, 1);

        // Blocking case: the same operation must leave the notifier untouched.
        TestResultPartitionConsumableNotifier blockingNotifier = new TestResultPartitionConsumableNotifier();
        ResultPartitionWriter blockingWriter = createConsumableNotifyingResultPartitionWriter(
                ResultPartitionType.BLOCKING, taskActions, jobId, blockingNotifier);
        notificationCall.accept(blockingWriter);
        blockingNotifier.check(null, null, null, 0);
    }

    /** Runs the add-on-finished-partition scenario for a {@code PIPELINED} partition. */
    @Test
    public void testAddOnFinishedPipelinedPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED;
        testAddOnFinishedPartition(type);
    }

    /** Runs the add-on-finished-partition scenario for a {@code BLOCKING} partition. */
    @Test
    public void testAddOnFinishedBlockingPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testAddOnFinishedPartition(type);
    }

    /**
     * Registers and finishes a {@code BLOCKING} partition, then twice creates a view on
     * subpartition 0 and releases it. After each round the partition must still be listed as
     * unreleased by the manager and must not report itself as released.
     */
    @Test
    public void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
        final ResultPartitionManager manager = new ResultPartitionManager();
        final ResultPartition partition = new ResultPartitionBuilder()
                .setResultPartitionManager(manager)
                .setResultPartitionType(ResultPartitionType.BLOCKING)
                .setFileChannelManager(fileChannelManager)
                .build();
        manager.registerResultPartition(partition);
        partition.finish();

        MatcherAssert.assertThat(manager.getUnreleasedPartitions(), Matchers.contains(partition.getPartitionId()));

        int remainingRounds = 2;
        while (remainingRounds > 0) {
            ResultSubpartitionView view = partition.createSubpartitionView(0, () -> {});
            view.releaseAllResources();

            MatcherAssert.assertThat(manager.getUnreleasedPartitions(), Matchers.contains(partition.getPartitionId()));
            Assert.assertFalse(partition.isReleased());
            remainingRounds--;
        }
    }

    /**
     * Finishes a decorated partition of the given type, resets the notifier and then tries to
     * emit one more record.
     *
     * <p>An {@link IllegalStateException} raised by that sequence is tolerated. Whether or not
     * it is raised, the partition must show zero buffers and bytes written, its buffer pool must
     * report no used buffers, and the notifier must have recorded nothing since the reset.
     *
     * @param partitionType type of the partition to create
     * @throws Exception if creating or finishing the partition fails
     */
    private void testAddOnFinishedPartition(ResultPartitionType partitionType) throws Exception {
        final TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
        final BufferWritingResultPartition resultPartition = createResultPartition(partitionType);
        final ResultPartitionWriter[] decoratedWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
                new ResultPartitionWriter[] {resultPartition},
                new NoOpTaskActions(),
                new JobID(),
                notifier);
        final ResultPartitionWriter partitionWriter = decoratedWriters[0];

        try {
            partitionWriter.finish();
            notifier.reset();
            partitionWriter.emitRecord(ByteBuffer.allocate(1024), 0);
        } catch (IllegalStateException ignored) {
            // Tolerated: the state checks in the finally block apply either way.
        } finally {
            Assert.assertEquals(0L, resultPartition.numBuffersOut.getCount());
            Assert.assertEquals(0L, resultPartition.numBytesOut.getCount());
            Assert.assertEquals(0L, resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            notifier.check(null, null, null, 0);
        }
    }

    /** Runs the add-on-released-partition scenario for a {@code PIPELINED} partition. */
    @Test
    public void testAddOnReleasedPipelinedPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED;
        testAddOnReleasedPartition(type);
    }

    /** Runs the add-on-released-partition scenario for a {@code BLOCKING} partition. */
    @Test
    public void testAddOnReleasedBlockingPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testAddOnReleasedPartition(type);
    }

    /**
     * Releases a decorated partition of the given type and then emits one 1024-byte record
     * through the writer.
     *
     * <p>Afterwards the partition must count one buffer and 1024 bytes written, its buffer pool
     * must report no used buffers, and the notifier must not have been called. The checks run
     * in a {@code finally} block, so they are evaluated even if the release or emit throws.
     *
     * @param partitionType type of the partition to create
     * @throws Exception if creating the partition or emitting the record fails
     */
    private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception {
        final TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
        final BufferWritingResultPartition resultPartition = createResultPartition(partitionType);
        final ResultPartitionWriter[] decoratedWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
                new ResultPartitionWriter[] {resultPartition},
                new NoOpTaskActions(),
                new JobID(),
                notifier);
        final ResultPartitionWriter partitionWriter = decoratedWriters[0];

        try {
            partitionWriter.release(null);
            partitionWriter.emitRecord(ByteBuffer.allocate(1024), 0);
        } finally {
            Assert.assertEquals(1L, resultPartition.numBuffersOut.getCount());
            Assert.assertEquals(1024L, resultPartition.numBytesOut.getCount());
            Assert.assertEquals(0L, resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            notifier.check(null, null, null, 0);
        }
    }

    /** Runs the add-on-partition scenario for a {@code PIPELINED} partition. */
    @Test
    public void testAddOnPipelinedPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED;
        testAddOnPartition(type);
    }

    /** Runs the add-on-partition scenario for a {@code BLOCKING} partition. */
    @Test
    public void testAddOnBlockingPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testAddOnPartition(type);
    }

    /**
     * Registers a partition with a manager, fails it with a {@code null} cause and then
     * delegates to {@code PartitionTestUtils.verifyCreateSubpartitionViewThrowsException} for
     * that partition id.
     */
    @Test
    public void testCreateSubpartitionOnFailingPartition() throws Exception {
        final ResultPartitionManager partitionManager = new ResultPartitionManager();
        final ResultPartition failingPartition = new ResultPartitionBuilder()
                .setResultPartitionManager(partitionManager)
                .build();

        partitionManager.registerResultPartition(failingPartition);
        failingPartition.fail(null);

        final ResultPartitionID failedId = failingPartition.getPartitionId();
        PartitionTestUtils.verifyCreateSubpartitionViewThrowsException(partitionManager, failedId);
    }

    /**
     * Emits one 1024-byte record through a decorated partition of the given type.
     *
     * <p>Afterwards the partition must count one buffer and 1024 bytes written, and its buffer
     * pool must report one used buffer. For pipelined types the notifier must additionally have
     * been called exactly once with the job id, partition id and task actions used here. The
     * checks run in a {@code finally} block, so they are evaluated even if the emit throws.
     *
     * @param partitionType type of the partition to create
     * @throws Exception if creating the partition or emitting the record fails
     */
    private void testAddOnPartition(ResultPartitionType partitionType) throws Exception {
        final TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
        final JobID jobId = new JobID();
        final NoOpTaskActions taskActions = new NoOpTaskActions();
        final BufferWritingResultPartition resultPartition = createResultPartition(partitionType);
        final ResultPartitionWriter[] decoratedWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
                new ResultPartitionWriter[] {resultPartition},
                taskActions,
                jobId,
                notifier);
        final ResultPartitionWriter partitionWriter = decoratedWriters[0];

        try {
            partitionWriter.emitRecord(ByteBuffer.allocate(1024), 0);
        } finally {
            Assert.assertEquals(1L, resultPartition.numBuffersOut.getCount());
            Assert.assertEquals(1024L, resultPartition.numBytesOut.getCount());
            Assert.assertEquals(1L, resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
            // Only pipelined partitions are checked for a notification here.
            if (partitionType.isPipelined()) {
                notifier.check(jobId, partitionWriter.getPartitionId(), taskActions, 1);
            }
        }
    }

    /**
     * Fills a pipelined partition with ten 1023-byte records from an environment that owns ten
     * network buffers, then checks the buffer accounting across {@code close()} and
     * {@code release()}.
     *
     * <p>After the writes the local pool must have no available segments. After
     * {@code close()} the local pool must be destroyed while the network pool still reports all
     * ten segments in use. After {@code release()} the network pool must report none in use.
     */
    @Test
    public void testReleaseMemoryOnPipelinedPartition() throws Exception {
        final int numAllBuffers = 10;
        final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
                .setNumNetworkBuffers(numAllBuffers)
                .setBufferSize(1024)
                .build();
        final ResultPartition resultPartition =
                PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, 1);

        try {
            resultPartition.setup();

            int written = 0;
            while (written < numAllBuffers) {
                resultPartition.emitRecord(ByteBuffer.allocate(1023), 0);
                written++;
            }
            Assert.assertEquals(0L, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());

            resultPartition.close();
            Assert.assertTrue(resultPartition.getBufferPool().isDestroyed());
            Assert.assertEquals(numAllBuffers, network.getNetworkBufferPool().getNumberOfUsedMemorySegments());

            resultPartition.release();
            Assert.assertEquals(0L, network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
        } finally {
            network.close();
        }
    }

    /**
     * Checks the availability future of a pipelined partition whose local pool is limited to
     * two buffers: it must be done before any record is written and not done after two
     * full-buffer records have been emitted.
     *
     * <p>Cleanup uses a nested {@code finally} so that the network environment is closed even
     * when releasing the partition throws.
     *
     * @throws IOException if setting up the partition, emitting a record or closing the
     *     environment fails
     */
    @Test
    public void testIsAvailableOrNot() throws IOException {
        final int numAllBuffers = 10;
        final int bufferSize = 1024;
        final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
                .setNumNetworkBuffers(numAllBuffers)
                .setBufferSize(bufferSize)
                .build();
        final ResultPartition resultPartition =
                PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, 1);

        try {
            resultPartition.setup();
            resultPartition.getBufferPool().setNumBuffers(2);
            Assert.assertTrue(resultPartition.getAvailableFuture().isDone());

            // Two records of exactly one buffer each use up the two-buffer pool.
            resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            Assert.assertFalse(resultPartition.getAvailableFuture().isDone());
        } finally {
            try {
                resultPartition.release();
            } finally {
                network.close();
            }
        }
    }

    /** Runs the buffer-pool sizing scenario for a {@code PIPELINED_BOUNDED} partition. */
    @Test
    public void testPipelinedPartitionBufferPool() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED_BOUNDED;
        testPartitionBufferPool(type);
    }

    /** Runs the buffer-pool sizing scenario for a {@code BLOCKING} partition. */
    @Test
    public void testBlockingPartitionBufferPool() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testPartitionBufferPool(type);
    }

    /**
     * Sets up a partition of the given type on a dedicated 20-segment network pool and checks
     * the sizing of its local buffer pool.
     *
     * <p>The required number of segments must be the subpartition count plus one. The maximum
     * must be {@code 2 * subpartitions + 8} for bounded types and {@link Integer#MAX_VALUE}
     * otherwise. The network pool is destroyed afterwards in all cases.
     *
     * @param type type of the partition to create
     * @throws Exception if setting up the partition fails
     */
    private void testPartitionBufferPool(ResultPartitionType type) throws Exception {
        final int networkBuffersPerChannel = 2;
        final int floatingNetworkBuffersPerGate = 8;
        final NetworkBufferPool globalPool = new NetworkBufferPool(20, 1);
        final ResultPartition partition = new ResultPartitionBuilder()
                .setResultPartitionType(type)
                .setFileChannelManager(fileChannelManager)
                .setNetworkBuffersPerChannel(networkBuffersPerChannel)
                .setFloatingNetworkBuffersPerGate(floatingNetworkBuffersPerGate)
                .setNetworkBufferPool(globalPool)
                .build();

        try {
            partition.setup();
            final BufferPool bufferPool = partition.getBufferPool();

            Assert.assertEquals(
                    partition.getNumberOfSubpartitions() + 1,
                    bufferPool.getNumberOfRequiredMemorySegments());

            final int expectedMaxBuffers = type.isBounded()
                    ? networkBuffersPerChannel * partition.getNumberOfSubpartitions() + floatingNetworkBuffersPerGate
                    : Integer.MAX_VALUE;
            Assert.assertEquals(expectedMaxBuffers, bufferPool.getMaxNumberOfMemorySegments());
        } finally {
            globalPool.destroyAllBufferPools();
            globalPool.destroy();
        }
    }

    /**
     * Creates a set-up partition of the given type and wraps it with
     * {@code ConsumableNotifyingResultPartitionWriterDecorator.decorate}.
     *
     * @param partitionType type of the partition to create
     * @param taskActions task actions handed to the decorator
     * @param jobId job id handed to the decorator
     * @param notifier notifier handed to the decorator
     * @return the first (and only) writer returned by the decorator
     * @throws IOException if creating the partition fails
     */
    private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter(ResultPartitionType partitionType, TaskActions taskActions, JobID jobId, ResultPartitionConsumableNotifier notifier) throws IOException {
        final ResultPartitionWriter[] undecorated = {createResultPartition(partitionType)};
        final ResultPartitionWriter[] decorated = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
                Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
                undecorated,
                taskActions,
                jobId,
                notifier);
        return decorated[0];
    }

    /**
     * Creates and sets up a partition of the given type with two subpartitions, backed by a new
     * shuffle environment with 10 network buffers of 1024 bytes and the shared file channel
     * manager. The environment is not closed by this helper.
     *
     * @param partitionType type of the partition to create
     * @return the set-up partition, cast to {@code BufferWritingResultPartition}
     * @throws IOException if setting up the partition fails
     */
    private BufferWritingResultPartition createResultPartition(ResultPartitionType partitionType) throws IOException {
        final NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder()
                .setNumNetworkBuffers(10)
                .setBufferSize(1024)
                .build();
        final ResultPartition created =
                PartitionTestUtils.createPartition(environment, fileChannelManager, partitionType, 2);
        created.setup();
        return (BufferWritingResultPartition) created;
    }

    /**
     * Checks the back-pressure time metric of a partition whose local pool holds one buffer.
     *
     * <p>The first record takes the only buffer, which is then read through a subpartition view
     * and held. The metric value must be zero at that point. A second record is emitted from a
     * separate thread; the held buffer is recycled about 100 ms later. Afterwards the metric's
     * count must be greater than zero and the second buffer must be readable.
     *
     * <p>An exception thrown by the emitting thread is rethrown as a test failure rather than
     * being dropped, so the real cause is reported.
     *
     * @throws IOException if setting up the partition or emitting the first record fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testIdleAndBackPressuredTime() throws IOException, InterruptedException {
        final int bufferSize = 1024;
        final NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
        final BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
        final BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
                .setBufferPoolFactory((SupplierWithException<BufferPool, IOException>) () -> localPool)
                .build();
        resultPartition.setup();

        resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
        final ResultSubpartitionView readView =
                resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        final Buffer buffer = readView.getNextBuffer().buffer();
        Assert.assertNotNull(buffer);

        MatcherAssert.assertThat(resultPartition.getBackPressuredTimeMsPerSecond().getValue(), Matchers.equalTo(0L));

        // Written by the request thread, read after join(); join() makes the write visible.
        final Exception[] emitFailure = new Exception[1];
        final CountDownLatch syncLock = new CountDownLatch(1);
        final Thread requestThread = new Thread(() -> {
            try {
                // Signal that the request thread is running before it starts the emit.
                syncLock.countDown();
                resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            } catch (Exception e) {
                emitFailure[0] = e;
            }
        });
        requestThread.start();
        syncLock.await();

        // NOTE(review): presumably gives the request thread time to block on the exhausted
        // pool before the buffer is handed back - confirm.
        Thread.sleep(100L);
        buffer.recycleBuffer();
        requestThread.join();

        if (emitFailure[0] != null) {
            throw new AssertionError("Emitting the second record failed in the request thread", emitFailure[0]);
        }

        MatcherAssert.assertThat(
                resultPartition.getBackPressuredTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
        Assert.assertNotNull(readView.getNextBuffer().buffer());
    }

    /**
     * Writes the same 4-byte record to a {@code BLOCKING} partition with interleaved flushes and
     * checks what each subpartition returns when read back.
     *
     * <p>Subpartition 0 receives two unicast and two broadcast records, subpartition 1 only the
     * two broadcast records. Each view must return that many buffers equal to the record,
     * followed by one non-data buffer and then {@code null}.
     */
    @Test
    public void testFlushBoundedBlockingResultPartition() throws IOException {
        final int value = 1024;
        final BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.BLOCKING);

        final ByteBuffer record = ByteBuffer.allocate(4);
        record.putInt(value);

        // The record is rewound before every write because each write consumes it.
        record.rewind();
        partition.emitRecord(record, 0);
        partition.flush(0);

        record.rewind();
        partition.emitRecord(record, 0);

        record.rewind();
        partition.broadcastRecord(record);
        partition.flushAll();

        record.rewind();
        partition.broadcastRecord(record);
        partition.finish();
        record.rewind();

        // Index = subpartition, value = number of data buffers expected from it.
        final int[] expectedRecords = {4, 2};
        for (int subpartition = 0; subpartition < expectedRecords.length; subpartition++) {
            ResultSubpartitionView readView =
                    partition.createSubpartitionView(subpartition, new NoOpBufferAvailablityListener());
            for (int remaining = expectedRecords[subpartition]; remaining > 0; remaining--) {
                Assert.assertEquals(record, readView.getNextBuffer().buffer().getNioBufferReadable());
            }
            Assert.assertFalse(readView.getNextBuffer().buffer().isBuffer());
            Assert.assertNull(readView.getNextBuffer());
        }
    }

    /**
     * Emits a 341-byte record followed by a 1024-byte record to subpartition 0 of a pipelined
     * partition.
     *
     * <p>The subpartition must then hold two buffers; the first must report a partial record
     * length of 0 and the second a partial record length of 341. The checks run in a
     * {@code finally} block, so they are evaluated even if an emit throws.
     */
    @Test
    public void testEmitRecordWithRecordSpanningMultipleBuffers() throws Exception {
        final BufferWritingResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        final PipelinedSubpartition firstSubpartition = (PipelinedSubpartition) resultPartition.subpartitions[0];
        final int partialLength = 341;
        final ByteBuffer shortRecord = ByteBuffer.allocate(partialLength);
        final ByteBuffer fullBufferRecord = ByteBuffer.allocate(1024);

        try {
            resultPartition.emitRecord(shortRecord, 0);
            resultPartition.emitRecord(fullBufferRecord, 0);
        } finally {
            Assert.assertEquals(2L, firstSubpartition.getCurrentNumberOfBuffers());
            Assert.assertEquals(0L, firstSubpartition.getNextBuffer().getPartialRecordLength());
            Assert.assertEquals(partialLength, firstSubpartition.getNextBuffer().getPartialRecordLength());
        }
    }

    /**
     * Broadcasts a 341-byte record followed by a 1024-byte record on a pipelined partition.
     *
     * <p>Every subpartition must then hold two buffers; the first must report a partial record
     * length of 0 and the second a partial record length of 341. The checks run in a
     * {@code finally} block, so they are evaluated even if a broadcast throws.
     */
    @Test
    public void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Exception {
        final BufferWritingResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        final int partialLength = 341;
        final ByteBuffer shortRecord = ByteBuffer.allocate(partialLength);
        final ByteBuffer fullBufferRecord = ByteBuffer.allocate(1024);

        try {
            resultPartition.broadcastRecord(shortRecord);
            resultPartition.broadcastRecord(fullBufferRecord);
        } finally {
            final ResultSubpartition[] all = resultPartition.subpartitions;
            for (int idx = 0; idx < all.length; idx++) {
                PipelinedSubpartition subpartition = (PipelinedSubpartition) all[idx];
                Assert.assertEquals(2L, subpartition.getCurrentNumberOfBuffers());
                Assert.assertEquals(0L, subpartition.getNextBuffer().getPartialRecordLength());
                Assert.assertEquals(partialLength, subpartition.getNextBuffer().getPartialRecordLength());
            }
        }
    }

    /**
     * Notifier that records the arguments of the most recent
     * {@code notifyPartitionConsumable} call and counts how many calls were made, so that tests
     * can compare them against expected values.
     */
    private static class TestResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
        /** Arguments of the latest notification; {@code null} until one arrives or after a reset. */
        private JobID lastJobId;
        private ResultPartitionID lastPartitionId;
        private TaskActions lastTaskActions;

        /** Number of notifications received since construction or the last reset. */
        private int notificationCount;

        /** Stores the given arguments and increments the notification count. */
        public void notifyPartitionConsumable(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions) {
            notificationCount += 1;
            lastJobId = jobID;
            lastPartitionId = partitionID;
            lastTaskActions = taskActions;
        }

        /**
         * Asserts that the recorded state equals the given values, comparing job id, partition
         * id, task actions and notification count in that order.
         */
        private void check(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions, int numNotification) {
            Assert.assertEquals(jobID, lastJobId);
            Assert.assertEquals(partitionID, lastPartitionId);
            Assert.assertEquals(taskActions, lastTaskActions);
            Assert.assertEquals(numNotification, notificationCount);
        }

        /** Clears the recorded arguments and sets the notification count back to zero. */
        private void reset() {
            lastJobId = null;
            lastPartitionId = null;
            lastTaskActions = null;
            notificationCount = 0;
        }
    }
}

