/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferWithIdentity;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation;
import org.apache.flink.runtime.io.network.partition.hybrid.HsOutputMetrics;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionMemoryDataManager;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingMemoryDataManagerOperation;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for {@link HsSubpartitionMemoryDataManager}.
 *
 * <p>Every test builds a manager around a {@link TestingMemoryDataManagerOperation} whose
 * callbacks are replaced with probes, appends fixed-size records created by {@link
 * #createRecord(long)}, and then asserts on what the manager reports or which callbacks fired.
 */
class HsSubpartitionMemoryDataManagerTest {
    /** Subpartition (channel) id used for every manager and buffer identity in this class. */
    private static final int SUBPARTITION_ID = 0;

    /** Lock whose read lock is handed to every manager created by this class. */
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Size in bytes of one record produced by {@link #createRecord(long)} (one {@code long}). */
    private static final int RECORD_SIZE = 8;

    /**
     * Buffer size passed to the manager under test. Defaults to one record per buffer; individual
     * tests overwrite it before creating the manager when they need larger buffers.
     */
    private int bufferSize = RECORD_SIZE;

    /** Appending a data record must request a buffer from the pool. */
    @Test
    void testAppendDataRequestBuffer() throws Exception {
        CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
        TestingMemoryDataManagerOperation memoryDataManagerOperation =
                TestingMemoryDataManagerOperation.builder()
                        .setRequestBufferFromPoolSupplier(
                                () -> {
                                    // Record that the pool was asked for a buffer.
                                    requestBufferFuture.complete(null);
                                    return HybridShuffleTestUtils.createBufferBuilder(bufferSize);
                                })
                        .build();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(memoryDataManagerOperation);

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);

        Assertions.assertThat(requestBufferFuture).isCompleted();
    }

    /** Appending an event must not request a buffer from the pool. */
    @Test
    void testAppendEventNotRequestBuffer() throws Exception {
        CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
        TestingMemoryDataManagerOperation memoryDataManagerOperation =
                TestingMemoryDataManagerOperation.builder()
                        .setRequestBufferFromPoolSupplier(
                                () -> {
                                    // Must never run in this test; the future stays incomplete.
                                    requestBufferFuture.complete(null);
                                    return null;
                                })
                        .build();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(memoryDataManagerOperation);

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.EVENT_BUFFER);

        Assertions.assertThat(requestBufferFuture).isNotDone();
    }

    /**
     * Two records in a three-record buffer leave it unfinished; a following event must report two
     * finished buffers through the on-buffer-finished callback.
     */
    @Test
    void testAppendEventFinishCurrentBuffer() throws Exception {
        bufferSize = RECORD_SIZE * 3;
        AtomicInteger finishedBuffers = new AtomicInteger(0);
        TestingMemoryDataManagerOperation memoryDataManagerOperation =
                TestingMemoryDataManagerOperation.builder()
                        .setRequestBufferFromPoolSupplier(
                                () -> HybridShuffleTestUtils.createBufferBuilder(bufferSize))
                        .setOnBufferFinishedRunnable(finishedBuffers::incrementAndGet)
                        .build();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(memoryDataManagerOperation);

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);
        subpartitionMemoryDataManager.append(createRecord(1L), Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(finishedBuffers).hasValue(0);

        subpartitionMemoryDataManager.append(createRecord(2L), Buffer.DataType.EVENT_BUFFER);
        Assertions.assertThat(finishedBuffers).hasValue(2);
    }

    /** Peeking at an index that has not been appended yet must yield {@code NONE}. */
    @Test
    void testPeekNextToConsumeDataTypeNotMeetBufferIndexToConsume() throws Exception {
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(createRecordSizedOperation());

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);

        Assertions.assertThat(subpartitionMemoryDataManager.peekNextToConsumeDataType(1))
                .isEqualTo(Buffer.DataType.NONE);
    }

    /** After buffers 0 and 1 are released, peeking at index 2 must still see the event. */
    @Test
    void testPeekNextToConsumeDataTypeTrimHeadingReleasedBuffers() throws Exception {
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(createRecordSizedOperation());

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);
        subpartitionMemoryDataManager.append(createRecord(1L), Buffer.DataType.DATA_BUFFER);
        subpartitionMemoryDataManager.append(createRecord(2L), Buffer.DataType.EVENT_BUFFER);

        List<BufferIndexAndChannel> toRelease =
                HybridShuffleTestUtils.createBufferIndexAndChannelsList(SUBPARTITION_ID, 0, 1);
        subpartitionMemoryDataManager.releaseSubpartitionBuffers(toRelease);

        Assertions.assertThat(subpartitionMemoryDataManager.peekNextToConsumeDataType(2))
                .isEqualTo(Buffer.DataType.EVENT_BUFFER);
    }

    /** Consuming an index that has not been appended yet must return an empty optional. */
    @Test
    void testConsumeBufferFirstUnConsumedBufferIndexNotMeetNextToConsume() throws Exception {
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(createRecordSizedOperation());

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);

        Assertions.assertThat(subpartitionMemoryDataManager.consumeBuffer(1)).isNotPresent();
    }

    /** After buffers 0 and 1 are released, buffer 2 must still be consumable. */
    @Test
    void testConsumeBufferTrimHeadingReleasedBuffers() throws Exception {
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(createRecordSizedOperation());

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);
        subpartitionMemoryDataManager.append(createRecord(1L), Buffer.DataType.DATA_BUFFER);
        subpartitionMemoryDataManager.append(createRecord(2L), Buffer.DataType.EVENT_BUFFER);

        List<BufferIndexAndChannel> toRelease =
                HybridShuffleTestUtils.createBufferIndexAndChannelsList(SUBPARTITION_ID, 0, 1);
        subpartitionMemoryDataManager.releaseSubpartitionBuffers(toRelease);

        Assertions.assertThat(subpartitionMemoryDataManager.consumeBuffer(2)).isPresent();
    }

    /** A consumed buffer must be handed out as a {@link ReadOnlySlicedNetworkBuffer}. */
    @Test
    void testConsumeBufferReturnSlice() throws Exception {
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(createRecordSizedOperation());

        subpartitionMemoryDataManager.append(createRecord(0L), Buffer.DataType.DATA_BUFFER);

        Optional<ResultSubpartition.BufferAndBacklog> bufferOpt =
                subpartitionMemoryDataManager.consumeBuffer(0);
        Assertions.assertThat(bufferOpt)
                .hasValueSatisfying(
                        bufferAndBacklog ->
                                Assertions.assertThat(bufferAndBacklog.buffer())
                                        .isInstanceOf(ReadOnlySlicedNetworkBuffer.class));
    }

    /**
     * Appends ten full data buffers plus one event, consumes all eleven buffers in order and
     * verifies their content, the announced next data type and the on-buffer-consumed callbacks.
     *
     * @param compressionFactoryName compression codec name, or {@code "NULL"} to run without a
     *     compressor
     */
    @ParameterizedTest
    @ValueSource(strings={"LZ4", "LZO", "ZSTD", "NULL"})
    void testConsumeBuffer(String compressionFactoryName) throws Exception {
        final int numDataBuffers = 10;
        final int numRecordsPerBuffer = 10;
        // Size the buffer so that exactly numRecordsPerBuffer records fill it.
        bufferSize = RECORD_SIZE * numRecordsPerBuffer;

        boolean compressionDisabled = compressionFactoryName.equals("NULL");
        BufferCompressor bufferCompressor =
                compressionDisabled
                        ? null
                        : new BufferCompressor(bufferSize, compressionFactoryName);
        BufferDecompressor bufferDecompressor =
                compressionDisabled
                        ? null
                        : new BufferDecompressor(bufferSize, compressionFactoryName);

        List<BufferIndexAndChannel> consumedBufferIndexAndChannel = new ArrayList<>();
        TestingMemoryDataManagerOperation memoryDataManagerOperation =
                TestingMemoryDataManagerOperation.builder()
                        .setRequestBufferFromPoolSupplier(
                                () -> HybridShuffleTestUtils.createBufferBuilder(bufferSize))
                        .setOnBufferConsumedConsumer(consumedBufferIndexAndChannel::add)
                        .build();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(memoryDataManagerOperation, bufferCompressor);

        List<Tuple2<Long, Buffer.DataType>> expectedRecords = new ArrayList<>();
        long recordValue = 0L;
        for (int i = 0; i < numDataBuffers; i++) {
            for (int j = 0; j < numRecordsPerBuffer; j++) {
                subpartitionMemoryDataManager.append(
                        createRecord(recordValue), Buffer.DataType.DATA_BUFFER);
                expectedRecords.add(Tuple2.of(recordValue++, Buffer.DataType.DATA_BUFFER));
            }
        }
        subpartitionMemoryDataManager.append(
                createRecord(recordValue), Buffer.DataType.EVENT_BUFFER);
        expectedRecords.add(Tuple2.of(recordValue, Buffer.DataType.EVENT_BUFFER));

        // numDataBuffers data buffers followed by one event buffer.
        final int numBuffers = numDataBuffers + 1;
        List<Optional<ResultSubpartition.BufferAndBacklog>> bufferAndBacklogOpts =
                new ArrayList<>(numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            bufferAndBacklogOpts.add(subpartitionMemoryDataManager.consumeBuffer(i));
        }
        checkConsumedBufferAndNextDataType(
                numRecordsPerBuffer, bufferDecompressor, expectedRecords, bufferAndBacklogOpts);

        List<BufferIndexAndChannel> expectedBufferIndexAndChannel =
                HybridShuffleTestUtils.createBufferIndexAndChannelsList(
                        SUBPARTITION_ID, IntStream.range(0, numBuffers).toArray());
        Assertions.assertThat(consumedBufferIndexAndChannel)
                .zipSatisfy(
                        expectedBufferIndexAndChannel,
                        (consumed, expected) -> {
                            Assertions.assertThat(consumed.getChannel())
                                    .isEqualTo(expected.getChannel());
                            Assertions.assertThat(consumed.getBufferIndex())
                                    .isEqualTo(expected.getBufferIndex());
                        });
    }

    /**
     * With buffers 1 and 2 spilling and buffers 0 and 1 consumed, every combination of spill
     * status and consume status must select the expected buffer indexes.
     */
    @Test
    void testGetBuffersSatisfyStatus() throws Exception {
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(createRecordSizedOperation());
        final int numBuffers = 4;
        for (int i = 0; i < numBuffers; i++) {
            subpartitionMemoryDataManager.append(createRecord(i), Buffer.DataType.DATA_BUFFER);
        }

        // Spill buffers 1 and 2.
        List<BufferIndexAndChannel> toStartSpilling =
                HybridShuffleTestUtils.createBufferIndexAndChannelsList(SUBPARTITION_ID, 1, 2);
        CompletableFuture<Void> spilledDoneFuture = new CompletableFuture<>();
        subpartitionMemoryDataManager.spillSubpartitionBuffers(toStartSpilling, spilledDoneFuture);

        // Consume buffers 0 and 1.
        subpartitionMemoryDataManager.consumeBuffer(0);
        subpartitionMemoryDataManager.consumeBuffer(1);

        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.ALL,
                        HsSpillingInfoProvider.ConsumeStatus.ALL),
                Arrays.asList(0, 1, 2, 3));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.ALL,
                        HsSpillingInfoProvider.ConsumeStatus.CONSUMED),
                Arrays.asList(0, 1));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.ALL,
                        HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED),
                Arrays.asList(2, 3));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.SPILL,
                        HsSpillingInfoProvider.ConsumeStatus.ALL),
                Arrays.asList(1, 2));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.NOT_SPILL,
                        HsSpillingInfoProvider.ConsumeStatus.ALL),
                Arrays.asList(0, 3));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.SPILL,
                        HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED),
                Collections.singletonList(2));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.SPILL,
                        HsSpillingInfoProvider.ConsumeStatus.CONSUMED),
                Collections.singletonList(1));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.NOT_SPILL,
                        HsSpillingInfoProvider.ConsumeStatus.CONSUMED),
                Collections.singletonList(0));
        checkBufferIndex(
                subpartitionMemoryDataManager.getBuffersSatisfyStatus(
                        HsSpillingInfoProvider.SpillStatus.NOT_SPILL,
                        HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED),
                Collections.singletonList(3));
    }

    /**
     * Spilling must return the requested buffers in order, with a reference count of 2 while the
     * spill is pending and 1 once the spill-done future has completed.
     */
    @Test
    void testSpillSubpartitionBuffers() throws Exception {
        CompletableFuture<Void> spilledDoneFuture = new CompletableFuture<>();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(createRecordSizedOperation());
        final int numBuffers = 3;
        for (int i = 0; i < numBuffers; i++) {
            subpartitionMemoryDataManager.append(createRecord(i), Buffer.DataType.DATA_BUFFER);
        }

        List<BufferIndexAndChannel> toStartSpilling =
                HybridShuffleTestUtils.createBufferIndexAndChannelsList(SUBPARTITION_ID, 0, 1, 2);
        List<BufferWithIdentity> buffers =
                subpartitionMemoryDataManager.spillSubpartitionBuffers(
                        toStartSpilling, spilledDoneFuture);
        Assertions.assertThat(toStartSpilling)
                .zipSatisfy(
                        buffers,
                        (expected, spilled) -> {
                            Assertions.assertThat(expected.getBufferIndex())
                                    .isEqualTo(spilled.getBufferIndex());
                            Assertions.assertThat(expected.getChannel())
                                    .isEqualTo(spilled.getChannelIndex());
                        });

        List<Integer> expectedValues = Arrays.asList(0, 1, 2);
        checkBuffersRefCountAndValue(buffers, Arrays.asList(2, 2, 2), expectedValues);
        spilledDoneFuture.complete(null);
        checkBuffersRefCountAndValue(buffers, Arrays.asList(1, 1, 1), expectedValues);
    }

    /**
     * Releases three buffers of which the last one is still spilling: the first two must be
     * recycled immediately, while the third is recycled and marked readable only after the spill
     * future completes.
     */
    @Test
    void testReleaseAndMarkReadableSubpartitionBuffers() throws Exception {
        final int targetChannel = SUBPARTITION_ID;
        List<Integer> readableBufferIndex = new ArrayList<>();
        List<MemorySegment> recycledBuffers = new ArrayList<>();
        TestingMemoryDataManagerOperation memoryDataManagerOperation =
                TestingMemoryDataManagerOperation.builder()
                        .setRequestBufferFromPoolSupplier(
                                // The recycler records every segment handed back to it.
                                () ->
                                        new BufferBuilder(
                                                MemorySegmentFactory.allocateUnpooledSegment(
                                                        bufferSize),
                                                recycledBuffers::add))
                        .setMarkBufferReadableConsumer(
                                (channel, bufferIndex) -> {
                                    Assertions.assertThat(channel).isEqualTo(targetChannel);
                                    readableBufferIndex.add(bufferIndex);
                                })
                        .build();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(memoryDataManagerOperation);
        final int numBuffers = 3;
        for (int i = 0; i < numBuffers; i++) {
            subpartitionMemoryDataManager.append(createRecord(i), Buffer.DataType.DATA_BUFFER);
        }

        List<BufferIndexAndChannel> toRelease =
                HybridShuffleTestUtils.createBufferIndexAndChannelsList(targetChannel, 0, 1, 2);
        CompletableFuture<Void> spilledFuture = new CompletableFuture<>();
        // Only the last buffer (index 2) is spilled.
        subpartitionMemoryDataManager.spillSubpartitionBuffers(
                toRelease.subList(numBuffers - 1, numBuffers), spilledFuture);
        subpartitionMemoryDataManager.releaseSubpartitionBuffers(toRelease);

        Assertions.assertThat(readableBufferIndex).isEmpty();
        checkMemorySegmentValue(recycledBuffers, Arrays.asList(0, 1));

        spilledFuture.complete(null);
        Assertions.assertThat(readableBufferIndex).containsExactly(2);
        checkMemorySegmentValue(recycledBuffers, Arrays.asList(0, 1, 2));
    }

    /**
     * Appending one half-buffer record followed by an end-of-partition event must be reported as
     * two buffers out and {@code recordSize + eventSize} bytes out.
     */
    @Test
    void testMetricsUpdate() throws Exception {
        final int recordSize = bufferSize / 2;
        TestingMemoryDataManagerOperation memoryDataManagerOperation =
                TestingMemoryDataManagerOperation.builder()
                        .setRequestBufferFromPoolSupplier(
                                () -> HybridShuffleTestUtils.createBufferBuilder(bufferSize))
                        .build();
        HsOutputMetrics metrics = HybridShuffleTestUtils.createTestingOutputMetrics();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                createSubpartitionMemoryDataManager(memoryDataManagerOperation);
        // Replace the metrics installed by the factory method with the instance inspected below.
        subpartitionMemoryDataManager.setOutputMetrics(metrics);

        subpartitionMemoryDataManager.append(
                ByteBuffer.allocate(recordSize), Buffer.DataType.DATA_BUFFER);
        ByteBuffer eventBuffer = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
        // remaining() does not consume the buffer, so the same instance can be appended.
        final int eventSize = eventBuffer.remaining();
        subpartitionMemoryDataManager.append(eventBuffer, Buffer.DataType.EVENT_BUFFER);

        Assertions.assertThat(metrics.getNumBuffersOut().getCount()).isEqualTo(2L);
        Assertions.assertThat(metrics.getNumBytesOut().getCount())
                .isEqualTo((long) (recordSize + eventSize));
    }

    /** Asserts that the buffer indexes of the given identities equal {@code expectedIndexes}. */
    private static void checkBufferIndex(
            Deque<BufferIndexAndChannel> bufferWithIdentities, List<Integer> expectedIndexes) {
        List<Integer> bufferIndexes =
                bufferWithIdentities.stream()
                        .map(BufferIndexAndChannel::getBufferIndex)
                        .collect(Collectors.toList());
        Assertions.assertThat(bufferIndexes).isEqualTo(expectedIndexes);
    }

    /**
     * Asserts that there are exactly as many segments as expected values and that the {@code int}
     * at offset 0 of each segment equals the expected value at the same position.
     */
    private static void checkMemorySegmentValue(
            List<MemorySegment> memorySegments, List<Integer> expectedValues) {
        // Without this check, a shorter list of segments would pass without being inspected.
        Assertions.assertThat(memorySegments).hasSameSizeAs(expectedValues);
        for (int i = 0; i < memorySegments.size(); i++) {
            Assertions.assertThat(memorySegments.get(i).getInt(0))
                    .isEqualTo(expectedValues.get(i));
        }
    }

    /**
     * Verifies every consumed buffer: it must be present, contain the expected records (after
     * decompression if it is compressed) with the expected data type, and announce the data type
     * of the following record, or {@code NONE} for the last buffer. Each buffer is recycled after
     * inspection.
     *
     * @param numRecordsPerBuffer number of records per buffer, used to locate the first expected
     *     record of each buffer
     * @param bufferDecompressor decompressor for compressed buffers; may be {@code null} when no
     *     buffer is expected to be compressed
     * @param expectedRecords expected record values and data types, in append order
     * @param bufferAndBacklogOpt consumed buffers, in buffer-index order
     */
    private static void checkConsumedBufferAndNextDataType(
            int numRecordsPerBuffer,
            @Nullable BufferDecompressor bufferDecompressor,
            List<Tuple2<Long, Buffer.DataType>> expectedRecords,
            List<Optional<ResultSubpartition.BufferAndBacklog>> bufferAndBacklogOpt) {
        final int lastBufferIndex = bufferAndBacklogOpt.size() - 1;
        for (int i = 0; i < bufferAndBacklogOpt.size(); i++) {
            final int bufferIndex = i;
            Assertions.assertThat(bufferAndBacklogOpt.get(bufferIndex))
                    .hasValueSatisfying(
                            bufferAndBacklog -> {
                                Buffer buffer = bufferAndBacklog.buffer();
                                if (buffer.isCompressed()) {
                                    Assertions.assertThat(bufferDecompressor).isNotNull();
                                    buffer =
                                            bufferDecompressor.decompressToIntermediateBuffer(
                                                    buffer);
                                }
                                // Records are written little-endian by createRecord.
                                ByteBuffer byteBuffer =
                                        buffer.getNioBufferReadable()
                                                .order(ByteOrder.LITTLE_ENDIAN);
                                int recordIndex = bufferIndex * numRecordsPerBuffer;
                                while (byteBuffer.hasRemaining()) {
                                    long value = byteBuffer.getLong();
                                    Buffer.DataType dataType = buffer.getDataType();
                                    Tuple2<Long, Buffer.DataType> expected =
                                            expectedRecords.get(recordIndex);
                                    Assertions.assertThat(value).isEqualTo(expected.f0);
                                    Assertions.assertThat(dataType).isEqualTo(expected.f1);
                                    recordIndex++;
                                }

                                if (bufferIndex != lastBufferIndex) {
                                    Assertions.assertThat(bufferAndBacklog.getNextDataType())
                                            .isEqualTo(expectedRecords.get(recordIndex).f1);
                                } else {
                                    Assertions.assertThat(bufferAndBacklog.getNextDataType())
                                            .isEqualTo(Buffer.DataType.NONE);
                                }
                                buffer.recycleBuffer();
                            });
        }
    }

    /**
     * Asserts that each buffer starts with the expected little-endian {@code int} value and has
     * the expected reference count.
     */
    private static void checkBuffersRefCountAndValue(
            List<BufferWithIdentity> bufferWithIdentities,
            List<Integer> expectedRefCounts,
            List<Integer> expectedValues) {
        // The index-based lookups below require all three lists to line up.
        Assertions.assertThat(bufferWithIdentities).hasSameSizeAs(expectedValues);
        Assertions.assertThat(bufferWithIdentities).hasSameSizeAs(expectedRefCounts);
        for (int i = 0; i < bufferWithIdentities.size(); i++) {
            Buffer buffer = bufferWithIdentities.get(i).getBuffer();
            Assertions.assertThat(
                            buffer.getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN).getInt())
                    .isEqualTo(expectedValues.get(i));
            Assertions.assertThat(buffer.refCnt()).isEqualTo(expectedRefCounts.get(i));
        }
    }

    /**
     * Creates a testing operation whose buffer-request supplier hands out builders of {@link
     * #RECORD_SIZE} bytes, i.e. one record per buffer.
     */
    private static TestingMemoryDataManagerOperation createRecordSizedOperation() {
        return TestingMemoryDataManagerOperation.builder()
                .setRequestBufferFromPoolSupplier(
                        () -> HybridShuffleTestUtils.createBufferBuilder(RECORD_SIZE))
                .build();
    }

    /** Creates a manager without a buffer compressor. */
    private HsSubpartitionMemoryDataManager createSubpartitionMemoryDataManager(
            HsMemoryDataManagerOperation memoryDataManagerOperation) {
        return createSubpartitionMemoryDataManager(memoryDataManagerOperation, null);
    }

    /**
     * Creates a manager for {@link #SUBPARTITION_ID} using the current {@link #bufferSize}, the
     * shared read lock and testing output metrics.
     *
     * @param memoryDataManagerOperation operation the manager calls back into
     * @param bufferCompressor compressor to hand to the manager, or {@code null} for none
     */
    private HsSubpartitionMemoryDataManager createSubpartitionMemoryDataManager(
            HsMemoryDataManagerOperation memoryDataManagerOperation,
            @Nullable BufferCompressor bufferCompressor) {
        Lock readLock = lock.readLock();
        HsSubpartitionMemoryDataManager subpartitionMemoryDataManager =
                new HsSubpartitionMemoryDataManager(
                        SUBPARTITION_ID,
                        bufferSize,
                        readLock,
                        bufferCompressor,
                        memoryDataManagerOperation);
        subpartitionMemoryDataManager.setOutputMetrics(
                HybridShuffleTestUtils.createTestingOutputMetrics());
        return subpartitionMemoryDataManager;
    }

    /**
     * Creates a {@link #RECORD_SIZE}-byte record holding {@code value} in little-endian order,
     * flipped so that it is ready to be read.
     */
    private static ByteBuffer createRecord(long value) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(RECORD_SIZE);
        byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
        byteBuffer.putLong(value);
        byteBuffer.flip();
        return byteBuffer;
    }
}

