/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingResultPartition;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.CountingAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests how read views on a file-backed ({@link BoundedBlockingSubpartitionType#FILE}) bounded
 * blocking subpartition report availability and notify their {@link BufferAvailabilityListener}.
 *
 * <p>Every test releases the read view and the subpartition in {@code finally} blocks so that a
 * failing assertion does not leave the partition's resources unreleased.
 */
class BoundedBlockingSubpartitionAvailabilityTest {
    @TempDir
    private static Path tmpFolder;

    /** Size in bytes of the network buffers and of every buffer written to the partition. */
    private static final int BUFFER_SIZE = 32 * 1024;

    /**
     * Creating a read view on a finished partition must not trigger any availability
     * notification on the listener.
     */
    @Test
    void testInitiallyNotAvailable() throws Exception {
        final ResultSubpartition subpartition = createPartitionWithData(10);
        try {
            final CountingAvailabilityListener listener = new CountingAvailabilityListener();
            final ResultSubpartitionView subpartitionView =
                    PartitionTestUtils.createView(subpartition, listener);
            try {
                Assertions.assertThat(listener.numNotifications).isZero();
            } finally {
                subpartitionView.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * After reading buffers (without recycling them) until the view returns no more, the view
     * must report itself as unavailable, and the last buffer handed out must indicate that no
     * further data is available.
     */
    @Test
    void testUnavailableWhenBuffersExhausted() throws Exception {
        final ResultSubpartition subpartition = createPartitionWithData(100_000);
        try {
            final CountingAvailabilityListener listener = new CountingAvailabilityListener();
            final ResultSubpartitionView reader =
                    PartitionTestUtils.createView(subpartition, listener);
            try {
                final List<ResultSubpartition.BufferAndBacklog> data = drainAvailableData(reader);

                Assertions.assertThat(reader.getAvailabilityAndBacklog(true).isAvailable())
                        .isFalse();
                Assertions.assertThat(data.get(data.size() - 1).isDataAvailable()).isFalse();
            } finally {
                reader.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * After the view stops returning buffers, recycling two of the previously returned buffers
     * must make the view available again and notify the listener exactly once.
     */
    @Test
    void testAvailabilityNotificationWhenBuffersReturn() throws Exception {
        final ResultSubpartition subpartition = createPartitionWithData(100_000);
        try {
            final CountingAvailabilityListener listener = new CountingAvailabilityListener();
            final ResultSubpartitionView reader =
                    PartitionTestUtils.createView(subpartition, listener);
            try {
                final List<ResultSubpartition.BufferAndBacklog> data = drainAvailableData(reader);
                data.get(0).buffer().recycleBuffer();
                data.get(1).buffer().recycleBuffer();

                Assertions.assertThat(reader.getAvailabilityAndBacklog(true).isAvailable())
                        .isTrue();
                Assertions.assertThat(listener.numNotifications).isOne();
            } finally {
                reader.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * After reading and immediately recycling every buffer until the view returns no more, the
     * view must report itself as unavailable.
     */
    @Test
    void testNotAvailableWhenEmpty() throws Exception {
        final ResultSubpartition subpartition = createPartitionWithData(100_000);
        try {
            final ResultSubpartitionView reader =
                    subpartition.createReadView(new NoOpBufferAvailablityListener());
            try {
                drainAllData(reader);

                Assertions.assertThat(reader.getAvailabilityAndBacklog(true).isAvailable())
                        .isFalse();
            } finally {
                reader.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * Builds a {@link ResultPartitionType#BLOCKING_PERSISTENT} partition with file-backed
     * subpartitions stored in a fresh folder below {@link #tmpFolder}, writes the requested
     * number of buffers into its first subpartition and finishes that subpartition.
     *
     * @param numberOfBuffers number of {@link #BUFFER_SIZE}-byte buffers to write
     * @return the first subpartition of the created partition, already finished
     * @throws IOException if creating the folder, writing the buffers or finishing fails
     */
    private static ResultSubpartition createPartitionWithData(int numberOfBuffers)
            throws IOException {
        // NOTE(review): SSL is enabled on purpose in the builder; presumably this selects a
        // specific file read path of the subpartition - confirm against ResultPartitionBuilder.
        final BoundedBlockingResultPartition parent =
                (BoundedBlockingResultPartition)
                        new ResultPartitionBuilder()
                                .setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT)
                                .setBoundedBlockingSubpartitionType(
                                        BoundedBlockingSubpartitionType.FILE)
                                .setSSLEnabled(true)
                                .setFileChannelManager(
                                        new FileChannelManagerImpl(
                                                new String[] {
                                                    TempDirUtils.newFolder(tmpFolder)
                                                            .getAbsolutePath()
                                                },
                                                "data"))
                                .setNetworkBufferSize(BUFFER_SIZE)
                                .build();

        final ResultSubpartition partition = parent.getAllPartitions()[0];
        writeBuffers(partition, numberOfBuffers);
        partition.finish();
        return partition;
    }

    /**
     * Adds {@code numberOfBuffers} filled, finished buffers of {@link #BUFFER_SIZE} bytes each to
     * the given subpartition.
     *
     * @param partition the subpartition to write to
     * @param numberOfBuffers how many buffers to add
     * @throws IOException if adding a buffer fails
     */
    private static void writeBuffers(ResultSubpartition partition, int numberOfBuffers)
            throws IOException {
        for (int i = 0; i < numberOfBuffers; ++i) {
            partition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(BUFFER_SIZE));
        }
    }

    /**
     * Polls the view until {@code getNextBuffer()} returns {@code null}, keeping every returned
     * buffer un-recycled.
     *
     * @param reader the view to read from
     * @return all buffers handed out by the view, in read order
     * @throws Exception if reading from the view fails
     */
    private static List<ResultSubpartition.BufferAndBacklog> drainAvailableData(
            ResultSubpartitionView reader) throws Exception {
        final List<ResultSubpartition.BufferAndBacklog> list = new ArrayList<>();
        ResultSubpartition.BufferAndBacklog bab;
        while ((bab = reader.getNextBuffer()) != null) {
            list.add(bab);
        }
        return list;
    }

    /**
     * Polls the view until {@code getNextBuffer()} returns {@code null}, recycling each buffer
     * as soon as it has been read.
     *
     * @param reader the view to read from
     * @throws Exception if reading from the view fails
     */
    private static void drainAllData(ResultSubpartitionView reader) throws Exception {
        ResultSubpartition.BufferAndBacklog bab;
        while ((bab = reader.getNextBuffer()) != null) {
            bab.buffer().recycleBuffer();
        }
    }
}

