/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.file.Path;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingResultPartition;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.BoundedData;
import org.apache.flink.runtime.io.network.partition.BoundedDataTestBase;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.FileChannelBoundedData;
import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests for {@link FileChannelBoundedData}.
 *
 * <p>Extends {@link BoundedDataTestBase} by supplying the file-channel based {@link BoundedData}
 * factory, and adds tests that check how reading and recycling buffers interacts with
 * data-availability notifications.
 */
class FileChannelBoundedDataTest extends BoundedDataTestBase {

    /**
     * Buffer size (1 MiB) used for the network buffers, the written test buffers and the bounded
     * data under test.
     *
     * <p>NOTE(review): if {@code BoundedDataTestBase} already exposes an equivalent constant,
     * prefer that one over this local definition.
     */
    private static final int TEST_BUFFER_SIZE = 1024 * 1024;

    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();

    /** Shared by all tests of this class; created in {@link #setUp()}, closed in {@link #shutdown()}. */
    private static FileChannelManager fileChannelManager;

    /** Creates the file channel manager backing the file-based subpartitions of this test class. */
    @BeforeAll
    static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
    }

    /** Closes the shared file channel manager once all tests have run. */
    @AfterAll
    static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Override
    protected boolean isRegionBased() {
        return false;
    }

    @Override
    protected BoundedData createBoundedData(Path tempFilePath) throws IOException {
        return FileChannelBoundedData.create(tempFilePath, TEST_BUFFER_SIZE);
    }

    /** Not supported: this implementation reports itself as not region based. */
    @Override
    protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) {
        throw new UnsupportedOperationException();
    }

    /**
     * Writes three buffers and verifies that, while the first two read buffers are still held,
     * the reader returns {@code null} for the third read.
     */
    @TestTemplate
    void testReadNextBuffer() throws Exception {
        final int numberOfBuffers = 3;
        try (BoundedData data = createBoundedData()) {
            writeBuffers(data, numberOfBuffers);

            final BoundedData.Reader reader = data.createReader();
            final Buffer buffer1 = reader.nextBuffer();
            final Buffer buffer2 = reader.nextBuffer();

            Assertions.assertThat(buffer1).isNotNull();
            Assertions.assertThat(buffer2).isNotNull();
            // Three buffers were written but the third read yields null while two are held;
            // presumably the reader only owns two read buffers - TODO confirm.
            Assertions.assertThat(reader.nextBuffer()).isNull();

            // cleanup
            buffer1.recycleBuffer();
            buffer2.recycleBuffer();
        }
    }

    /**
     * Verifies that recycling a buffer notifies the subpartition view before the reader has
     * returned {@code null}, and does not notify it afterwards.
     */
    @TestTemplate
    void testRecycleBufferForNotifyingSubpartitionView() throws Exception {
        final int numberOfBuffers = 2;
        try (BoundedData data = createBoundedData()) {
            writeBuffers(data, numberOfBuffers);

            final VerifyNotificationResultSubpartitionView subpartitionView =
                    new VerifyNotificationResultSubpartitionView();
            final BoundedData.Reader reader = data.createReader(subpartitionView);
            final Buffer buffer1 = reader.nextBuffer();
            final Buffer buffer2 = reader.nextBuffer();
            Assertions.assertThat(buffer1).isNotNull();
            Assertions.assertThat(buffer2).isNotNull();

            // Nothing has been recycled yet, so no notification is expected.
            Assertions.assertThat(subpartitionView.isAvailable).isFalse();
            buffer1.recycleBuffer();
            Assertions.assertThat(subpartitionView.isAvailable).isTrue();

            subpartitionView.resetAvailable();
            Assertions.assertThat(subpartitionView.isAvailable).isFalse();

            // All written buffers have been read, so the next read returns null ...
            Assertions.assertThat(reader.nextBuffer()).isNull();

            // ... and recycling after that point must not notify the view again.
            buffer2.recycleBuffer();
            Assertions.assertThat(subpartitionView.isAvailable).isFalse();
        }
    }

    /**
     * Verifies that recycling a buffer obtained through a subpartition view notifies the
     * registered {@link BufferAvailabilityListener}.
     *
     * <p>The view and the subpartition are released in {@code finally} blocks so that a failing
     * assertion does not leave them open for the remaining tests of this class.
     */
    @TestTemplate
    void testRecycleBufferForNotifyingBufferAvailabilityListener() throws Exception {
        final int numberOfBuffers = 2;
        final ResultSubpartition subpartition = createFileBoundedBlockingSubpartition();
        try {
            writeBuffers(subpartition, numberOfBuffers);

            final VerifyNotificationBufferAvailabilityListener listener =
                    new VerifyNotificationBufferAvailabilityListener();
            final ResultSubpartitionView subpartitionView =
                    PartitionTestUtils.createView(subpartition, listener);
            try {
                Assertions.assertThat(listener.isAvailable).isFalse();

                final ResultSubpartition.BufferAndBacklog buffer1 = subpartitionView.getNextBuffer();
                final ResultSubpartition.BufferAndBacklog buffer2 = subpartitionView.getNextBuffer();
                Assertions.assertThat(buffer1).isNotNull();
                Assertions.assertThat(buffer2).isNotNull();

                // With both buffers handed out, the view reports no further data available.
                Assertions.assertThat(subpartitionView.getAvailabilityAndBacklog(true).isAvailable())
                        .isFalse();
                // Recycling one buffer is expected to trigger the listener notification.
                buffer1.buffer().recycleBuffer();
                Assertions.assertThat(listener.isAvailable).isTrue();

                // cleanup of the remaining buffer (success path only)
                buffer2.buffer().recycleBuffer();
            } finally {
                subpartitionView.releaseAllResources();
            }
        } finally {
            subpartition.release();
        }
    }

    /**
     * Builds a blocking result partition with file-based bounded subpartitions and returns its
     * first subpartition.
     */
    private static ResultSubpartition createFileBoundedBlockingSubpartition() {
        final BoundedBlockingResultPartition resultPartition =
                (BoundedBlockingResultPartition)
                        new ResultPartitionBuilder()
                                .setNetworkBufferSize(TEST_BUFFER_SIZE)
                                .setResultPartitionType(ResultPartitionType.BLOCKING)
                                .setBoundedBlockingSubpartitionType(
                                        BoundedBlockingSubpartitionType.FILE)
                                .setFileChannelManager(fileChannelManager)
                                .setSSLEnabled(true)
                                .build();
        return resultPartition.subpartitions[0];
    }

    /** Writes {@code numberOfBuffers} test buffers to {@code data} and finishes the write phase. */
    private static void writeBuffers(BoundedData data, int numberOfBuffers) throws IOException {
        for (int i = 0; i < numberOfBuffers; ++i) {
            data.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(TEST_BUFFER_SIZE));
        }
        data.finishWrite();
    }

    /** Adds {@code numberOfBuffers} filled buffer consumers to {@code subpartition} and finishes it. */
    private static void writeBuffers(ResultSubpartition subpartition, int numberOfBuffers)
            throws IOException {
        for (int i = 0; i < numberOfBuffers; ++i) {
            subpartition.add(
                    BufferBuilderTestUtils.createFilledFinishedBufferConsumer(TEST_BUFFER_SIZE));
        }
        subpartition.finish();
    }

    /** Listener that records whether it has been notified about available data. */
    private static class VerifyNotificationBufferAvailabilityListener
            implements BufferAvailabilityListener {

        private boolean isAvailable;

        public void notifyDataAvailable(ResultSubpartitionView view) {
            this.isAvailable = true;
        }

        private void resetAvailable() {
            this.isAvailable = false;
        }
    }

    /** Subpartition view that records whether it has been notified about available data. */
    private static class VerifyNotificationResultSubpartitionView
            extends NoOpResultSubpartitionView {

        private boolean isAvailable;

        public void notifyDataAvailable() {
            this.isAvailable = true;
        }

        /** Clears the recorded notification so that a later notification can be detected. */
        private void resetAvailable() {
            this.isAvailable = false;
        }
    }
}

