/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManager;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumer;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumerInternalOperations;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingHsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingSpillingStrategy;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

class HsSubpartitionViewTest {
    HsSubpartitionViewTest() {
    }

    @Test
    void testGetNextBufferFromDisk() {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        ResultSubpartition.BufferAndBacklog bufferAndBacklog = HsSubpartitionViewTest.createBufferAndBacklog(1, Buffer.DataType.DATA_BUFFER, 0);
        CompletableFuture consumeBufferFromMemoryFuture = new CompletableFuture();
        TestingHsDataView diskDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)bufferToConsume -> Optional.of(bufferAndBacklog))).build();
        TestingHsDataView memoryDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)ignore -> {
            consumeBufferFromMemoryFuture.complete(null);
            return Optional.empty();
        })).build();
        subpartitionView.setDiskDataView((HsDataView)diskDataView);
        subpartitionView.setMemoryDataView((HsDataView)memoryDataView);
        ResultSubpartition.BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
        Assertions.assertThat(consumeBufferFromMemoryFuture).isNotCompleted();
        Assertions.assertThat((Object)nextBuffer).isSameAs((Object)bufferAndBacklog);
    }

    @Test
    @Timeout(value=60L)
    void testDeadLock(@TempDir Path dataFilePath) throws Exception {
        int bufferSize = 16;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 16);
        BufferPool bufferPool = networkBufferPool.createBufferPool(10, 10);
        final HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        final CompletableFuture acquireWriteLock = new CompletableFuture();
        CheckedThread consumerThread = new CheckedThread(){

            public void go() throws Exception {
                acquireWriteLock.get();
                subpartitionView.getNextBuffer();
            }
        };
        TestingSpillingStrategy spillingStrategy = TestingSpillingStrategy.builder().setOnMemoryUsageChangedFunction((ignore1, ignore2) -> Optional.empty()).setDecideActionWithGlobalInfoFunction(spillingInfoProvider -> {
            acquireWriteLock.complete(null);
            try {
                consumerThread.trySync(10L);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            spillingInfoProvider.getNextBufferIndexToConsume(HsConsumerId.DEFAULT);
            return HsSpillingStrategy.Decision.NO_ACTION;
        }).build();
        HsMemoryDataManager memoryDataManager = new HsMemoryDataManager(1, 16, bufferPool, (HsSpillingStrategy)spillingStrategy, (HsFileDataIndex)new HsFileDataIndexImpl(1, dataFilePath.resolve(".index"), 256, Long.MAX_VALUE), dataFilePath.resolve(".data"), null, 0L);
        memoryDataManager.setOutputMetrics(HybridShuffleTestUtils.createTestingOutputMetrics());
        HsDataView hsDataView = memoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, (HsSubpartitionConsumerInternalOperations)subpartitionView);
        subpartitionView.setMemoryDataView(hsDataView);
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.NO_OP);
        consumerThread.start();
        memoryDataManager.append(ByteBuffer.allocate(16), 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test
    void testGetNextBufferFromDiskNextDataTypeIsNone() {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        ResultSubpartition.BufferAndBacklog bufferAndBacklog = HsSubpartitionViewTest.createBufferAndBacklog(0, Buffer.DataType.NONE, 0);
        TestingHsDataView diskDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)bufferToConsume -> Optional.of(bufferAndBacklog))).build();
        TestingHsDataView memoryDataView = TestingHsDataView.builder().setPeekNextToConsumeDataTypeFunction(bufferToConsume -> {
            Assertions.assertThat((Integer)bufferToConsume).isEqualTo(1);
            return Buffer.DataType.EVENT_BUFFER;
        }).build();
        subpartitionView.setDiskDataView((HsDataView)diskDataView);
        subpartitionView.setMemoryDataView((HsDataView)memoryDataView);
        ResultSubpartition.BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
        Assertions.assertThat((Object)nextBuffer).isNotNull();
        Assertions.assertThat((Object)nextBuffer.buffer()).isSameAs((Object)bufferAndBacklog.buffer());
        Assertions.assertThat((int)nextBuffer.buffersInBacklog()).isEqualTo(bufferAndBacklog.buffersInBacklog());
        Assertions.assertThat((int)nextBuffer.getSequenceNumber()).isEqualTo(bufferAndBacklog.getSequenceNumber());
        Assertions.assertThat((Comparable)nextBuffer.getNextDataType()).isEqualTo((Object)Buffer.DataType.EVENT_BUFFER);
    }

    @Test
    void testGetNextBufferFromMemory() {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        ResultSubpartition.BufferAndBacklog bufferAndBacklog = HsSubpartitionViewTest.createBufferAndBacklog(1, Buffer.DataType.DATA_BUFFER, 0);
        TestingHsDataView memoryDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)bufferToConsume -> Optional.of(bufferAndBacklog))).build();
        TestingHsDataView diskDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)bufferToConsume -> Optional.empty())).build();
        subpartitionView.setDiskDataView((HsDataView)diskDataView);
        subpartitionView.setMemoryDataView((HsDataView)memoryDataView);
        ResultSubpartition.BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer();
        Assertions.assertThat((Object)nextBuffer).isSameAs((Object)bufferAndBacklog);
    }

    @Test
    void testGetNextBufferThrowException() {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        TestingHsDataView diskDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)nextToConsume -> {
            throw new RuntimeException("expected exception.");
        })).build();
        subpartitionView.setDiskDataView((HsDataView)diskDataView);
        subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.NO_OP);
        subpartitionView.getNextBuffer();
        ((AbstractThrowableAssert)Assertions.assertThat((Throwable)subpartitionView.getFailureCause()).isInstanceOf(RuntimeException.class)).hasMessageContaining("expected exception.");
        Assertions.assertThat((boolean)subpartitionView.isReleased()).isTrue();
    }

    @Test
    void testGetNextBufferZeroBacklog() {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        boolean diskBacklog = false;
        int memoryBacklog = 10;
        ResultSubpartition.BufferAndBacklog targetBufferAndBacklog = HsSubpartitionViewTest.createBufferAndBacklog(0, Buffer.DataType.DATA_BUFFER, 0);
        TestingHsDataView diskDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)bufferToConsume -> Optional.of(targetBufferAndBacklog))).build();
        TestingHsDataView memoryDataView = TestingHsDataView.builder().setGetBacklogSupplier(() -> 10).build();
        subpartitionView.setDiskDataView((HsDataView)diskDataView);
        subpartitionView.setMemoryDataView((HsDataView)memoryDataView);
        Assertions.assertThat((Object)subpartitionView.getNextBuffer()).satisfies(new ThrowingConsumer[]{bufferAndBacklog -> {
            Assertions.assertThat((int)bufferAndBacklog.buffersInBacklog()).isEqualTo(10);
            Assertions.assertThat((Object)bufferAndBacklog.buffer()).isEqualTo((Object)targetBufferAndBacklog.buffer());
            Assertions.assertThat((Comparable)bufferAndBacklog.getNextDataType()).isEqualTo((Object)targetBufferAndBacklog.getNextDataType());
            Assertions.assertThat((int)bufferAndBacklog.getSequenceNumber()).isEqualTo(targetBufferAndBacklog.getSequenceNumber());
        }});
    }

    @Test
    void testNotifyDataAvailableNeedNotify() {
        CompletableFuture notifyAvailableFuture = new CompletableFuture();
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView(view -> notifyAvailableFuture.complete(null));
        TestingHsDataView memoryDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)bufferToConsume -> Optional.of(HsSubpartitionViewTest.createBufferAndBacklog(0, Buffer.DataType.NONE, 0)))).build();
        subpartitionView.setMemoryDataView((HsDataView)memoryDataView);
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.NO_OP);
        subpartitionView.getNextBuffer();
        subpartitionView.notifyDataAvailable();
        Assertions.assertThat(notifyAvailableFuture).isCompleted();
    }

    @Test
    void testNotifyDataAvailableNotNeedNotify() {
        CompletableFuture notifyAvailableFuture = new CompletableFuture();
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView(view -> notifyAvailableFuture.complete(null));
        TestingHsDataView memoryDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)bufferToConsume -> Optional.of(HsSubpartitionViewTest.createBufferAndBacklog(0, Buffer.DataType.DATA_BUFFER, 0)))).build();
        subpartitionView.setMemoryDataView((HsDataView)memoryDataView);
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.NO_OP);
        subpartitionView.getNextBuffer();
        subpartitionView.notifyDataAvailable();
        Assertions.assertThat(notifyAvailableFuture).isNotCompleted();
    }

    @Test
    void testGetZeroBacklogNeedNotify() {
        CompletableFuture notifyAvailableFuture = new CompletableFuture();
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView(view -> notifyAvailableFuture.complete(null));
        subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.NO_OP);
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.builder().setGetBacklogSupplier(() -> 0).build());
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = subpartitionView.getAvailabilityAndBacklog(false);
        Assertions.assertThat((int)availabilityAndBacklog.getBacklog()).isZero();
        Assertions.assertThat(notifyAvailableFuture).isNotCompleted();
        subpartitionView.notifyDataAvailable();
        Assertions.assertThat(notifyAvailableFuture).isCompleted();
    }

    @Test
    void testGetAvailabilityAndBacklogPositiveCredit() {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.NO_OP);
        int backlog = 2;
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.builder().setGetBacklogSupplier(() -> 2).build());
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = subpartitionView.getAvailabilityAndBacklog(true);
        Assertions.assertThat((int)availabilityAndBacklog.getBacklog()).isEqualTo(2);
        Assertions.assertThat((boolean)availabilityAndBacklog.isAvailable()).isTrue();
    }

    @Test
    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() {
        int backlog = 2;
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)nextToConsume -> Optional.of(HsSubpartitionViewTest.createBufferAndBacklog(2, Buffer.DataType.DATA_BUFFER, 0)))).build());
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.builder().setGetBacklogSupplier(() -> 2).build());
        subpartitionView.getNextBuffer();
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = subpartitionView.getAvailabilityAndBacklog(false);
        Assertions.assertThat((int)availabilityAndBacklog.getBacklog()).isEqualTo(2);
        Assertions.assertThat((boolean)availabilityAndBacklog.isAvailable()).isFalse();
    }

    @Test
    void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() {
        int backlog = 2;
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)nextToConsume -> Optional.of(HsSubpartitionViewTest.createBufferAndBacklog(2, Buffer.DataType.EVENT_BUFFER, 0)))).build());
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.builder().setGetBacklogSupplier(() -> 2).build());
        subpartitionView.getNextBuffer();
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog = subpartitionView.getAvailabilityAndBacklog(false);
        Assertions.assertThat((int)availabilityAndBacklog.getBacklog()).isEqualTo(2);
        Assertions.assertThat((boolean)availabilityAndBacklog.isAvailable()).isTrue();
    }

    @Test
    void testRelease() throws Exception {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        CompletableFuture releaseDiskViewFuture = new CompletableFuture();
        CompletableFuture releaseMemoryViewFuture = new CompletableFuture();
        TestingHsDataView diskDataView = TestingHsDataView.builder().setReleaseDataViewRunnable(() -> releaseDiskViewFuture.complete(null)).build();
        TestingHsDataView memoryDataView = TestingHsDataView.builder().setReleaseDataViewRunnable(() -> releaseMemoryViewFuture.complete(null)).build();
        subpartitionView.setDiskDataView((HsDataView)diskDataView);
        subpartitionView.setMemoryDataView((HsDataView)memoryDataView);
        subpartitionView.releaseAllResources();
        Assertions.assertThat((boolean)subpartitionView.isReleased()).isTrue();
        Assertions.assertThat(releaseDiskViewFuture).isCompleted();
        Assertions.assertThat(releaseMemoryViewFuture).isCompleted();
    }

    @Test
    void testGetConsumingOffset() {
        AtomicInteger nextBufferIndex = new AtomicInteger(0);
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        TestingHsDataView diskDataView = TestingHsDataView.builder().setConsumeBufferFunction((FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable>)((FunctionWithException)toConsumeBuffer -> Optional.of(HsSubpartitionViewTest.createBufferAndBacklog(0, Buffer.DataType.DATA_BUFFER, nextBufferIndex.getAndIncrement())))).build();
        subpartitionView.setDiskDataView((HsDataView)diskDataView);
        subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.NO_OP);
        Assertions.assertThat((int)subpartitionView.getConsumingOffset(true)).isEqualTo(-1);
        subpartitionView.getNextBuffer();
        Assertions.assertThat((int)subpartitionView.getConsumingOffset(true)).isEqualTo(0);
        subpartitionView.getNextBuffer();
        Assertions.assertThat((int)subpartitionView.getConsumingOffset(true)).isEqualTo(1);
    }

    @Test
    void testSetDataViewRepeatedly() {
        HsSubpartitionConsumer subpartitionView = HsSubpartitionViewTest.createSubpartitionView();
        subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.NO_OP);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> subpartitionView.setMemoryDataView((HsDataView)TestingHsDataView.NO_OP)).isInstanceOf(IllegalStateException.class)).hasMessageContaining("repeatedly set memory data view is not allowed.");
        subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.NO_OP);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> subpartitionView.setDiskDataView((HsDataView)TestingHsDataView.NO_OP)).isInstanceOf(IllegalStateException.class)).hasMessageContaining("repeatedly set disk data view is not allowed.");
    }

    private static HsSubpartitionConsumer createSubpartitionView() {
        return new HsSubpartitionConsumer((BufferAvailabilityListener)new NoOpBufferAvailablityListener());
    }

    private static HsSubpartitionConsumer createSubpartitionView(BufferAvailabilityListener bufferAvailabilityListener) {
        return new HsSubpartitionConsumer(bufferAvailabilityListener);
    }

    private static ResultSubpartition.BufferAndBacklog createBufferAndBacklog(int buffersInBacklog, Buffer.DataType nextDataType, int sequenceNumber) {
        int bufferSize = 8;
        Buffer buffer = HybridShuffleTestUtils.createBuffer(8, true);
        return new ResultSubpartition.BufferAndBacklog(buffer, buffersInBacklog, nextDataType, sequenceNumber);
    }
}

