/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumerInternalOperations;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingSubpartitionConsumerInternalOperation;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(value={TestLoggerExtension.class})
class HsSubpartitionFileReaderImplTest {
    // NOTE(review): these three constants are not referenced anywhere in the visible code; the
    // literals 4, 0 and 5 used throughout this decompiled class look like their inlined values —
    // confirm against the original source.
    private static final int bufferSize = 4;
    private static final int targetChannel = 0;
    private static final int MAX_BUFFERS_READ_AHEAD = 5;
    // Source of arbitrary buffer payloads for the three-argument writeDataToFile(); re-created in before().
    private Random random;
    // Index handed to every reader created by this class; before() creates a one-subpartition
    // index and several tests replace it with their own.
    private HsFileDataIndex diskIndex;
    // Consumer operations used by the no-argument createSubpartitionFileReader().
    private TestingSubpartitionConsumerInternalOperation subpartitionOperation;
    // Read/write channel on the temporary data file; opened in before(), closed in after().
    private FileChannel dataFileChannel;
    // Path given to the index that before() creates.
    private Path indexFilePath;
    // Byte offset in the data file at which the next writeDataToFile() call starts writing.
    private long currentFileOffset;

    // Explicit empty constructor; all per-test state is initialised in before().
    HsSubpartitionFileReaderImplTest() {
    }

    /**
     * Prepares per-test state: a fresh random source, an empty data file opened for reading and
     * writing, a single-subpartition index, default consumer operations and a zero file offset.
     *
     * @param tempPath temporary directory hosting the data file and the index file
     */
    @BeforeEach
    void before(@TempDir Path tempPath) throws Exception {
        random = new Random();

        // Both files get random names inside the per-test temporary directory.
        String dataFileName = UUID.randomUUID().toString();
        Path dataFilePath = Files.createFile(tempPath.resolve(dataFileName));
        String indexFileName = UUID.randomUUID().toString();
        indexFilePath = tempPath.resolve(indexFileName);

        dataFileChannel = HsSubpartitionFileReaderImplTest.openFileChannel(dataFilePath);
        diskIndex = createDataIndex(1, indexFilePath);
        subpartitionOperation = new TestingSubpartitionConsumerInternalOperation();
        currentFileOffset = 0L;
    }

    /** Releases the data file channel through {@code IOUtils.closeQuietly} after each test. */
    @AfterEach
    void after() {
        IOUtils.closeQuietly(dataFileChannel);
    }

    /**
     * Interleaves writes for two subpartitions and checks that each reader loads only the buffers
     * of its own subpartition while both draw from one shared queue of memory segments.
     *
     * <p>Every {@code readBuffers} call is preceded by {@code prepareForScheduling()}, so all four
     * rounds follow the same protocol.
     *
     * @param tmpPath temporary directory for the two-subpartition index file
     */
    @Test
    void testReadBuffer(@TempDir Path tmpPath) throws Exception {
        diskIndex = createDataIndex(2, tmpPath.resolve(".index"));
        TestingSubpartitionConsumerInternalOperation viewNotifier1 = new TestingSubpartitionConsumerInternalOperation();
        TestingSubpartitionConsumerInternalOperation viewNotifier2 = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl fileReader1 = createSubpartitionFileReader(0, viewNotifier1);
        HsSubpartitionFileReaderImpl fileReader2 = createSubpartitionFileReader(1, viewNotifier2);

        // File layout: sub0[10, 11], sub1[20, 21], sub0[15], sub1[25].
        writeDataToFile(0, 0, 10, 2);
        writeDataToFile(1, 0, 20, 2);
        writeDataToFile(0, 2, 15, 1);
        writeDataToFile(1, 2, 25, 1);

        // Six segments in total, exactly one per written buffer.
        Queue<MemorySegment> memorySegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(6);

        fileReader1.prepareForScheduling();
        fileReader1.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(memorySegments).hasSize(4);
        HsSubpartitionFileReaderImplTest.checkData(fileReader1, 10, 11);

        fileReader2.prepareForScheduling();
        fileReader2.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(memorySegments).hasSize(2);
        HsSubpartitionFileReaderImplTest.checkData(fileReader2, 20, 21);

        fileReader1.prepareForScheduling();
        fileReader1.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(memorySegments).hasSize(1);
        HsSubpartitionFileReaderImplTest.checkData(fileReader1, 15);

        // Prepare the second reader as well before its final read, like the three rounds above.
        fileReader2.prepareForScheduling();
        fileReader2.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(memorySegments).isEmpty();
        HsSubpartitionFileReaderImplTest.checkData(fileReader2, 25);
    }

    /**
     * Writes three buffers (payloads 1, 2, 3) for subpartition 0 through a {@link BufferCompressor}
     * of the given codec and checks that the loaded buffers yield the same payloads, decompressing
     * where a loaded buffer reports itself as compressed.
     *
     * @param compressionFactoryName codec name passed to the compressor and decompressor
     * @param tmpPath temporary directory for the index file
     */
    @ParameterizedTest
    @ValueSource(strings={"LZ4", "LZO", "ZSTD"})
    void testReadBufferCompressed(String compressionFactoryName, @TempDir Path tmpPath) throws Exception {
        BufferCompressor compressor = new BufferCompressor(4, compressionFactoryName);
        BufferDecompressor decompressor = new BufferDecompressor(4, compressionFactoryName);
        diskIndex = createDataIndex(1, tmpPath.resolve(".index"));

        TestingSubpartitionConsumerInternalOperation consumerOperations = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader(0, consumerOperations);
        writeDataToFile(0, 0, 1, 3, compressor);

        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(3);
        reader.prepareForScheduling();
        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);

        HsSubpartitionFileReaderImplTest.checkData(reader, decompressor, 1, 2, 3);
    }

    /** Reading before anything was written must neither take a segment nor load a buffer. */
    @Test
    void testReadEmptyRegion() throws Exception {
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        Deque<?> loaded = reader.getLoadedBuffers();
        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(2);

        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);

        // Both segments are still available and nothing was loaded.
        Assertions.assertThat(segments).hasSize(2);
        Assertions.assertThat(loaded).isEmpty();
    }

    /**
     * Writes six buffers, advances the consumer in steps of two and checks that each following
     * read (with a single segment) loads the buffer right after the consuming offset: index 2
     * first, then index 4.
     */
    @Test
    void testReadBufferSkip() throws Exception {
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loaded = reader.getLoadedBuffers();
        writeDataToFile(0, 0, 6);

        // First round: two advances put the consuming offset at 1.
        subpartitionOperation.advanceConsumptionProgress();
        subpartitionOperation.advanceConsumptionProgress();
        Assertions.assertThat(subpartitionOperation.getConsumingOffset(true)).isEqualTo(1);
        reader.prepareForScheduling();
        Queue<MemorySegment> firstRoundSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(1);
        reader.readBuffers(firstRoundSegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(firstRoundSegments).isEmpty();
        HsSubpartitionFileReaderImplTest.assertSingleLoadedBuffer(loaded, 2);

        // Second round: two more advances, the next loaded buffer is index 4.
        subpartitionOperation.advanceConsumptionProgress();
        subpartitionOperation.advanceConsumptionProgress();
        reader.prepareForScheduling();
        Queue<MemorySegment> secondRoundSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(1);
        reader.readBuffers(secondRoundSegments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(secondRoundSegments).isEmpty();
        HsSubpartitionFileReaderImplTest.assertSingleLoadedBuffer(loaded, 4);
    }

    /** Polls the only loaded entry and checks that it is a buffer (not an error) with the given index. */
    private static void assertSingleLoadedBuffer(Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loaded, int expectedIndex) {
        Assertions.assertThat(loaded).hasSize(1);
        HsSubpartitionFileReaderImpl.BufferIndexOrError entry = loaded.poll();
        Assertions.assertThat(entry).isNotNull();
        Assertions.assertThat(entry.getBuffer()).isPresent();
        Assertions.assertThat(entry.getThrowable()).isNotPresent();
        Assertions.assertThat(entry.getIndex()).isEqualTo(expectedIndex);
    }

    /**
     * Makes two separate writes of two buffers each and checks that one {@code readBuffers} call,
     * although offered four segments, loads only the two buffers of the first write.
     */
    @Test
    void testReadBufferNotBeyondRegionBoundary() throws Exception {
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        Deque<?> loaded = reader.getLoadedBuffers();
        // First write holds payloads 0 and 1, second write holds payloads 2 and 3.
        writeDataToFile(0, 0, 0, 2);
        writeDataToFile(0, 2, 2, 2);

        reader.prepareForScheduling();
        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(4);
        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);

        Assertions.assertThat(loaded).hasSize(2);
        HsSubpartitionFileReaderImplTest.checkData(reader, 0, 1);
        // Two of the four segments were left untouched.
        Assertions.assertThat(segments).hasSize(2);
    }

    /**
     * Writes six buffers, offers six segments and checks that only five buffers are loaded and one
     * segment remains. NOTE(review): five is the limit passed to the reader's constructor in
     * createSubpartitionFileReader — presumably the read-ahead cap; confirm.
     */
    @Test
    void testReadBufferNotExceedThreshold() throws Exception {
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        Deque<?> loaded = reader.getLoadedBuffers();
        writeDataToFile(0, 0, 6);

        reader.prepareForScheduling();
        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(6);
        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);

        Assertions.assertThat(loaded).hasSize(5);
        Assertions.assertThat(segments).hasSize(1);
    }

    /**
     * Runs a consumer thread that drains the loaded-buffer deque and, whenever it finds the deque
     * empty, waits for the data-available notification. The test passes once the consumer has seen
     * all five buffers and the deque is empty again.
     */
    @Test
    void testReadBufferNotifyDataAvailable() throws Exception {
        final int numBuffers = 5;
        final OneShotLatch dataAvailable = new OneShotLatch();
        subpartitionOperation.setNotifyDataAvailableRunnable(dataAvailable::trigger);
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        final BlockingDeque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loaded =
                (BlockingDeque<HsSubpartitionFileReaderImpl.BufferIndexOrError>) reader.getLoadedBuffers();
        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(numBuffers);
        writeDataToFile(0, 0, numBuffers);
        reader.prepareForScheduling();

        CheckedThread consumer = new CheckedThread() {

            public void go() throws Exception {
                int consumed = 0;
                while (consumed < numBuffers) {
                    HsSubpartitionFileReaderImpl.BufferIndexOrError entry = loaded.poll();
                    if (entry == null) {
                        // Nothing loaded yet: block until notified, then re-arm the latch.
                        dataAvailable.await();
                        dataAvailable.reset();
                    } else {
                        Assertions.assertThat(entry.getBuffer()).isPresent();
                        ++consumed;
                    }
                }
            }
        };
        consumer.start();

        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);

        // sync() joins the consumer thread; afterwards every loaded buffer must have been taken.
        consumer.sync();
        Assertions.assertThat(loaded).isEmpty();
    }

    /**
     * Closes the data file channel before reading and checks that {@code readBuffers} fails with an
     * {@link IOException} while both memory segments stay in (or are returned to) the queue.
     */
    @Test
    void testReadWillReturnBufferAfterError() throws Exception {
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        writeDataToFile(0, 0, 2);
        reader.prepareForScheduling();
        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(2);

        // Force the read to fail.
        dataFileChannel.close();

        Assertions.assertThatThrownBy(() -> reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE))
                .isInstanceOf(IOException.class);
        Assertions.assertThat(segments).hasSize(2);
    }

    /** After {@code fail(...)}, {@code readBuffers} must throw an {@link IOException} saying the reader has failed. */
    @Test
    void testReadBufferAfterFail() {
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        reader.fail(new RuntimeException("expected exception."));

        Assertions.assertThatThrownBy(
                        () -> reader.readBuffers(HsSubpartitionFileReaderImplTest.createsMemorySegments(2), FreeingBufferRecycler.INSTANCE))
                .isInstanceOf(IOException.class)
                .hasMessageContaining("subpartition reader has already failed.");
    }

    /**
     * Loads two buffers, then fails the reader and checks that both loaded buffers were handed to
     * the recycler, that the deque now holds exactly one entry carrying the failure cause, and that
     * the consumer was notified a second time.
     */
    @Test
    void testFail() throws Exception {
        AtomicInteger notifications = new AtomicInteger(0);
        subpartitionOperation.setNotifyDataAvailableRunnable(notifications::incrementAndGet);
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader();
        Deque<HsSubpartitionFileReaderImpl.BufferIndexOrError> loaded = reader.getLoadedBuffers();
        writeDataToFile(0, 0, 2);
        reader.prepareForScheduling();
        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(2);
        AtomicInteger recycled = new AtomicInteger(0);

        // The recycler only counts how often it is invoked.
        reader.readBuffers(segments, segment -> recycled.incrementAndGet());
        Assertions.assertThat(segments).isEmpty();
        Assertions.assertThat(loaded).hasSize(2);
        Assertions.assertThat(notifications).hasValue(1);

        reader.fail(new RuntimeException("expected exception."));
        Assertions.assertThat(recycled).hasValue(2);

        // Only the error entry is left in the deque.
        HsSubpartitionFileReaderImpl.BufferIndexOrError errorEntry = loaded.poll();
        Assertions.assertThat(loaded).isEmpty();
        Assertions.assertThat(errorEntry).isNotNull();
        Assertions.assertThat(errorEntry.getThrowable())
                .hasValueSatisfying(
                        cause -> Assertions.assertThat(cause)
                                .isInstanceOf(RuntimeException.class)
                                .hasMessage("expected exception."));
        Assertions.assertThat(notifications).hasValue(2);
    }

    /**
     * Checks the ordering of two readers on different subpartitions: equal before any data exists,
     * reader 1 first once data is written, and reader 2 first after reader 1's consumer has
     * advanced twice. NOTE(review): the ordering presumably follows the file offset of each
     * reader's next buffer — confirm against {@code HsSubpartitionFileReaderImpl#compareTo}.
     *
     * @param tempPath temporary directory for the two-subpartition index file
     */
    @Test
    void testCompareTo(@TempDir Path tempPath) throws Exception {
        diskIndex = createDataIndex(2, tempPath.resolve(".index"));
        TestingSubpartitionConsumerInternalOperation operations1 = new TestingSubpartitionConsumerInternalOperation();
        TestingSubpartitionConsumerInternalOperation operations2 = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl reader1 = createSubpartitionFileReader(0, operations1);
        HsSubpartitionFileReaderImpl reader2 = createSubpartitionFileReader(1, operations2);

        // No data written yet.
        Assertions.assertThat(reader1).isEqualByComparingTo(reader2);

        // File layout: sub0 x2, sub1 x1, sub0 x1.
        writeDataToFile(0, 0, 2);
        writeDataToFile(1, 0, 1);
        writeDataToFile(0, 2, 1);
        Assertions.assertThat(reader1).isLessThan(reader2);

        // One advance: reader 1 still sorts first.
        operations1.advanceConsumptionProgress();
        reader1.prepareForScheduling();
        Assertions.assertThat(reader1).isLessThan(reader2);

        // Second advance: reader 1 now sorts after reader 2.
        operations1.advanceConsumptionProgress();
        reader1.prepareForScheduling();
        Assertions.assertThat(reader1).isGreaterThan(reader2);
    }

    /**
     * Loads four buffers (payloads 0..3) and calls {@code consumeBuffer(2, ...)}; the two buffers
     * in front of index 2 must end up in the supplied recycle collection, in order.
     *
     * <p>The collection is typed as {@code ArrayList<Buffer>} so the {@code satisfies} lambdas see
     * a {@link Buffer} rather than {@code Object}.
     */
    @Test
    void testRecycleBuffersForConsumeBuffer() throws Throwable {
        TestingSubpartitionConsumerInternalOperation viewNotifier = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier);
        writeDataToFile(0, 0, 0, 4);
        Queue<MemorySegment> memorySegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(4);
        subpartitionFileReader.prepareForScheduling();
        // The recycler is a no-op; this test only inspects the buffers-to-recycle collection.
        subpartitionFileReader.readBuffers(memorySegments, ignore -> {});

        ArrayList<Buffer> buffers = new ArrayList<>();
        subpartitionFileReader.consumeBuffer(2, buffers);

        Assertions.assertThat(buffers).hasSize(2);
        Assertions.assertThat(buffers).element(0)
                .satisfies(buffer -> HsSubpartitionFileReaderImplTest.assertBufferContentEqualTo(buffer, 0));
        Assertions.assertThat(buffers).element(1)
                .satisfies(buffer -> HsSubpartitionFileReaderImplTest.assertBufferContentEqualTo(buffer, 1));
    }

    /**
     * Loads four buffers (payloads 0..3) and calls {@code peekNextToConsumeDataType(2, ...)}; the
     * two buffers in front of index 2 must end up in the supplied recycle collection, in order.
     *
     * <p>The collection is typed as {@code ArrayList<Buffer>} so the {@code satisfies} lambdas see
     * a {@link Buffer} rather than {@code Object}.
     */
    @Test
    void testRecycleBuffersForPeekNextToConsumeDataType() throws Throwable {
        TestingSubpartitionConsumerInternalOperation viewNotifier = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier);
        writeDataToFile(0, 0, 0, 4);
        Queue<MemorySegment> memorySegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(4);
        subpartitionFileReader.prepareForScheduling();
        // The recycler is a no-op; this test only inspects the buffers-to-recycle collection.
        subpartitionFileReader.readBuffers(memorySegments, ignore -> {});

        ArrayList<Buffer> buffers = new ArrayList<>();
        subpartitionFileReader.peekNextToConsumeDataType(2, buffers);

        Assertions.assertThat(buffers).hasSize(2);
        Assertions.assertThat(buffers).element(0)
                .satisfies(buffer -> HsSubpartitionFileReaderImplTest.assertBufferContentEqualTo(buffer, 0));
        Assertions.assertThat(buffers).element(1)
                .satisfies(buffer -> HsSubpartitionFileReaderImplTest.assertBufferContentEqualTo(buffer, 1));
    }

    /**
     * Exercises {@code consumeBuffer}: nothing is returned before data is loaded; index 0 returns
     * payload 0 with a data buffer next; asking for index 0 again returns nothing; index 2 returns
     * payload 2 with nothing next and leaves the loaded-buffer deque empty.
     *
     * <p>The returned {@code Optional} is asserted on without a raw cast so the
     * {@code hasValueSatisfying} lambdas receive the typed buffer-and-backlog value.
     */
    @Test
    void testConsumeBuffer() throws Throwable {
        TestingSubpartitionConsumerInternalOperation viewNotifier = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl subpartitionFileReader = createSubpartitionFileReader(0, viewNotifier);

        // Nothing loaded yet.
        Assertions.assertThat(subpartitionFileReader.consumeBuffer(0, Collections.emptyList())).isNotPresent();

        writeDataToFile(0, 0, 0, 3);
        Queue<MemorySegment> memorySegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(3);
        subpartitionFileReader.prepareForScheduling();
        subpartitionFileReader.readBuffers(memorySegments, ignore -> {});

        Assertions.assertThat(subpartitionFileReader.consumeBuffer(0, new ArrayList<>())).hasValueSatisfying(bufferAndBacklog -> {
            Assertions.assertThat(bufferAndBacklog.getNextDataType()).isEqualTo(Buffer.DataType.DATA_BUFFER);
            Assertions.assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
            Assertions.assertThat(bufferAndBacklog.buffer().getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(0);
        });

        // Index 0 has already been consumed.
        Assertions.assertThat(subpartitionFileReader.consumeBuffer(0, Collections.emptyList())).isNotPresent();

        // Jumping to index 2 returns the last written buffer.
        Assertions.assertThat(subpartitionFileReader.consumeBuffer(2, new ArrayList<>())).hasValueSatisfying(bufferAndBacklog -> {
            Assertions.assertThat(bufferAndBacklog.getNextDataType()).isEqualTo(Buffer.DataType.NONE);
            Assertions.assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(2);
            Assertions.assertThat(bufferAndBacklog.buffer().getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt()).isEqualTo(2);
        });
        Assertions.assertThat(subpartitionFileReader.getLoadedBuffers()).isEmpty();
    }

    /**
     * After {@code fail(...)}, both {@code peekNextToConsumeDataType} and {@code consumeBuffer}
     * must throw a {@link RuntimeException} carrying the failure message.
     */
    @Test
    void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() {
        TestingSubpartitionConsumerInternalOperation consumerOperations = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader(0, consumerOperations);
        reader.fail(new RuntimeException("expected exception."));

        Assertions.assertThatThrownBy(() -> reader.peekNextToConsumeDataType(0, Collections.emptyList()))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("expected exception.");
        Assertions.assertThatThrownBy(() -> reader.consumeBuffer(0, Collections.emptyList()))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("expected exception.");
    }

    /**
     * Exercises {@code peekNextToConsumeDataType}: {@code NONE} before anything is loaded, then
     * {@code DATA_BUFFER} for index 0, {@code EVENT_BUFFER} for index 2 (the last buffer of a write
     * is written as an event) and {@code NONE} when index 1 is asked for afterwards.
     */
    @Test
    void testPeekNextToConsumeDataType() throws Throwable {
        TestingSubpartitionConsumerInternalOperation consumerOperations = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl reader = createSubpartitionFileReader(0, consumerOperations);

        // Nothing loaded yet.
        Assertions.assertThat(reader.peekNextToConsumeDataType(0, new ArrayList<>())).isEqualTo(Buffer.DataType.NONE);

        writeDataToFile(0, 0, 3);
        Queue<MemorySegment> segments = HsSubpartitionFileReaderImplTest.createsMemorySegments(3);
        reader.prepareForScheduling();
        reader.readBuffers(segments, ignore -> {});

        Assertions.assertThat(reader.peekNextToConsumeDataType(0, Collections.emptyList())).isEqualTo(Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(reader.peekNextToConsumeDataType(2, new ArrayList<>())).isEqualTo(Buffer.DataType.EVENT_BUFFER);
        Assertions.assertThat(reader.peekNextToConsumeDataType(1, Collections.emptyList())).isEqualTo(Buffer.DataType.NONE);
    }

    /**
     * Creates a reader whose consumer has already advanced once, so it loads only payloads 2 and 3
     * of the three written buffers; a second reader with fresh consumer operations on the same
     * subpartition then loads all three payloads.
     */
    @Test
    void testSubpartitionReaderRegisterMultipleTimes() throws Exception {
        // First registration: consumer already advanced by one.
        TestingSubpartitionConsumerInternalOperation firstOperations = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl firstReader = createSubpartitionFileReader(0, firstOperations);
        firstOperations.advanceConsumptionProgress();
        writeDataToFile(0, 0, 1, 3);
        firstReader.prepareForScheduling();
        Queue<MemorySegment> firstSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(3);
        firstReader.readBuffers(firstSegments, ignore -> {});
        Assertions.assertThat(firstSegments).hasSize(1);
        HsSubpartitionFileReaderImplTest.checkData(firstReader, 2, 3);

        // Second registration: a brand-new consumer reads from the beginning.
        TestingSubpartitionConsumerInternalOperation secondOperations = new TestingSubpartitionConsumerInternalOperation();
        HsSubpartitionFileReaderImpl secondReader = createSubpartitionFileReader(0, secondOperations);
        secondReader.prepareForScheduling();
        Queue<MemorySegment> secondSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(3);
        secondReader.readBuffers(secondSegments, ignore -> {});
        Assertions.assertThat(secondSegments).isEmpty();
        HsSubpartitionFileReaderImplTest.checkData(secondReader, 1, 2, 3);
    }

    /**
     * Creates two readers with different consumer ids on the same subpartition, checks that they
     * are not equal, and verifies that each one independently loads all three written payloads.
     */
    @Test
    void testMultipleFileReaderOfSingleSubpartition() throws Exception {
        TestingSubpartitionConsumerInternalOperation operations1 = new TestingSubpartitionConsumerInternalOperation();
        TestingSubpartitionConsumerInternalOperation operations2 = new TestingSubpartitionConsumerInternalOperation();
        HsConsumerId firstId = HsConsumerId.newId(null);
        // The second id is derived from the first one.
        HsConsumerId secondId = HsConsumerId.newId(firstId);
        HsSubpartitionFileReaderImpl firstReader = createSubpartitionFileReader(0, firstId, operations1);
        HsSubpartitionFileReaderImpl secondReader = createSubpartitionFileReader(0, secondId, operations2);
        Assertions.assertThat(firstReader).isNotEqualTo(secondReader);

        writeDataToFile(0, 0, 1, 3);

        firstReader.prepareForScheduling();
        Queue<MemorySegment> firstSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(3);
        firstReader.readBuffers(firstSegments, ignore -> {});
        Assertions.assertThat(firstSegments).isEmpty();
        HsSubpartitionFileReaderImplTest.checkData(firstReader, 1, 2, 3);

        secondReader.prepareForScheduling();
        Queue<MemorySegment> secondSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(3);
        secondReader.readBuffers(secondSegments, ignore -> {});
        Assertions.assertThat(secondSegments).isEmpty();
        HsSubpartitionFileReaderImplTest.checkData(secondReader, 1, 2, 3);
    }

    /**
     * Loads two buffers, releases the data view and checks that the loaded buffers were recycled
     * and the deque emptied; a further {@code readBuffers} call must then take no segment and
     * recycle nothing more, and the releaser callback must have been invoked.
     */
    @Test
    void testReadBuffersAfterReleased() throws Throwable {
        TestingSubpartitionConsumerInternalOperation consumerOperations = new TestingSubpartitionConsumerInternalOperation();
        CompletableFuture<Void> released = new CompletableFuture<>();
        HsSubpartitionFileReaderImpl reader =
                createSubpartitionFileReader(0, HsConsumerId.DEFAULT, consumerOperations, ignore -> released.complete(null));
        writeDataToFile(0, 0, 4);
        reader.prepareForScheduling();

        // The recycler collects every segment it is handed.
        ArrayDeque<MemorySegment> recycled = new ArrayDeque<>();
        Queue<MemorySegment> firstSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(2);
        reader.readBuffers(firstSegments, recycled::add);
        Assertions.assertThat(firstSegments).isEmpty();

        reader.releaseDataView();
        Assertions.assertThat(reader.getLoadedBuffers()).isEmpty();
        Assertions.assertThat(recycled).hasSize(2);

        // Reading after release is a no-op for both the segment queue and the recycler.
        Queue<MemorySegment> secondSegments = HsSubpartitionFileReaderImplTest.createsMemorySegments(2);
        reader.readBuffers(secondSegments, recycled::add);
        Assertions.assertThat(secondSegments).hasSize(2);
        Assertions.assertThat(recycled).hasSize(2);
        Assertions.assertThat(released).isCompleted();
    }

    /**
     * Polls one loaded entry per expected value and asserts that it carries a buffer whose first
     * int (native byte order) equals that value. Also asserts up front that the number of loaded
     * entries matches the number of expected values.
     *
     * @param fileReader reader whose loaded buffers are drained
     * @param bufferDecompressor applied to buffers that report themselves as compressed; may be
     *     {@code null}, in which case buffers are inspected as they are
     * @param expectedData expected payloads in loading order
     */
    private static void checkData(HsSubpartitionFileReaderImpl fileReader, BufferDecompressor bufferDecompressor, int ... expectedData) {
        Assertions.assertThat(fileReader.getLoadedBuffers()).hasSameSizeAs(expectedData);
        for (int expected : expectedData) {
            HsSubpartitionFileReaderImpl.BufferIndexOrError entry = fileReader.getLoadedBuffers().poll();
            Assertions.assertThat(entry).isNotNull();
            Assertions.assertThat(entry.getBuffer()).isPresent();

            Buffer payload = entry.getBuffer().get();
            if (payload.isCompressed() && bufferDecompressor != null) {
                payload = bufferDecompressor.decompressToIntermediateBuffer(payload);
            }
            int actual = payload.getNioBufferReadable().order(ByteOrder.nativeOrder()).getInt();
            Assertions.assertThat(actual).isEqualTo(expected);
        }
    }

    /** Same as the three-argument {@code checkData}, without any decompressor. */
    private static void checkData(HsSubpartitionFileReaderImpl fileReader, int ... expectedData) {
        final BufferDecompressor noDecompressor = null;
        HsSubpartitionFileReaderImplTest.checkData(fileReader, noDecompressor, expectedData);
    }

    /** Creates a reader for subpartition 0 that uses the shared {@code subpartitionOperation}. */
    private HsSubpartitionFileReaderImpl createSubpartitionFileReader() {
        return createSubpartitionFileReader(0, subpartitionOperation);
    }

    /** Creates a reader for the given subpartition with {@code HsConsumerId.DEFAULT} as consumer id. */
    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(int targetChannel, HsSubpartitionConsumerInternalOperations operations) {
        HsConsumerId defaultConsumer = HsConsumerId.DEFAULT;
        return createSubpartitionFileReader(targetChannel, defaultConsumer, operations);
    }

    /** Creates a reader for the given subpartition and consumer id with a releaser that does nothing. */
    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(int targetChannel, HsConsumerId consumerId, HsSubpartitionConsumerInternalOperations operations) {
        Consumer<HsSubpartitionFileReader> noOpReleaser = ignore -> {};
        return createSubpartitionFileReader(targetChannel, consumerId, operations, noOpReleaser);
    }

    /**
     * Builds the reader under test on top of this test's data file channel and index.
     *
     * @param targetChannel subpartition the reader serves
     * @param consumerId id of the consumer the reader belongs to
     * @param operations consumer-side operations handed to the reader
     * @param fileReaderReleaser callback handed to the reader as its releaser
     */
    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(int targetChannel, HsConsumerId consumerId, HsSubpartitionConsumerInternalOperations operations, Consumer<HsSubpartitionFileReader> fileReaderReleaser) {
        // NOTE(review): the literal 5 presumably mirrors MAX_BUFFERS_READ_AHEAD — confirm.
        return new HsSubpartitionFileReaderImpl(
                targetChannel,
                consumerId,
                dataFileChannel,
                operations,
                diskIndex,
                5,
                fileReaderReleaser,
                BufferReaderWriterUtil.allocatedHeaderBuffer());
    }

    /**
     * Creates a file data index for the given number of subpartitions, backed by the given path.
     * NOTE(review): the meaning of the trailing {@code 256} and {@code Long.MAX_VALUE} arguments
     * is not visible here — see the {@code HsFileDataIndexImpl} constructor.
     */
    private HsFileDataIndexImpl createDataIndex(int numSubpartitions, Path indexFilePath) {
        HsFileDataIndexImpl index = new HsFileDataIndexImpl(numSubpartitions, indexFilePath, 256, Long.MAX_VALUE);
        return index;
    }

    /**
     * Opens an existing file for both reading and writing.
     *
     * @param path file to open
     * @return a channel opened with {@code READ} and {@code WRITE}
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        StandardOpenOption[] readWrite = {StandardOpenOption.READ, StandardOpenOption.WRITE};
        return FileChannel.open(path, readWrite);
    }

    /**
     * Allocates the requested number of unpooled 4-byte memory segments.
     *
     * @param numSegments number of segments to allocate; non-positive values yield an empty queue
     * @return a queue holding the new segments
     */
    private static Queue<MemorySegment> createsMemorySegments(int numSegments) {
        ArrayDeque<MemorySegment> pool = new ArrayDeque<MemorySegment>();
        int remaining = numSegments;
        while (remaining > 0) {
            pool.add(MemorySegmentFactory.allocateUnpooledSegment(4));
            --remaining;
        }
        return pool;
    }

    /**
     * Appends {@code numBuffers} four-byte buffers to the data file, registers them in the index
     * and marks each of them as released.
     *
     * <p>Buffer {@code i} carries the int {@code firstBufferData + i} and gets index
     * {@code firstBufferIndex + i}. The last buffer of every call is written as an event buffer,
     * all others as data buffers.
     *
     * @param subpartitionId subpartition the buffers belong to
     * @param firstBufferIndex index of the first written buffer
     * @param firstBufferData payload of the first written buffer
     * @param numBuffers number of buffers to write
     * @param bufferCompressor compressor applied to data buffers, or {@code null} for none
     */
    private void writeDataToFile(int subpartitionId, int firstBufferIndex, int firstBufferData, int numBuffers, BufferCompressor bufferCompressor) throws Exception {
        ArrayList<HsFileDataIndex.SpilledBuffer> spilledBuffers = new ArrayList<>(numBuffers);
        // Two slots per buffer: the header followed by the payload.
        ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * numBuffers];
        int totalBytes = 0;
        for (int i = 0; i < numBuffers; ++i) {
            Buffer.DataType dataType = i == numBuffers - 1 ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(4);
            segment.putInt(0, firstBufferData + i);
            // Declared as Buffer rather than NetworkBuffer: the variable is reassigned below from
            // the compressor, which takes and (NOTE(review): presumably) returns a Buffer.
            Buffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType, 4);
            if (bufferCompressor != null && buffer.isBuffer()) {
                buffer = bufferCompressor.compressToOriginalBuffer(buffer);
            }
            HsSubpartitionFileReaderImplTest.setBufferWithHeader(buffer, bufferWithHeaders, 2 * i);
            // The buffer starts at the current end of file plus what this call has queued so far.
            spilledBuffers.add(new HsFileDataIndex.SpilledBuffer(subpartitionId, firstBufferIndex + i, currentFileOffset + (long) totalBytes));
            // NOTE(review): 8 is presumably the length of the header written in front of each buffer.
            totalBytes += buffer.getSize() + 8;
        }
        BufferReaderWriterUtil.writeBuffers(dataFileChannel, (long) totalBytes, bufferWithHeaders);
        currentFileOffset += (long) totalBytes;
        diskIndex.addBuffers(spilledBuffers);
        spilledBuffers.forEach(spilledBuffer -> diskIndex.markBufferReleased(subpartitionId, spilledBuffer.bufferIndex));
    }

    /** Writes uncompressed buffers; delegates to the five-argument overload with no compressor. */
    private void writeDataToFile(int subpartitionId, int firstBufferIndex, int firstBufferData, int numBuffers) throws Exception {
        final BufferCompressor noCompressor = null;
        writeDataToFile(subpartitionId, firstBufferIndex, firstBufferData, numBuffers, noCompressor);
    }

    /** Writes uncompressed buffers whose first payload is a random int taken from {@code random}. */
    private void writeDataToFile(int subpartitionId, int firstBufferIndex, int numBuffers) throws Exception {
        int randomFirstPayload = random.nextInt();
        writeDataToFile(subpartitionId, firstBufferIndex, randomFirstPayload, numBuffers);
    }

    /**
     * Stores a freshly allocated header for {@code buffer} at {@code index} and the buffer's
     * readable bytes at {@code index + 1}.
     *
     * @param buffer buffer to describe and store
     * @param bufferWithHeaders target array; must have room for both slots
     * @param index position of the header slot
     */
    private static void setBufferWithHeader(Buffer buffer, ByteBuffer[] bufferWithHeaders, int index) {
        ByteBuffer headerSlot = BufferReaderWriterUtil.allocatedHeaderBuffer();
        BufferReaderWriterUtil.setByteChannelBufferHeader(buffer, headerSlot);

        int payloadIndex = index + 1;
        bufferWithHeaders[index] = headerSlot;
        bufferWithHeaders[payloadIndex] = buffer.getNioBufferReadable();
    }

    /** Asserts that the first int of the buffer's readable bytes, in native byte order, equals {@code expectedValue}. */
    private static void assertBufferContentEqualTo(Buffer buffer, int expectedValue) {
        ByteBuffer readable = buffer.getNioBufferReadable().order(ByteOrder.nativeOrder());
        int actualValue = readable.getInt();
        Assertions.assertThat(actualValue).isEqualTo(expectedValue);
    }
}

