/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayloadManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageResultSubpartitionView;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link TieredStorageResultSubpartitionView}.
 *
 * <p>Each test runs against a view that is wired with {@link #TIER_NUMBER} payload managers,
 * connection ids and service producers. The fixtures expose the view's interactions through
 * {@link CompletableFuture}s so the tests can assert which callbacks were triggered.
 */
public class TieredStorageResultSubpartitionViewTest {
    /** Number of payload managers / connection ids created for the view under test. */
    private static final int TIER_NUMBER = 2;

    /** Completed when the view invokes the {@link BufferAvailabilityListener}. */
    private CompletableFuture<Void> availabilityListener;

    /** Payload managers handed to the view, one per tier. */
    private List<NettyPayloadManager> nettyPayloadManagers;

    /** Completed with the connection id when the matching producer's connection-broken callback fires. */
    private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers;

    /** The view under test. */
    private TieredStorageResultSubpartitionView tieredStorageResultSubpartitionView;

    /** Builds fresh fixtures and a fresh view before every test. */
    @BeforeEach
    void before() {
        this.availabilityListener = new CompletableFuture<>();
        this.nettyPayloadManagers = createNettyPayloadManagers();
        // One future per tier; must stay in sync with TIER_NUMBER.
        this.connectionBrokenConsumers =
                Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
        this.tieredStorageResultSubpartitionView = createView();
    }

    /**
     * Reads a buffer, requests segment 1, and checks that the listener was notified, that one
     * more buffer can be read, and that the read after it returns {@code null}.
     */
    @Test
    void testGetNextBuffer() throws IOException {
        checkBufferAndBacklog(this.tieredStorageResultSubpartitionView.getNextBuffer(), 0);
        this.tieredStorageResultSubpartitionView.notifyRequiredSegmentId(1);
        Assertions.assertThat(this.availabilityListener).isDone();
        checkBufferAndBacklog(this.tieredStorageResultSubpartitionView.getNextBuffer(), 0);
        Assertions.assertThat(this.tieredStorageResultSubpartitionView.getNextBuffer()).isNull();
    }

    /**
     * Checks that an error payload makes {@code getNextBuffer()} throw with the queued error as
     * its cause and that the first producer's connection-broken callback is triggered.
     */
    @Test
    void testGetNextBufferFailed() {
        IOException expectedError = new IOException();
        // Rebuild the view on top of payload managers that contain an error payload.
        this.nettyPayloadManagers = createNettyPayloadQueuesWithError(expectedError);
        this.tieredStorageResultSubpartitionView = createView();

        Assertions.assertThatThrownBy(() -> this.tieredStorageResultSubpartitionView.getNextBuffer())
                .hasCause(expectedError);
        Assertions.assertThat(this.connectionBrokenConsumers.get(0)).isDone();
    }

    /**
     * Checks the reported backlog and availability for an argument of 0 and of 2: the backlog is
     * 1 in both cases, and the view is only reported as available for the argument 2.
     */
    @Test
    void testGetAvailabilityAndBacklog() {
        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 =
                this.tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(0);
        Assertions.assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(1);
        Assertions.assertThat(availabilityAndBacklog1.isAvailable()).isFalse();

        ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 =
                this.tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(2);
        Assertions.assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(1);
        Assertions.assertThat(availabilityAndBacklog2.isAvailable()).isTrue();
    }

    /** Checks that requesting segment 1 notifies the availability listener. */
    @Test
    void testNotifyRequiredSegmentId() {
        this.tieredStorageResultSubpartitionView.notifyRequiredSegmentId(1);
        Assertions.assertThat(this.availabilityListener).isDone();
    }

    /**
     * Checks that releasing the view leaves both payload managers with a zero backlog, triggers
     * both connection-broken callbacks and marks the view as released.
     */
    @Test
    void testReleaseAllResources() throws IOException {
        this.tieredStorageResultSubpartitionView.releaseAllResources();
        Assertions.assertThat(this.nettyPayloadManagers.get(0).getBacklog()).isZero();
        Assertions.assertThat(this.nettyPayloadManagers.get(1).getBacklog()).isZero();
        Assertions.assertThat(this.connectionBrokenConsumers.get(0)).isDone();
        Assertions.assertThat(this.connectionBrokenConsumers.get(1)).isDone();
        Assertions.assertThat(this.tieredStorageResultSubpartitionView.isReleased()).isTrue();
    }

    /** Checks that both queued-buffer accessors report 1 for the freshly created view. */
    @Test
    void testGetNumberOfQueuedBuffers() {
        Assertions.assertThat(this.tieredStorageResultSubpartitionView.getNumberOfQueuedBuffers())
                .isEqualTo(1);
        Assertions.assertThat(
                        this.tieredStorageResultSubpartitionView
                                .unsynchronizedGetNumberOfQueuedBuffers())
                .isEqualTo(1);
    }

    /**
     * Creates the view under test from the current values of the fixture fields, so callers can
     * swap a fixture (e.g. the payload managers) before calling this.
     */
    private TieredStorageResultSubpartitionView createView() {
        return new TieredStorageResultSubpartitionView(
                createBufferAvailabilityListener(this.availabilityListener),
                this.nettyPayloadManagers,
                createNettyConnectionIds(),
                createNettyServiceProducers(this.connectionBrokenConsumers));
    }

    /**
     * Asserts that the given result and its buffer are non-null and that it reports the expected
     * backlog.
     *
     * @param bufferAndBacklog result returned by {@code getNextBuffer()}
     * @param backlog expected value of {@code buffersInBacklog()}
     */
    private static void checkBufferAndBacklog(
            ResultSubpartition.BufferAndBacklog bufferAndBacklog, int backlog) {
        Assertions.assertThat(bufferAndBacklog).isNotNull();
        Assertions.assertThat(bufferAndBacklog.buffer()).isNotNull();
        Assertions.assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(backlog);
    }

    /**
     * Creates a listener that completes the given future when it is notified.
     *
     * @param notifier future completed (with {@code null}) on notification
     */
    private static BufferAvailabilityListener createBufferAvailabilityListener(
            CompletableFuture<Void> notifier) {
        return () -> notifier.complete(null);
    }

    /**
     * Creates {@link #TIER_NUMBER} payload managers. The manager at position {@code index} holds,
     * in order: a segment payload for {@code index}, a data buffer payload, and a payload wrapping
     * an empty {@code END_OF_SEGMENT} buffer.
     */
    private static List<NettyPayloadManager> createNettyPayloadManagers() {
        List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>();
        for (int index = 0; index < TIER_NUMBER; ++index) {
            NettyPayloadManager nettyPayloadManager = new NettyPayloadManager();
            nettyPayloadManager.add(NettyPayload.newSegment(index));
            nettyPayloadManager.add(
                    NettyPayload.newBuffer(BufferBuilderTestUtils.buildSomeBuffer(0), 0, index));
            nettyPayloadManager.add(
                    NettyPayload.newBuffer(
                            new NetworkBuffer(
                                    MemorySegmentFactory.allocateUnpooledSegment(0),
                                    FreeingBufferRecycler.INSTANCE,
                                    Buffer.DataType.END_OF_SEGMENT),
                            1,
                            index));
            nettyPayloadManagers.add(nettyPayloadManager);
        }
        return nettyPayloadManagers;
    }

    /**
     * Creates {@link #TIER_NUMBER} payload managers, each holding a segment payload followed by
     * an error payload carrying the given error.
     *
     * @param error the error wrapped in every manager's error payload
     */
    private static List<NettyPayloadManager> createNettyPayloadQueuesWithError(Throwable error) {
        List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>();
        for (int index = 0; index < TIER_NUMBER; ++index) {
            NettyPayloadManager queue = new NettyPayloadManager();
            queue.add(NettyPayload.newSegment(index));
            queue.add(NettyPayload.newError(error));
            nettyPayloadManagers.add(queue);
        }
        return nettyPayloadManagers;
    }

    /** Creates {@link #TIER_NUMBER} new connection ids. */
    private static List<NettyConnectionId> createNettyConnectionIds() {
        List<NettyConnectionId> nettyConnectionIds = new ArrayList<>();
        for (int index = 0; index < TIER_NUMBER; ++index) {
            nettyConnectionIds.add(NettyConnectionId.newId());
        }
        return nettyConnectionIds;
    }

    /**
     * Creates one testing producer per given future; the producer at position {@code i} completes
     * the future at position {@code i} with the connection id passed to its connection-broken
     * consumer.
     *
     * @param connectionBrokenConsumers futures to complete, one per producer
     */
    private static List<NettyServiceProducer> createNettyServiceProducers(
            List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers) {
        List<NettyServiceProducer> nettyServiceProducers = new ArrayList<>();
        for (int index = 0; index < connectionBrokenConsumers.size(); ++index) {
            // Lambdas may only capture effectively final locals, so copy the loop index.
            final int indexNumber = index;
            nettyServiceProducers.add(
                    new TestingNettyServiceProducer.Builder()
                            .setConnectionBrokenConsumer(
                                    connectionId ->
                                            connectionBrokenConsumers
                                                    .get(indexNumber)
                                                    .complete(connectionId))
                            .build());
        }
        return nettyServiceProducers;
    }
}

