/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.TestingPartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DiskIOScheduler}.
 *
 * <p>The scheduler under test is wired to a {@link ManuallyTriggeredScheduledExecutorService}, so
 * no read happens until a test explicitly calls {@code ioExecutor.trigger()}, and the read then
 * runs on the calling (test) thread. The {@link TestingPartitionFileReader} records the segment id
 * it was asked for in {@link #segmentIdFuture} and signals its release through
 * {@link #readerReleaseFuture}.
 */
class DiskIOSchedulerTest {
    private static final TieredStoragePartitionId DEFAULT_PARTITION_ID =
            TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
    private static final TieredStorageSubpartitionId DEFAULT_SUBPARTITION_ID =
            new TieredStorageSubpartitionId(0);
    private static final int BUFFER_POOL_SIZE = 1;
    private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5L);
    private static final int DEFAULT_MAX_READ_AHEAD = 5;

    private BatchShuffleReadBufferPool bufferPool;
    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    /** Completed with the segment id passed to the testing reader's read-buffer supplier. */
    private CompletableFuture<Integer> segmentIdFuture;

    /** Completed when the testing reader's release notifier is invoked. */
    private CompletableFuture<Void> readerReleaseFuture;

    private DiskIOScheduler diskIOScheduler;

    /** Indexed by subpartition id; each map goes from buffer index to segment id. */
    private List<Map<Integer, Integer>> firstBufferIndexInSegment;

    /** Builds a fresh buffer pool, executor, testing reader and scheduler for every test. */
    @BeforeEach
    void before() {
        ioExecutor = new ManuallyTriggeredScheduledExecutorService();
        // NOTE(review): the named constants below replace literals of the same value; the
        // constants were otherwise unused, so they are presumably what the literals stood for.
        bufferPool = new BatchShuffleReadBufferPool(BUFFER_POOL_SIZE, 1);
        bufferPool.initialize();
        segmentIdFuture = new CompletableFuture<>();
        readerReleaseFuture = new CompletableFuture<>();
        firstBufferIndexInSegment = createFirstBufferIndexInSegment();

        PartitionFileReader partitionFileReader =
                new TestingPartitionFileReader.Builder()
                        .setReadBufferSupplier(
                                (bufferIndex, segmentId) -> {
                                    segmentIdFuture.complete(segmentId);
                                    return new PartitionFileReader.ReadBufferResult(
                                            Collections.singletonList(
                                                    BufferBuilderTestUtils.buildSomeBuffer(0)),
                                            true,
                                            null);
                                })
                        .setReleaseNotifier(() -> readerReleaseFuture.complete(null))
                        // The subpartition id doubles as the read priority.
                        .setPrioritySupplier(subpartitionId -> (long) subpartitionId)
                        .build();

        diskIOScheduler =
                new DiskIOScheduler(
                        DEFAULT_PARTITION_ID,
                        bufferPool,
                        ioExecutor,
                        BUFFER_POOL_SIZE,
                        DEFAULT_BUFFER_REQUEST_TIMEOUT,
                        DEFAULT_MAX_READ_AHEAD,
                        (subpartitionId, bufferIndex) ->
                                firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex),
                        partitionFileReader);
    }

    /** Destroys the buffer pool created in {@link #before()}. */
    @AfterEach
    void after() {
        bufferPool.destroy();
    }

    /**
     * Establishing a connection alone must not read anything; after one executor trigger the
     * reader has been asked for segment 0 and a payload has reached the connection writer.
     */
    @Test
    void testConnectionEstablished() {
        CompletableFuture<Object> bufferWriteNotifier = new CompletableFuture<>();
        TestingNettyConnectionWriter nettyConnectionWriter =
                bufferNotifyingWriterBuilder(bufferWriteNotifier).build();

        diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, nettyConnectionWriter);
        Assertions.assertThat(segmentIdFuture).isNotDone();
        Assertions.assertThat(bufferWriteNotifier).isNotDone();

        ioExecutor.trigger();
        Assertions.assertThat(segmentIdFuture).isCompletedWithValue(0);
        Assertions.assertThat(bufferWriteNotifier).isDone();
    }

    /**
     * Registers subpartition 1 before subpartition 0 and checks that, after a single trigger, only
     * the writer of subpartition 0 has received a payload.
     */
    @Test
    void testSequenceReading() {
        CompletableFuture<Object> bufferWriteNotifier1 = new CompletableFuture<>();
        CompletableFuture<Object> bufferWriteNotifier2 = new CompletableFuture<>();
        TestingNettyConnectionWriter nettyConnectionWriter1 =
                bufferNotifyingWriterBuilder(bufferWriteNotifier1).build();
        TestingNettyConnectionWriter nettyConnectionWriter2 =
                bufferNotifyingWriterBuilder(bufferWriteNotifier2).build();

        // Deliberately registered in reverse id order.
        diskIOScheduler.connectionEstablished(
                new TieredStorageSubpartitionId(1), nettyConnectionWriter2);
        diskIOScheduler.connectionEstablished(
                new TieredStorageSubpartitionId(0), nettyConnectionWriter1);
        Assertions.assertThat(bufferWriteNotifier1).isNotDone();
        Assertions.assertThat(bufferWriteNotifier2).isNotDone();

        ioExecutor.trigger();
        // NOTE(review): presumably the reader with the smaller priority value is served first and
        // the single available buffer is then exhausted - confirm against DiskIOScheduler.
        Assertions.assertThat(bufferWriteNotifier1).isDone();
        Assertions.assertThat(bufferWriteNotifier2).isNotDone();
    }

    /**
     * After the connection is reported broken, a trigger must neither call the reader nor write to
     * the connection writer.
     */
    @Test
    void testConnectionBroken() {
        CompletableFuture<Object> bufferWriteNotifier = new CompletableFuture<>();
        NettyConnectionId nettyConnectionId = NettyConnectionId.newId();
        TestingNettyConnectionWriter nettyConnectionWriter =
                bufferNotifyingWriterBuilder(bufferWriteNotifier)
                        .setNettyConnectionIdSupplier(() -> nettyConnectionId)
                        .build();

        diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, nettyConnectionWriter);
        diskIOScheduler.connectionBroken(nettyConnectionId);
        ioExecutor.trigger();

        Assertions.assertThat(segmentIdFuture).isNotDone();
        Assertions.assertThat(bufferWriteNotifier).isNotDone();
    }

    /**
     * Releasing the scheduler must release the file reader, and any later
     * {@code connectionEstablished} call must fail with {@link IllegalStateException}.
     */
    @Test
    void testRelease() {
        CompletableFuture<Object> bufferWriteNotifier = new CompletableFuture<>();
        NettyConnectionId nettyConnectionId = NettyConnectionId.newId();
        TestingNettyConnectionWriter nettyConnectionWriter =
                new TestingNettyConnectionWriter.Builder()
                        .setWriteBufferFunction(
                                nettyPayload -> {
                                    bufferWriteNotifier.complete(nettyPayload);
                                    return null;
                                })
                        .setNettyConnectionIdSupplier(() -> nettyConnectionId)
                        .build();

        diskIOScheduler.connectionEstablished(DEFAULT_SUBPARTITION_ID, nettyConnectionWriter);
        diskIOScheduler.release();

        Assertions.assertThat(readerReleaseFuture).isDone();
        Assertions.assertThatThrownBy(
                        () ->
                                diskIOScheduler.connectionEstablished(
                                        DEFAULT_SUBPARTITION_ID, nettyConnectionWriter))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Calls {@code release()} from a second thread while the triggering thread is blocked inside
     * the writer's write-buffer callback. The callback only returns once {@code release()} has
     * returned, so the test can only finish if {@code release()} does not wait for the in-flight
     * write.
     */
    @Test
    void testDeadLock() throws Exception {
        // Completed by the consumer thread once release() has returned.
        CompletableFuture<Void> waitFuture1 = new CompletableFuture<>();
        // Completed by the write callback to signal that a write is in flight.
        CompletableFuture<Void> waitFuture2 = new CompletableFuture<>();
        TestingNettyConnectionWriter nettyConnectionWriter =
                new TestingNettyConnectionWriter.Builder()
                        .setWriteBufferFunction(
                                nettyPayload -> {
                                    try {
                                        waitFuture2.complete(null);
                                        waitFuture1.get();
                                    } catch (InterruptedException | ExecutionException e) {
                                        ExceptionUtils.rethrow(e);
                                    }
                                    return null;
                                })
                        .build();
        CheckedThread consumerThread =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        waitFuture2.get();
                        try {
                            diskIOScheduler.release();
                        } finally {
                            // Always unblock the write callback, so that a failure in release()
                            // is reported through sync() rather than hanging the test.
                            waitFuture1.complete(null);
                        }
                    }
                };
        consumerThread.start();

        diskIOScheduler.connectionEstablished(
                new TieredStorageSubpartitionId(0), nettyConnectionWriter);
        ioExecutor.trigger();

        // Join the helper thread and rethrow anything it failed with.
        consumerThread.sync();
        Assertions.assertThat(waitFuture1).isDone();
        Assertions.assertThat(waitFuture2).isDone();
    }

    /**
     * Returns a writer builder whose write-buffer function completes {@code notifier} with the
     * first payload whose segment id is {@code -1} and ignores every other payload.
     */
    private static TestingNettyConnectionWriter.Builder bufferNotifyingWriterBuilder(
            CompletableFuture<Object> notifier) {
        return new TestingNettyConnectionWriter.Builder()
                .setWriteBufferFunction(
                        nettyPayload -> {
                            // NOTE(review): -1 presumably marks a payload that carries a data
                            // buffer rather than a segment id - confirm against NettyPayload.
                            if (nettyPayload.getSegmentId() == -1) {
                                notifier.complete(nettyPayload);
                            }
                            return null;
                        });
    }

    /**
     * Builds the lookup used by the scheduler's segment-id function: one mutable map per
     * subpartition (ids 0 and 1), each mapping buffer index 0 to segment id 0.
     */
    private List<Map<Integer, Integer>> createFirstBufferIndexInSegment() {
        List<Map<Integer, Integer>> perSubpartition = new ArrayList<>();
        for (int subpartition = 0; subpartition < 2; subpartition++) {
            Map<Integer, Integer> bufferIndexToSegmentId = new HashMap<>();
            bufferIndexToSegmentId.put(0, 0);
            perSubpartition.add(bufferIndexToSegmentId);
        }
        return perSubpartition;
    }
}

