/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.InputGateFairnessTest;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest;
import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class LocalInputChannelTest {
    /**
     * Checks that a buffer polled after an aligned-with-timeout checkpoint barrier has been
     * polled from the channel is not recorded by the channel state writer.
     */
    @Test
    public void testNoDataPersistedAfterReceivingAlignedBarrier() throws Exception {
        // Serialize the barrier event and wrap it in a buffer consumer of exactly that size.
        CheckpointOptions alignedOptions = CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 123L);
        CheckpointBarrier alignedBarrier = new CheckpointBarrier(1L, 0L, alignedOptions);
        MemorySegment barrierSegment = EventSerializer.toBuffer(alignedBarrier, false).getMemorySegment();
        Buffer barrierBuffer = new NetworkBuffer(barrierSegment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.EVENT_BUFFER);
        BufferConsumer barrierConsumer = new BufferConsumer(barrierBuffer, barrierSegment.size());
        BufferConsumer dataConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1);

        // The view serves the barrier first and the data buffer second.
        RecordingChannelStateWriter recordingWriter = new RecordingChannelStateWriter();
        ResultSubpartitionView view = InputChannelTestUtils.createResultSubpartitionView(barrierConsumer, dataConsumer);
        LocalInputChannel localChannel = InputChannelBuilder.newBuilder()
                .setPartitionManager(new SingleInputGateTest.TestingResultPartitionManager(view))
                .setStateWriter(recordingWriter)
                .buildLocalChannel(new SingleInputGateBuilder().build());
        localChannel.requestSubpartition(0);

        // First poll yields the barrier.
        localChannel.getNextBuffer();
        recordingWriter.start(alignedBarrier.getId(), alignedBarrier.getCheckpointOptions());
        localChannel.checkpointStarted(alignedBarrier);

        // Second poll yields the data buffer, which must not reach the state writer.
        localChannel.getNextBuffer();
        Assert.assertTrue("no data should be persisted after receiving a barrier", recordingWriter.getAddedInput().isEmpty());
    }

    /**
     * Runs one producer per result partition and one consumer per subpartition index on a shared
     * thread pool and waits until all of them have finished.
     *
     * <p>Each {@code TestLocalInputChannelConsumer} throws an {@link IllegalStateException} when a
     * channel does not deliver exactly {@code numberOfBuffersPerChannel} buffers; such a failure
     * surfaces through the joined futures.
     *
     * <p>NOTE(review): this method was reconstructed from decompiler output, which warned that
     * the original try/catch structure may differ.
     */
    @Test
    public void testConcurrentConsumeMultiplePartitions() throws Exception {
        final int parallelism = 32;
        final int producerBufferPoolSize = parallelism + 1;
        final int numberOfBuffersPerChannel = 1024;
        final int bufferSize = 32768;

        // Sanity checks on the configuration above; they guard future edits of the constants.
        Preconditions.checkArgument(parallelism >= 1);
        Preconditions.checkArgument(producerBufferPoolSize >= parallelism);
        Preconditions.checkArgument(numberOfBuffersPerChannel >= 1);

        // One thread per producer plus one per consumer.
        final ExecutorService executor = Executors.newFixedThreadPool(2 * parallelism);
        // Enough segments for every producer pool and every consumer pool.
        final NetworkBufferPool networkBuffers = new NetworkBufferPool((parallelism * producerBufferPoolSize) + (parallelism * parallelism), bufferSize);

        // Everything that can fail after the pool and executor exist runs inside the try block,
        // so a failure during partition setup no longer leaks them.
        try {
            final ResultPartitionManager partitionManager = new ResultPartitionManager();
            final ResultPartitionID[] partitionIds = new ResultPartitionID[parallelism];
            final TestPartitionProducer[] partitionProducers = new TestPartitionProducer[parallelism];

            for (int i = 0; i < parallelism; ++i) {
                partitionIds[i] = new ResultPartitionID();
                ResultPartition partition = new ResultPartitionBuilder()
                        .setResultPartitionId(partitionIds[i])
                        .setNumberOfSubpartitions(parallelism)
                        .setNumTargetKeyGroups(parallelism)
                        .setResultPartitionManager(partitionManager)
                        .setBufferPoolFactory((SupplierWithException<BufferPool, IOException>) () -> networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize, parallelism, Integer.MAX_VALUE))
                        .build();
                partition.setup();
                partitionProducers[i] = new TestPartitionProducer((BufferWritingResultPartition) partition, false, new TestPartitionProducerBufferSource(parallelism, bufferSize, numberOfBuffersPerChannel));
            }

            final List<CompletableFuture<?>> results = Lists.newArrayListWithCapacity(2 * parallelism);

            // Start all producers.
            for (int i = 0; i < parallelism; ++i) {
                results.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(partitionProducers[i]::call), executor));
            }

            // Start one consumer per subpartition index; each reads from every partition.
            for (int i = 0; i < parallelism; ++i) {
                TestLocalInputChannelConsumer consumer = new TestLocalInputChannelConsumer(i, parallelism, numberOfBuffersPerChannel, networkBuffers.createBufferPool(parallelism, parallelism), partitionManager, new TaskEventDispatcher(), partitionIds);
                results.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(consumer::call), executor));
            }

            // Propagates the first producer/consumer failure, if any.
            FutureUtils.waitForAll(results).get();
        }
        finally {
            networkBuffers.destroyAllBufferPools();
            networkBuffers.destroy();
            executor.shutdown();
        }
    }

    /**
     * Checks the delays handed to the retrigger timer while the partition keeps being reported
     * as not found, and that one more retrigger after the last expected delay makes the channel
     * fail.
     */
    @Test
    public void testPartitionRequestExponentialBackoff() throws Exception {
        final int initialBackoff = 500;
        final int maxBackoff = 3000;
        final int[] expectedDelays = {initialBackoff, 1000, 2000, maxBackoff};

        // Mocked gate that only needs to hand out a buffer provider.
        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        SingleInputGate inputGate = Mockito.mock(SingleInputGate.class);
        Mockito.when(inputGate.getBufferProvider()).thenReturn(bufferProvider);

        ResultPartitionManager partitionManager = Mockito.mock(ResultPartitionManager.class);
        LocalInputChannel ch = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager, initialBackoff, maxBackoff);

        // Every view request for this channel's partition fails with PartitionNotFoundException.
        Mockito.when(partitionManager.createSubpartitionView(Matchers.eq(ch.partitionId), Matchers.eq(0), Matchers.any(BufferAvailabilityListener.class)))
                .thenThrow(new PartitionNotFoundException(ch.partitionId));

        // Timer stub that runs the scheduled task immediately on the calling thread.
        Timer timer = Mockito.mock(Timer.class);
        Mockito.doAnswer(invocation -> {
            TimerTask scheduledTask = (TimerTask) invocation.getArguments()[0];
            scheduledTask.run();
            return null;
        }).when(timer).schedule(Matchers.any(TimerTask.class), Matchers.anyLong());

        // Initial request.
        ch.requestSubpartition(0);
        Mockito.verify(partitionManager).createSubpartitionView(Matchers.eq(ch.partitionId), Matchers.eq(0), Matchers.any(BufferAvailabilityListener.class));

        // Each retrigger must schedule the task with the next expected delay.
        for (long expectedDelay : expectedDelays) {
            ch.retriggerSubpartitionRequest(timer, 0);
            Mockito.verify(timer).schedule(Matchers.any(TimerTask.class), Matchers.eq(expectedDelay));
        }

        // One retrigger beyond the last expected delay must surface an exception.
        try {
            ch.retriggerSubpartitionRequest(timer, 0);
            ch.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        }
        catch (Exception expected) {
            // Expected; Assert.fail raises an AssertionError, which is not caught here.
        }
    }

    /**
     * Expects a {@link CancelTaskException} when the channel polls a subpartition view that
     * reports itself as released and carries a failure cause.
     */
    @Test(expected=CancelTaskException.class)
    public void testProducerFailedException() throws Exception {
        // View stub: already released, with a failure cause attached.
        ResultSubpartitionView releasedView = Mockito.mock(ResultSubpartitionView.class);
        Mockito.when(releasedView.isReleased()).thenReturn(true);
        Mockito.when(releasedView.getFailureCause()).thenReturn(new Exception("Expected test exception"));

        // Partition manager stub that always returns the released view.
        ResultPartitionManager partitionManager = Mockito.mock(ResultPartitionManager.class);
        Mockito.when(partitionManager.createSubpartitionView(Matchers.any(ResultPartitionID.class), Matchers.anyInt(), Matchers.any(BufferAvailabilityListener.class)))
                .thenReturn(releasedView);

        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        SingleInputGate inputGate = Mockito.mock(SingleInputGate.class);
        Mockito.when(inputGate.getBufferProvider()).thenReturn(bufferProvider);

        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager);
        channel.requestSubpartition(0);

        // Must throw CancelTaskException (see the annotation).
        channel.getNextBuffer();
    }

    /**
     * Requests a subpartition from a freshly created {@link ResultPartitionManager} and expects a
     * {@link PartitionNotFoundException} that carries the channel's partition id.
     */
    @Test
    public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionManager emptyManager = new ResultPartitionManager();
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(gate, emptyManager);

        try {
            channel.requestSubpartition(0);
            Assert.fail("Should throw a PartitionNotFoundException.");
        }
        catch (PartitionNotFoundException notFound) {
            // The exception must name the partition this channel asked for.
            Assert.assertThat(channel.getPartitionId(), org.hamcrest.Matchers.is(notFound.getPartitionId()));
        }
    }

    /**
     * Checks that requesting a subpartition from a freshly created {@link ResultPartitionManager},
     * on a channel created with the values {@code 1, 1} passed to the test utility, leaves the
     * input gate with a non-null retrigger timer instead of throwing.
     */
    @Test
    public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception {
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionManager emptyManager = new ResultPartitionManager();
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(gate, emptyManager, 1, 1);
        gate.setInputChannels(new InputChannel[]{channel});

        channel.requestSubpartition(0);

        Assert.assertNotNull(gate.getRetriggerLocalRequestTimer());
    }

    /**
     * Retriggers a subpartition request against a freshly created {@link ResultPartitionManager}
     * and checks, from inside the timer callback, that the channel has recorded a
     * {@link PartitionNotFoundException} for its own partition id.
     *
     * <p>NOTE(review): this method was reconstructed from decompiler output, which warned that
     * the original try/catch structure may differ.
     */
    @Test
    public void testChannelErrorWhileRetriggeringRequest() {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager());
        // Timer whose schedule() ignores the delay and runs the task inline on the calling
        // thread, then inspects the channel's error state.
        Timer timer = new Timer(true){

            @Override
            public void schedule(TimerTask task, long delay) {
                task.run();
                try {
                    // Expected to rethrow the error recorded while the task ran.
                    localChannel.checkError();
                    Assert.fail((String)"Should throw a PartitionNotFoundException.");
                }
                catch (PartitionNotFoundException notFound) {
                    // The recorded error must refer to this channel's partition.
                    Assert.assertThat((Object)localChannel.partitionId, (Matcher)org.hamcrest.Matchers.is((Object)notFound.getPartitionId()));
                }
                catch (IOException ex) {
                    // Any other IOException is the wrong kind of failure.
                    Assert.fail((String)"Should throw a PartitionNotFoundException.");
                }
            }
        };
        try {
            localChannel.retriggerSubpartitionRequest(timer, 0);
        }
        finally {
            // Always stop the timer so the test does not leave it running.
            timer.cancel();
        }
    }

    /**
     * Runs a subpartition request and a gate close on two separate threads and waits for both to
     * finish.
     *
     * <p>The test contains no assertions; it passes when both threads terminate. Presumably it
     * guards against a deadlock between releasing and retriggering, as the method name suggests
     * (TODO confirm against the original source).
     */
    @Test
    public void testConcurrentReleaseAndRetriggerPartitionRequest() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        // Every view request sleeps for 100 ms and then fails with PartitionNotFoundException,
        // which keeps the requester busy while the releaser runs.
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class))).thenAnswer((Answer)new Answer<ResultSubpartitionView>(){

            public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(100L);
                throw new PartitionNotFoundException(new ResultPartitionID());
            }
        });
        final LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(gate, partitionManager, 1, 1);
        // Thread that closes the gate.
        Thread releaser = new Thread(){

            @Override
            public void run() {
                try {
                    gate.close();
                }
                catch (IOException iOException) {
                    // Deliberately ignored: the test only waits for this thread to terminate.
                }
            }
        };
        // Thread that issues the subpartition request against the failing partition manager.
        Thread requester = new Thread(){

            @Override
            public void run() {
                try {
                    channel.requestSubpartition(0);
                }
                catch (IOException iOException) {
                    // Deliberately ignored: the test only waits for this thread to terminate.
                }
            }
        };
        requester.start();
        releaser.start();
        // Both joins must return for the test to complete.
        releaser.join();
        requester.join();
    }

    /**
     * Walks the channel through three states and checks {@code getNextBuffer()} in each: an empty
     * result while nothing is queued, a {@link CancelTaskException} once the subpartition view
     * has been released, and an empty result again after the channel itself has been released.
     */
    @Test
    public void testGetNextAfterPartitionReleased() throws Exception {
        ResultSubpartitionView view = InputChannelTestUtils.createResultSubpartitionView(false);
        SingleInputGateTest.TestingResultPartitionManager manager = new SingleInputGateTest.TestingResultPartitionManager(view);
        SingleInputGate gate = new SingleInputGateBuilder().build();
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(gate, manager);
        localChannel.requestSubpartition(0);

        // Nothing to read yet.
        Assert.assertFalse(localChannel.getNextBuffer().isPresent());

        // Releasing the view makes the next poll fail.
        view.releaseAllResources();
        try {
            localChannel.getNextBuffer();
            Assert.fail("Did not throw expected CancelTaskException");
        }
        catch (CancelTaskException expected) {
            // Expected outcome for a released view.
        }

        // Once the channel is released as well, polling returns an empty result.
        localChannel.releaseAllResources();
        Assert.assertFalse(localChannel.getNextBuffer().isPresent());
    }

    /**
     * Checks that a buffer obtained from the local channel is present and not flagged as
     * compressed. The view is created with {@code true}, which presumably enables compression on
     * the producer side (see the test name; confirm against {@code InputChannelTestUtils}).
     */
    @Test
    public void testGetBufferFromLocalChannelWhenCompressionEnabled() throws Exception {
        ResultSubpartitionView view = InputChannelTestUtils.createResultSubpartitionView(true);
        SingleInputGateTest.TestingResultPartitionManager manager = new SingleInputGateTest.TestingResultPartitionManager(view);
        SingleInputGate gate = new SingleInputGateBuilder().build();
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(gate, manager);
        localChannel.requestSubpartition(0);

        Optional<InputChannel.BufferAndAvailability> polled = localChannel.getNextBuffer();

        Assert.assertTrue(polled.isPresent());
        Assert.assertFalse(polled.get().buffer().isCompressed());
    }

    /**
     * Expects an {@link IllegalStateException} when consumption is resumed on a channel whose
     * resources have already been released.
     */
    @Test(expected=IllegalStateException.class)
    public void testUnblockReleasedChannel() throws Exception {
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionManager manager = new ResultPartitionManager();
        LocalInputChannel releasedChannel = InputChannelTestUtils.createLocalInputChannel(gate, manager);

        releasedChannel.releaseAllResources();

        // Must throw IllegalStateException (see the annotation).
        releasedChannel.resumeConsumption();
    }

    /**
     * Expects an {@link IllegalStateException} when a buffer size is announced on a channel whose
     * resources have already been released.
     *
     * <p>The first announcement, made while the channel is still active, is expected to succeed;
     * note that the {@code expected} annotation cannot tell the two calls apart.
     */
    @Test(expected=IllegalStateException.class)
    public void testAnnounceBufferSize() throws Exception {
        // The unused AtomicInteger local from the decompiled source has been removed.
        SingleInputGateTest.TestingResultPartitionManager partitionManager = new SingleInputGateTest.TestingResultPartitionManager(InputChannelTestUtils.createResultSubpartitionView(true));
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager);
        localChannel.requestSubpartition(0);

        // Announcing on an initialized channel.
        localChannel.announceBufferSize(10);

        // Announcing after release must throw IllegalStateException.
        localChannel.releaseAllResources();
        localChannel.announceBufferSize(12);
    }

    /**
     * Checks that data added after a checkpoint barrier has been polled is not offered by the
     * input gate until {@code resumeConsumption()} is called on the channel, and that it is
     * offered as a buffer afterwards.
     */
    @Test
    public void testEnqueueAvailableChannelWhenResuming() throws IOException, InterruptedException {
        PipelinedResultPartition partition = (PipelinedResultPartition) PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE);
        ResultSubpartition firstSubpartition = partition.getAllPartitions()[0];
        ResultSubpartitionView readView = firstSubpartition.createReadView(() -> {});
        SingleInputGateTest.TestingResultPartitionManager manager = new SingleInputGateTest.TestingResultPartitionManager(readView);
        SingleInputGate gate = new SingleInputGateBuilder().build();
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(gate, manager);
        localChannel.requestSubpartition(0);

        // Feed a checkpoint barrier and poll it from the channel.
        CheckpointBarrier barrier = new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());
        firstSubpartition.add(EventSerializer.toBufferConsumer(barrier, false));
        Assert.assertTrue(localChannel.getNextBuffer().isPresent());

        // Data added now must not become visible through the gate yet.
        firstSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        firstSubpartition.flush();
        Assert.assertFalse(localChannel.inputGate.pollNext().isPresent());

        // After resuming, the gate hands out the data buffer.
        localChannel.resumeConsumption();
        Optional<BufferOrEvent> afterResume = localChannel.inputGate.pollNext();
        Assert.assertTrue(afterResume.isPresent());
        Assert.assertTrue(afterResume.get().isBuffer());
    }

    /**
     * Starts an unaligned checkpoint on the channel, then polls a 1-byte buffer, the barrier, and
     * a 2-byte buffer, and checks that the state writer recorded exactly one buffer of size 1 for
     * this channel.
     */
    @Test
    public void testCheckpointingInflightData() throws Exception {
        SingleInputGate gate = new SingleInputGateBuilder().build();
        PipelinedResultPartition partition = (PipelinedResultPartition) PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE);
        ResultSubpartition firstSubpartition = partition.getAllPartitions()[0];
        ResultSubpartitionView readView = firstSubpartition.createReadView(() -> {});
        SingleInputGateTest.TestingResultPartitionManager manager = new SingleInputGateTest.TestingResultPartitionManager(readView);
        RecordingChannelStateWriter recordingWriter = new RecordingChannelStateWriter();
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(gate, manager, 0, 0, builder -> builder.setStateWriter(recordingWriter));
        gate.setInputChannels(new InputChannel[]{localChannel});
        localChannel.requestSubpartition(0);

        // Begin an unaligned checkpoint before any data has been read.
        CheckpointOptions unalignedOptions = CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault());
        recordingWriter.start(0L, unalignedOptions);
        CheckpointBarrier barrier = new CheckpointBarrier(0L, 123L, unalignedOptions);
        localChannel.checkpointStarted(barrier);

        // Buffer polled before the barrier arrives.
        firstSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assert.assertTrue(localChannel.getNextBuffer().isPresent());

        // The barrier itself.
        firstSubpartition.add(EventSerializer.toBufferConsumer(barrier, true));
        Assert.assertTrue(localChannel.getNextBuffer().isPresent());

        // Buffer polled after the barrier.
        firstSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assert.assertTrue(localChannel.getNextBuffer().isPresent());

        // Only the size-1 buffer may have been recorded for this channel.
        int[] recordedSizes = recordingWriter.getAddedInput().get(localChannel.getChannelInfo()).stream().mapToInt(Buffer::getSize).toArray();
        Assert.assertArrayEquals(recordedSizes, new int[]{1});
    }

    /**
     * Checks the value returned by {@code ResultSubpartition.add} for two subpartitions: it is
     * asserted to be {@code Integer.MAX_VALUE} before any announcement, and the size announced by
     * the respective channel (9 and 20) afterwards.
     */
    @Test
    public void testAnnounceNewBufferSize() throws IOException, InterruptedException {
        PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
                .setResultPartitionType(ResultPartitionType.PIPELINED)
                .setFileChannelManager(NoOpFileChannelManager.INSTANCE)
                .setNumberOfSubpartitions(2)
                .build();
        ResultSubpartition first = partition.getAllPartitions()[0];
        ResultSubpartition second = partition.getAllPartitions()[1];

        // One channel (each with its own gate) per subpartition.
        LocalInputChannel firstChannel = InputChannelTestUtils.createLocalInputChannel(
                new SingleInputGateBuilder().build(),
                new SingleInputGateTest.TestingResultPartitionManager(first.createReadView(() -> {})));
        LocalInputChannel secondChannel = InputChannelTestUtils.createLocalInputChannel(
                new SingleInputGateBuilder().build(),
                new SingleInputGateTest.TestingResultPartitionManager(second.createReadView(() -> {})));
        firstChannel.requestSubpartition(0);
        secondChannel.requestSubpartition(0);

        // Before any announcement.
        Assert.assertEquals(Integer.MAX_VALUE, first.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16)));
        Assert.assertEquals(Integer.MAX_VALUE, second.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16)));

        firstChannel.announceBufferSize(9);
        secondChannel.announceBufferSize(20);

        // After the announcement each subpartition reports its own channel's size.
        Assert.assertEquals(9, first.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16)));
        Assert.assertEquals(20, second.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16)));
    }

    /**
     * Checks that {@code getBuffersInUseCount()} is 0 before the subpartition has been requested
     * and 3 afterwards, for a view created from three buffer consumers.
     */
    @Test
    public void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception {
        BufferConsumer firstBuffer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        BufferConsumer secondBuffer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        BufferConsumer thirdBuffer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        ResultSubpartitionView view = InputChannelTestUtils.createResultSubpartitionView(firstBuffer, secondBuffer, thirdBuffer);
        SingleInputGateTest.TestingResultPartitionManager manager = new SingleInputGateTest.TestingResultPartitionManager(view);

        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(gate, manager);
        gate.setInputChannels(new InputChannel[]{channel});

        // No view yet, so nothing is counted.
        Assert.assertEquals(0, channel.getBuffersInUseCount());

        channel.requestSubpartition(0);

        // With the view in place, all three queued buffers are counted.
        Assert.assertEquals(3, channel.getBuffersInUseCount());
    }

    /**
     * Consumer task that drains a {@link SingleInputGate} built from local input channels and
     * verifies how many buffers each channel delivered.
     */
    private static class TestLocalInputChannelConsumer
    implements Callable<Void> {
        private final SingleInputGate inputGate;
        private final int numberOfInputChannels;
        private final int numberOfExpectedBuffersPerChannel;

        /**
         * Builds the input gate with one local channel per consumed partition.
         *
         * @param subpartitionIndex index of the subpartition this consumer reads from every partition
         * @param numberOfInputChannels number of channels of the gate; must be at least 1
         * @param numberOfExpectedBuffersPerChannel exact number of buffers each channel must deliver; must be at least 1
         * @param bufferPool buffer pool handed to the gate builder
         * @param partitionManager partition manager the channels request their views from
         * @param taskEventDispatcher task event publisher set on every channel
         * @param consumedPartitionIds partition id for each channel, indexed by channel index
         */
        public TestLocalInputChannelConsumer(int subpartitionIndex, int numberOfInputChannels, int numberOfExpectedBuffersPerChannel, BufferPool bufferPool, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, ResultPartitionID[] consumedPartitionIds) throws IOException, InterruptedException {
            Preconditions.checkArgument(numberOfInputChannels >= 1);
            Preconditions.checkArgument(numberOfExpectedBuffersPerChannel >= 1);
            this.numberOfInputChannels = numberOfInputChannels;
            this.numberOfExpectedBuffersPerChannel = numberOfExpectedBuffersPerChannel;

            this.inputGate = new SingleInputGateBuilder()
                    .setConsumedSubpartitionIndex(subpartitionIndex)
                    .setNumberOfChannels(numberOfInputChannels)
                    .setBufferPoolFactory(bufferPool)
                    .build();

            InputChannel[] channels = new InputChannel[numberOfInputChannels];
            for (int channelIndex = 0; channelIndex < channels.length; ++channelIndex) {
                channels[channelIndex] = InputChannelBuilder.newBuilder()
                        .setChannelIndex(channelIndex)
                        .setPartitionManager(partitionManager)
                        .setPartitionId(consumedPartitionIds[channelIndex])
                        .setTaskEventPublisher(taskEventDispatcher)
                        .buildLocalChannel(this.inputGate);
            }
            InputGateFairnessTest.setupInputGate(this.inputGate, channels);
        }

        /**
         * Reads from the gate until it returns an empty result, recycling every buffer and
         * counting buffers per channel; events are skipped. The gate is closed in all cases.
         *
         * <p>NOTE(review): this method was reconstructed from decompiler output, which warned
         * that the original try/catch structure may differ.
         *
         * @return always {@code null}
         * @throws IllegalStateException if a channel delivers more or fewer buffers than expected
         */
        @Override
        public Void call() throws Exception {
            final int[] receivedPerChannel = new int[this.numberOfInputChannels];
            try {
                for (Optional<BufferOrEvent> next = this.inputGate.getNext(); next.isPresent(); next = this.inputGate.getNext()) {
                    BufferOrEvent bufferOrEvent = next.get();
                    if (!bufferOrEvent.isBuffer()) {
                        continue;
                    }
                    bufferOrEvent.getBuffer().recycleBuffer();
                    int channelIndex = bufferOrEvent.getChannelInfo().getInputChannelIdx();
                    receivedPerChannel[channelIndex]++;
                    // Fail fast as soon as a channel overshoots.
                    if (receivedPerChannel[channelIndex] > this.numberOfExpectedBuffersPerChannel) {
                        throw new IllegalStateException("Received more buffers than expected on channel " + bufferOrEvent.getChannelInfo() + ".");
                    }
                }

                // The gate is drained; every channel must have delivered exactly the expected count.
                for (int channelIndex = 0; channelIndex < receivedPerChannel.length; ++channelIndex) {
                    int received = receivedPerChannel[channelIndex];
                    if (received != this.numberOfExpectedBuffersPerChannel) {
                        throw new IllegalStateException("Received unexpected number of buffers on channel " + channelIndex + " (" + received + " instead of " + this.numberOfExpectedBuffersPerChannel + ").");
                    }
                }
            }
            finally {
                this.inputGate.close();
            }
            return null;
        }
    }

    /**
     * Producer source that hands out {@code numberOfBuffersToProduce} zero-filled buffers for each
     * of {@code parallelism} target channels, in random order, and {@code null} once exhausted.
     */
    private static class TestPartitionProducerBufferSource
    implements TestProducerSource {
        /** Size in bytes of every produced buffer. */
        private final int bufferSize;
        /** Target channels still to be served, in shuffled order; consumed from the tail. */
        private final List<Byte> channelIndexes;

        /**
         * @param parallelism number of target channels; at most {@code Byte.MAX_VALUE + 1} because
         *     channel indexes are stored as bytes
         * @param bufferSize size in bytes of every produced buffer
         * @param numberOfBuffersToProduce number of buffers produced per target channel
         * @throws IllegalArgumentException if {@code parallelism} exceeds the representable range
         */
        public TestPartitionProducerBufferSource(int parallelism, int bufferSize, int numberOfBuffersToProduce) {
            // A byte loop counter would wrap to -128 and never terminate for larger values,
            // so reject them up front and count with an int.
            Preconditions.checkArgument(parallelism <= Byte.MAX_VALUE + 1, "parallelism must not exceed " + (Byte.MAX_VALUE + 1) + " but was " + parallelism);
            this.bufferSize = bufferSize;
            this.channelIndexes = Lists.newArrayListWithCapacity((int)(parallelism * numberOfBuffersToProduce));
            for (int channel = 0; channel < parallelism; ++channel) {
                for (int j = 0; j < numberOfBuffersToProduce; ++j) {
                    this.channelIndexes.add((byte) channel);
                }
            }
            Collections.shuffle(this.channelIndexes);
        }

        /**
         * Returns the next buffer together with its target channel.
         *
         * @return a fresh {@code bufferSize}-byte buffer and its target channel, or {@code null}
         *     when all buffers have been handed out
         */
        @Override
        public TestProducerSource.BufferAndChannel getNextBuffer() throws Exception {
            if (this.channelIndexes.isEmpty()) {
                return null;
            }
            // The list is shuffled, so any position is equally random; taking the last element
            // avoids shifting the whole array-backed list on every call.
            byte channelIndex = this.channelIndexes.remove(this.channelIndexes.size() - 1);
            return new TestProducerSource.BufferAndChannel(new byte[this.bufferSize], channelIndex);
        }
    }
}

