/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blocklist;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistContext;
import org.apache.flink.runtime.blocklist.BlocklistListener;
import org.apache.flink.runtime.blocklist.BlocklistTracker;
import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
import org.apache.flink.runtime.blocklist.DefaultBlocklistTracker;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DefaultBlocklistHandlerTest {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultBlocklistHandlerTest.class);
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    DefaultBlocklistHandlerTest() {
    }

    @Test
    void testAddNewBlockedNodes() throws Exception {
        BlockedNode node1 = new BlockedNode("node1", "cause", 1L);
        BlockedNode node2 = new BlockedNode("node2", "cause", 1L);
        BlockedNode node2Update = new BlockedNode("node2", "cause", 2L);
        ArrayList contextReceivedNodes = new ArrayList();
        TestBlocklistContext context = TestBlocklistContext.newBuilder().setBlockResourcesConsumer(blockedNodes -> contextReceivedNodes.add(new ArrayList(blockedNodes))).build();
        TestBlocklistListener listener = new TestBlocklistListener();
        try (DefaultBlocklistHandler handler = this.createDefaultBlocklistHandler(context);){
            handler.registerBlocklistListener((BlocklistListener)listener);
            Assertions.assertThat(listener.listenerReceivedNodes).isEmpty();
            Assertions.assertThat(contextReceivedNodes).isEmpty();
            handler.addNewBlockedNodes(Arrays.asList(node1, node2));
            Assertions.assertThat(listener.listenerReceivedNodes).hasSize(1);
            Assertions.assertThat(listener.listenerReceivedNodes.get(0)).containsExactlyInAnyOrder((Object[])new BlockedNode[]{node1, node2});
            Assertions.assertThat(contextReceivedNodes).hasSize(1);
            Assertions.assertThat((List)((List)contextReceivedNodes.get(0))).containsExactlyInAnyOrder((Object[])new BlockedNode[]{node1, node2});
            Assertions.assertThat(contextReceivedNodes).hasSize(1);
            Assertions.assertThat(listener.listenerReceivedNodes).hasSize(1);
            handler.addNewBlockedNodes(Collections.singleton(node2Update));
            Assertions.assertThat(listener.listenerReceivedNodes).hasSize(2);
            Assertions.assertThat(listener.listenerReceivedNodes.get(1)).containsExactly((Object[])new BlockedNode[]{node2Update});
            Assertions.assertThat(contextReceivedNodes).hasSize(1);
            TestBlocklistListener listener2 = new TestBlocklistListener();
            handler.registerBlocklistListener((BlocklistListener)listener2);
            Assertions.assertThat(listener2.listenerReceivedNodes).hasSize(1);
            Assertions.assertThat(listener2.listenerReceivedNodes.get(0)).containsExactlyInAnyOrder((Object[])new BlockedNode[]{node1, node2Update});
        }
    }

    @Test
    void testRemoveTimeoutNodes() throws Exception {
        ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
        CompletableFuture unblockResourcesFuture = new CompletableFuture();
        TestBlocklistContext context = TestBlocklistContext.newBuilder().setUnblockResourcesConsumer(unblockResourcesFuture::complete).build();
        try (DefaultBlocklistHandler handler = this.createDefaultBlocklistHandler(context, mainThreadExecutor);){
            ((CompletableFuture)CompletableFuture.supplyAsync(() -> {
                BlockedNode blockedNode = new BlockedNode("node", "cause", System.currentTimeMillis() + 1000L);
                handler.addNewBlockedNodes(Collections.singleton(blockedNode));
                Assertions.assertThat((Collection)handler.getAllBlockedNodeIds()).hasSize(1);
                return blockedNode;
            }, (Executor)mainThreadExecutor).thenAcceptBoth((CompletionStage)unblockResourcesFuture, (blockedNode, unblockResources) -> {
                Assertions.assertThat((Collection)handler.getAllBlockedNodeIds()).isEmpty();
                Assertions.assertThat((Collection)unblockResources).containsExactly((Object[])new BlockedNode[]{blockedNode});
            })).get();
        }
    }

    @Test
    void testIsBlockedTaskManager() throws Exception {
        ResourceID resourceID1 = ResourceID.generate();
        ResourceID resourceID2 = ResourceID.generate();
        ResourceID resourceID3 = ResourceID.generate();
        HashMap<ResourceID, String> taskManagerToNode = new HashMap<ResourceID, String>();
        taskManagerToNode.put(resourceID1, "node1");
        taskManagerToNode.put(resourceID2, "node1");
        taskManagerToNode.put(resourceID3, "node2");
        try (DefaultBlocklistHandler handler = this.createDefaultBlocklistHandler(taskManagerToNode);){
            handler.addNewBlockedNodes(Collections.singleton(new BlockedNode("node1", "cause", Long.MAX_VALUE)));
            Assertions.assertThat((boolean)handler.isBlockedTaskManager(resourceID1)).isTrue();
            Assertions.assertThat((boolean)handler.isBlockedTaskManager(resourceID2)).isTrue();
            Assertions.assertThat((boolean)handler.isBlockedTaskManager(resourceID3)).isFalse();
        }
    }

    private DefaultBlocklistHandler createDefaultBlocklistHandler(BlocklistContext blocklistContext) {
        return new DefaultBlocklistHandler((BlocklistTracker)new DefaultBlocklistTracker(), blocklistContext, resourceID -> "node", Duration.ofMillis(100L), ComponentMainThreadExecutorServiceAdapter.forMainThread(), LOG);
    }

    private DefaultBlocklistHandler createDefaultBlocklistHandler(Map<ResourceID, String> taskManagerToNode) {
        return new DefaultBlocklistHandler((BlocklistTracker)new DefaultBlocklistTracker(), (BlocklistContext)TestBlocklistContext.newBuilder().build(), taskManagerToNode::get, Duration.ofMillis(100L), ComponentMainThreadExecutorServiceAdapter.forMainThread(), LOG);
    }

    private DefaultBlocklistHandler createDefaultBlocklistHandler(BlocklistContext blocklistContext, ComponentMainThreadExecutor mainThreadExecutor) {
        return new DefaultBlocklistHandler((BlocklistTracker)new DefaultBlocklistTracker(), blocklistContext, resourceID -> "node", Duration.ofMillis(100L), mainThreadExecutor, LOG);
    }

    private static class TestBlocklistContext
    implements BlocklistContext {
        private final Consumer<Collection<BlockedNode>> blockResourcesConsumer;
        private final Consumer<Collection<BlockedNode>> unblockResourcesConsumer;

        private TestBlocklistContext(Consumer<Collection<BlockedNode>> blockResourcesConsumer, Consumer<Collection<BlockedNode>> unblockResourcesConsumer) {
            this.blockResourcesConsumer = (Consumer)Preconditions.checkNotNull(blockResourcesConsumer);
            this.unblockResourcesConsumer = (Consumer)Preconditions.checkNotNull(unblockResourcesConsumer);
        }

        public void blockResources(Collection<BlockedNode> blockedNodes) {
            this.blockResourcesConsumer.accept(blockedNodes);
        }

        public void unblockResources(Collection<BlockedNode> unblockedNodes) {
            this.unblockResourcesConsumer.accept(unblockedNodes);
        }

        static Builder newBuilder() {
            return new Builder();
        }

        private static class Builder {
            private Consumer<Collection<BlockedNode>> blockResourcesConsumer = ignored -> {};
            private Consumer<Collection<BlockedNode>> unblockResourcesConsumer = ignored -> {};

            private Builder() {
            }

            public Builder setBlockResourcesConsumer(Consumer<Collection<BlockedNode>> blockResourcesConsumer) {
                this.blockResourcesConsumer = blockResourcesConsumer;
                return this;
            }

            public Builder setUnblockResourcesConsumer(Consumer<Collection<BlockedNode>> unblockResourcesConsumer) {
                this.unblockResourcesConsumer = unblockResourcesConsumer;
                return this;
            }

            public TestBlocklistContext build() {
                return new TestBlocklistContext(this.blockResourcesConsumer, this.unblockResourcesConsumer);
            }
        }
    }

    private static class TestBlocklistListener
    implements BlocklistListener {
        private final List<List<BlockedNode>> listenerReceivedNodes = new ArrayList<List<BlockedNode>>();

        private TestBlocklistListener() {
        }

        public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
            this.listenerReceivedNodes.add(new ArrayList<BlockedNode>(newNodes));
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }
}

