package org.apache.flink.runtime.resourcemanager.active;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.active.TestingResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.class */
public class ActiveResourceManagerTest {
    private static final long TESTING_START_WORKER_TIMEOUT_MS = 50;

    @RegisterExtension
    public static AllCallbackWrapper<TestingRpcServiceExtension> rpcServiceExtensionWrapper = new AllCallbackWrapper<>(new TestingRpcServiceExtension());
    private static final long TIMEOUT_SEC = 5;
    private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
    private static final Time TESTING_START_WORKER_INTERVAL = Time.milliseconds(50);
    private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = WorkerResourceSpec.ZERO;
    private static final TaskExecutorMemoryConfiguration TESTING_CONFIG = new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, Long.valueOf(TIMEOUT_SEC), 6L, 7L, 8L, 21L, 36L);

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest$Context.class */
    private static class Context {
        final Configuration flinkConfig;
        final TestingResourceManagerDriver.Builder driverBuilder;
        final TestingSlotManagerBuilder slotManagerBuilder;
        private ActiveResourceManager<ResourceID> resourceManager;
        private TestingFatalErrorHandler fatalErrorHandler;

        private Context() {
            this.flinkConfig = new Configuration();
            this.driverBuilder = new TestingResourceManagerDriver.Builder();
            this.slotManagerBuilder = new TestingSlotManagerBuilder();
        }

        ActiveResourceManager<ResourceID> getResourceManager() {
            return this.resourceManager;
        }

        TestingFatalErrorHandler getFatalErrorHandler() {
            return this.fatalErrorHandler;
        }

        void runTest(RunnableWithException runnableWithException) throws Exception {
            this.fatalErrorHandler = new TestingFatalErrorHandler();
            this.resourceManager = createAndStartResourceManager(this.flinkConfig, this.driverBuilder.build(), this.slotManagerBuilder.createSlotManager());
            try {
                runnableWithException.run();
            } finally {
                this.resourceManager.close();
            }
        }

        private ActiveResourceManager<ResourceID> createAndStartResourceManager(Configuration configuration, ResourceManagerDriver<ResourceID> resourceManagerDriver, SlotManager slotManager) throws Exception {
            TestingRpcService testingRpcService = ((TestingRpcServiceExtension) ActiveResourceManagerTest.rpcServiceExtensionWrapper.getCustomExtension()).getTestingRpcService();
            MockResourceManagerRuntimeServices mockResourceManagerRuntimeServices = new MockResourceManagerRuntimeServices(testingRpcService, slotManager);
            ActiveResourceManager<ResourceID> activeResourceManager = new ActiveResourceManager<>(resourceManagerDriver, configuration, testingRpcService, UUID.randomUUID(), ResourceID.generate(), mockResourceManagerRuntimeServices.heartbeatServices, mockResourceManagerRuntimeServices.delegationTokenManager, mockResourceManagerRuntimeServices.slotManager, NoOpResourceManagerPartitionTracker::get, new NoOpBlocklistHandler.Factory(), mockResourceManagerRuntimeServices.jobLeaderIdService, new ClusterInformation("localhost", 1234), this.fatalErrorHandler, UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(), ActiveResourceManagerFactory.createStartWorkerFailureRater(configuration), (Duration) configuration.get(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL), (Duration) configuration.get(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT), (Duration) configuration.get(ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT), ForkJoinPool.commonPool());
            activeResourceManager.start();
            activeResourceManager.getStartedFuture().get(ActiveResourceManagerTest.TIMEOUT_TIME.getSize(), ActiveResourceManagerTest.TIMEOUT_TIME.getUnit());
            return activeResourceManager;
        }

        CompletableFuture<Void> runInMainThread(Runnable runnable) {
            return this.resourceManager.runInMainThread(() -> {
                runnable.run();
                return null;
            }, ActiveResourceManagerTest.TIMEOUT_TIME);
        }

        <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
            return this.resourceManager.runInMainThread(callable, ActiveResourceManagerTest.TIMEOUT_TIME);
        }

        CompletableFuture<Acknowledge> registerTaskExecutorAndSendSlotReport(ResourceID resourceID, int i) {
            return registerTaskExecutor(resourceID).thenCompose(registrationResponse -> {
                Assertions.assertThat(registrationResponse).isInstanceOf(RegistrationResponse.Success.class);
                InstanceID instanceID = (InstanceID) this.resourceManager.getInstanceIdByResourceId(resourceID).get();
                HashSet hashSet = new HashSet();
                for (int i2 = 0; i2 < i; i2++) {
                    hashSet.add(new SlotStatus(new SlotID(resourceID, i2), ResourceProfile.ANY));
                }
                return this.resourceManager.getSelfGateway(ResourceManagerGateway.class).sendSlotReport(resourceID, instanceID, new SlotReport(hashSet), ActiveResourceManagerTest.TIMEOUT_TIME);
            });
        }

        CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID) {
            return registerTaskExecutor(resourceID, new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
        }

        CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
            ((TestingRpcServiceExtension) ActiveResourceManagerTest.rpcServiceExtensionWrapper.getCustomExtension()).getTestingRpcService().registerGateway(resourceID.toString(), taskExecutorGateway);
            return this.resourceManager.getSelfGateway(ResourceManagerGateway.class).registerTaskExecutor(new TaskExecutorRegistration(resourceID.toString(), resourceID, 1234, 23456, new HardwareDescription(1, 2L, 3L, 4L), ActiveResourceManagerTest.TESTING_CONFIG, ResourceProfile.ZERO, ResourceProfile.ZERO, resourceID.toString()), ActiveResourceManagerTest.TIMEOUT_TIME);
        }
    }

    ActiveResourceManagerTest() {
    }

    @Test
    void testStartNewWorker() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.1
            {
                ResourceID generate = ResourceID.generate();
                CompletableFuture completableFuture = new CompletableFuture();
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    completableFuture.complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(generate);
                });
                runTest(() -> {
                    CompletableFuture<Void> runInMainThread = runInMainThread(() -> {
                        getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                    });
                    TaskExecutorProcessSpec taskExecutorProcessSpec2 = (TaskExecutorProcessSpec) completableFuture.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    runInMainThread.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(taskExecutorProcessSpec2).isEqualTo(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, ActiveResourceManagerTest.WORKER_RESOURCE_SPEC));
                    FlinkAssertions.assertThatFuture(registerTaskExecutor(generate)).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testLessThanDeclareResource() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.2
            {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ArrayList arrayList = new ArrayList();
                arrayList.add(CompletableFuture.completedFuture(ResourceID.generate()));
                arrayList.add(new CompletableFuture());
                arrayList.add(new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    return (CompletableFuture) arrayList.get(atomicInteger.getAndIncrement());
                });
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    runInMainThread(() -> {
                        getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(atomicInteger).hasValue(2);
                    runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 3, Collections.emptySet())));
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(atomicInteger).hasValue(3);
                });
            }
        };
    }

    @Test
    void testMoreThanDeclaredResource() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.3
            {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                List asList = Arrays.asList(CompletableFuture.completedFuture(ResourceID.generate()), CompletableFuture.completedFuture(ResourceID.generate()), CompletableFuture.completedFuture(ResourceID.generate()), new CompletableFuture());
                AtomicInteger atomicInteger2 = new AtomicInteger(0);
                List asList2 = Arrays.asList(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    return (CompletableFuture) asList.get(atomicInteger.getAndIncrement());
                }).setReleaseResourceConsumer(resourceID -> {
                    ((CompletableFuture) asList2.get(atomicInteger2.getAndIncrement())).complete(resourceID);
                });
                runTest(() -> {
                    runInMainThread(() -> {
                        for (int i = 0; i < 4; i++) {
                            getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                        }
                    });
                    ResourceID resourceID2 = (ResourceID) ((CompletableFuture) asList.get(0)).get();
                    ResourceID resourceID3 = (ResourceID) ((CompletableFuture) asList.get(1)).get();
                    ResourceID resourceID4 = (ResourceID) ((CompletableFuture) asList.get(2)).get();
                    CompletableFuture completableFuture = (CompletableFuture) asList.get(3);
                    registerTaskExecutorAndSendSlotReport(resourceID2, 1).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    registerTaskExecutorAndSendSlotReport(resourceID3, 1).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(atomicInteger).hasValue(4);
                    Assertions.assertThat(atomicInteger2).hasValue(0);
                    Set singleton = Collections.singleton(getResourceManager().getInstanceIdByResourceId(resourceID2).get());
                    runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 3, singleton)));
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(atomicInteger2).hasValue(1);
                    Assertions.assertThat((CompletableFuture) asList2.get(0)).isCompletedWithValue(resourceID2);
                    runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 2, Collections.emptySet())));
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(atomicInteger2).hasValue(1);
                    Assertions.assertThat(completableFuture).isCancelled();
                    runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 1, Collections.emptySet())));
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(atomicInteger2).hasValue(2);
                    Assertions.assertThat((CompletableFuture) asList2.get(1)).isCompletedWithValue(resourceID4);
                    runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 0, Collections.emptySet())));
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(atomicInteger2).hasValue(3);
                    Assertions.assertThat((CompletableFuture) asList2.get(2)).isCompletedWithValue(resourceID3);
                });
            }
        };
    }

    @Test
    void testStartNewWorkerFailedRequesting() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.4
            {
                ResourceID generate = ResourceID.generate();
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ArrayList arrayList = new ArrayList();
                arrayList.add(new CompletableFuture());
                arrayList.add(new CompletableFuture());
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(new CompletableFuture());
                arrayList2.add(new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    int andIncrement = atomicInteger.getAndIncrement();
                    Assertions.assertThat(andIncrement).isLessThan(2);
                    ((CompletableFuture) arrayList2.get(andIncrement)).complete(taskExecutorProcessSpec);
                    return (CompletableFuture) arrayList.get(andIncrement);
                });
                runTest(() -> {
                    CompletableFuture<Void> runInMainThread = runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 1, Collections.emptySet())));
                    });
                    TaskExecutorProcessSpec taskExecutorProcessSpec2 = (TaskExecutorProcessSpec) ((CompletableFuture) arrayList2.get(0)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    runInMainThread.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(taskExecutorProcessSpec2).isEqualTo(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, ActiveResourceManagerTest.WORKER_RESOURCE_SPEC));
                    runInMainThread(() -> {
                        return Boolean.valueOf(((CompletableFuture) arrayList.get(0)).completeExceptionally(new Throwable("testing error")));
                    });
                    Assertions.assertThat((TaskExecutorProcessSpec) ((CompletableFuture) arrayList2.get(1)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).isEqualTo(taskExecutorProcessSpec2);
                    runInMainThread(() -> {
                        return Boolean.valueOf(((CompletableFuture) arrayList.get(1)).complete(generate));
                    });
                    FlinkAssertions.assertThatFuture(registerTaskExecutor(generate)).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testWorkerTerminatedBeforeRegister() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.5
            {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ArrayList arrayList = new ArrayList();
                arrayList.add(ResourceID.generate());
                arrayList.add(ResourceID.generate());
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(new CompletableFuture());
                arrayList2.add(new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    int andIncrement = atomicInteger.getAndIncrement();
                    Assertions.assertThat(andIncrement).isLessThan(2);
                    ((CompletableFuture) arrayList2.get(andIncrement)).complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(arrayList.get(andIncrement));
                });
                runTest(() -> {
                    CompletableFuture<Void> runInMainThread = runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 1, Collections.emptySet())));
                    });
                    TaskExecutorProcessSpec taskExecutorProcessSpec2 = (TaskExecutorProcessSpec) ((CompletableFuture) arrayList2.get(0)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    runInMainThread.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(taskExecutorProcessSpec2).isEqualTo(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, ActiveResourceManagerTest.WORKER_RESOURCE_SPEC));
                    runInMainThread(() -> {
                        getResourceManager().onWorkerTerminated((ResourceID) arrayList.get(0), "terminate for testing");
                    });
                    Assertions.assertThat((TaskExecutorProcessSpec) ((CompletableFuture) arrayList2.get(1)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).isEqualTo(taskExecutorProcessSpec2);
                    FlinkAssertions.assertThatFuture(registerTaskExecutor((ResourceID) arrayList.get(1))).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testWorkerTerminatedAfterRegister() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.6
            {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ArrayList arrayList = new ArrayList();
                arrayList.add(ResourceID.generate());
                arrayList.add(ResourceID.generate());
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(new CompletableFuture());
                arrayList2.add(new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    int andIncrement = atomicInteger.getAndIncrement();
                    Assertions.assertThat(andIncrement).isLessThan(2);
                    ((CompletableFuture) arrayList2.get(andIncrement)).complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(arrayList.get(andIncrement));
                });
                runTest(() -> {
                    CompletableFuture<Void> runInMainThread = runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 1, Collections.emptySet())));
                    });
                    TaskExecutorProcessSpec taskExecutorProcessSpec2 = (TaskExecutorProcessSpec) ((CompletableFuture) arrayList2.get(0)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    runInMainThread.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(taskExecutorProcessSpec2).isEqualTo(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, ActiveResourceManagerTest.WORKER_RESOURCE_SPEC));
                    FlinkAssertions.assertThatFuture(registerTaskExecutor((ResourceID) arrayList.get(0))).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                    runInMainThread(() -> {
                        getResourceManager().onWorkerTerminated((ResourceID) arrayList.get(0), "terminate for testing");
                    });
                    Assertions.assertThat((TaskExecutorProcessSpec) ((CompletableFuture) arrayList2.get(1)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).isEqualTo(taskExecutorProcessSpec2);
                    FlinkAssertions.assertThatFuture(registerTaskExecutor((ResourceID) arrayList.get(1))).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testWorkerTerminatedNoLongerRequired() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.7
            {
                ResourceID generate = ResourceID.generate();
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ArrayList arrayList = new ArrayList();
                arrayList.add(new CompletableFuture());
                arrayList.add(new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    int andIncrement = atomicInteger.getAndIncrement();
                    Assertions.assertThat(andIncrement).isLessThan(2);
                    ((CompletableFuture) arrayList.get(andIncrement)).complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(generate);
                });
                runTest(() -> {
                    CompletableFuture<Void> runInMainThread = runInMainThread(() -> {
                        getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                    });
                    TaskExecutorProcessSpec taskExecutorProcessSpec2 = (TaskExecutorProcessSpec) ((CompletableFuture) arrayList.get(0)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    runInMainThread.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat(taskExecutorProcessSpec2).isEqualTo(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, ActiveResourceManagerTest.WORKER_RESOURCE_SPEC));
                    FlinkAssertions.assertThatFuture(registerTaskExecutor(generate)).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                    runInMainThread(() -> {
                        getResourceManager().onWorkerTerminated(generate, "terminate for testing");
                        return null;
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    Assertions.assertThat((CompletableFuture) arrayList.get(1)).isNotCompleted();
                });
            }
        };
    }

    @Test
    void testCloseTaskManagerConnectionOnWorkerTerminated() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.8
            {
                ResourceID generate = ResourceID.generate();
                CompletableFuture completableFuture = new CompletableFuture();
                CompletableFuture completableFuture2 = new CompletableFuture();
                TestingTaskExecutorGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setDisconnectResourceManagerConsumer(exc -> {
                    completableFuture2.complete(null);
                }).createTestingTaskExecutorGateway();
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    completableFuture.complete(taskExecutorProcessSpec);
                    return CompletableFuture.completedFuture(generate);
                });
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                    }).thenCompose(r7 -> {
                        return registerTaskExecutor(generate, createTestingTaskExecutorGateway);
                    }).thenRun(() -> {
                        runInMainThread(() -> {
                            getResourceManager().onWorkerTerminated(generate, "terminate for testing");
                        });
                    });
                    completableFuture2.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                });
            }
        };
    }

    @Test
    void testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.9
            {
                this.flinkConfig.set(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, Double.valueOf(1.0d));
                this.flinkConfig.set(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL, Duration.ofMillis(ActiveResourceManagerTest.TESTING_START_WORKER_INTERVAL.toMilliseconds()));
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ArrayList arrayList = new ArrayList();
                arrayList.add(ResourceID.generate());
                arrayList.add(ResourceID.generate());
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(new CompletableFuture());
                arrayList2.add(new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    int andIncrement = atomicInteger.getAndIncrement();
                    Assertions.assertThat(andIncrement).isLessThan(2);
                    ((CompletableFuture) arrayList2.get(andIncrement)).complete(Long.valueOf(System.currentTimeMillis()));
                    return CompletableFuture.completedFuture(arrayList.get(andIncrement));
                });
                runTest(() -> {
                    CompletableFuture<Void> runInMainThread = runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 1, Collections.emptySet())));
                    });
                    long longValue = ((Long) ((CompletableFuture) arrayList2.get(0)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).longValue();
                    runInMainThread.get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    runInMainThread(() -> {
                        getResourceManager().onWorkerTerminated((ResourceID) arrayList.get(0), "terminate for testing");
                    });
                    Assertions.assertThat(((Long) ((CompletableFuture) arrayList2.get(1)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).longValue() - longValue).isGreaterThanOrEqualTo(ActiveResourceManagerTest.TESTING_START_WORKER_INTERVAL.toMilliseconds());
                    FlinkAssertions.assertThatFuture(registerTaskExecutor((ResourceID) arrayList.get(1))).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testStartWorkerIntervalOnRequestWorkerFailure() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.10
            {
                this.flinkConfig.set(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, Double.valueOf(1.0d));
                this.flinkConfig.set(ResourceManagerOptions.START_WORKER_RETRY_INTERVAL, Duration.ofMillis(ActiveResourceManagerTest.TESTING_START_WORKER_INTERVAL.toMilliseconds()));
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ResourceID generate = ResourceID.generate();
                ArrayList arrayList = new ArrayList();
                arrayList.add(new CompletableFuture());
                arrayList.add(new CompletableFuture());
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(new CompletableFuture());
                arrayList2.add(new CompletableFuture());
                this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    int andIncrement = atomicInteger.getAndIncrement();
                    Assertions.assertThat(andIncrement).isLessThan(2);
                    ((CompletableFuture) arrayList2.get(andIncrement)).complete(Long.valueOf(System.currentTimeMillis()));
                    return (CompletableFuture) arrayList.get(andIncrement);
                });
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().declareResourceNeeded(Collections.singleton(new ResourceDeclaration(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC, 1, Collections.emptySet())));
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                    long longValue = ((Long) ((CompletableFuture) arrayList2.get(0)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).longValue();
                    runInMainThread(() -> {
                        return Boolean.valueOf(((CompletableFuture) arrayList.get(0)).completeExceptionally(new Throwable("testing error")));
                    });
                    Assertions.assertThat(((Long) ((CompletableFuture) arrayList2.get(1)).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).longValue() - longValue).isGreaterThanOrEqualTo(ActiveResourceManagerTest.TESTING_START_WORKER_INTERVAL.toMilliseconds());
                    ((CompletableFuture) arrayList.get(1)).complete(generate);
                    FlinkAssertions.assertThatFuture(registerTaskExecutor(generate)).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testRecoverWorkerFromPreviousAttempt() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.11
            {
                ResourceID generate = ResourceID.generate();
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(generate));
                    });
                    FlinkAssertions.assertThatFuture(registerTaskExecutor(generate)).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testRegisterUnknownWorker() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.12
            {
                runTest(() -> {
                    FlinkAssertions.assertThatFuture(registerTaskExecutor(ResourceID.generate())).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isInstanceOf(RegistrationResponse.Rejection.class);
                });
            }
        };
    }

    @Test
    void testOnError() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.13
            {
                Throwable th = new Throwable("Testing fatal error");
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().onError(th);
                    });
                    Assertions.assertThat(getFatalErrorHandler().getErrorFuture().get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS)).isSameAs(th);
                });
            }
        };
    }

    @Test
    void testWorkerRegistrationTimeout() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.14
            {
                ResourceID generate = ResourceID.generate();
                CompletableFuture completableFuture = new CompletableFuture();
                this.flinkConfig.set(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT, Duration.ofMillis(50L));
                TestingResourceManagerDriver.Builder requestResourceFunction = this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    return CompletableFuture.completedFuture(generate);
                });
                completableFuture.getClass();
                requestResourceFunction.setReleaseResourceConsumer((v1) -> {
                    r1.complete(v1);
                });
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                    });
                    FlinkAssertions.assertThatFuture(completableFuture).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isSameAs(generate);
                });
            }
        };
    }

    @Test
    void testWorkerRegistrationTimeoutNotCountingAllocationTime() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.15
            {
                ResourceID generate = ResourceID.generate();
                CompletableFuture completableFuture = new CompletableFuture();
                CompletableFuture completableFuture2 = new CompletableFuture();
                this.flinkConfig.set(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT, Duration.ofMillis(50L));
                TestingResourceManagerDriver.Builder requestResourceFunction = this.driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
                    return completableFuture;
                });
                completableFuture2.getClass();
                requestResourceFunction.setReleaseResourceConsumer((v1) -> {
                    r1.complete(v1);
                });
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().requestNewWorker(ActiveResourceManagerTest.WORKER_RESOURCE_SPEC);
                    });
                    Thread.sleep(100L);
                    long nanoTime = System.nanoTime();
                    runInMainThread(() -> {
                        return Boolean.valueOf(completableFuture.complete(generate));
                    });
                    RegistrationResponse join = registerTaskExecutor(generate).join();
                    FlinkAssertions.assertThatFuture(completableFuture2).isNotDone();
                    Assumptions.assumeThat((System.nanoTime() - nanoTime) / 1000000).as("The registration must not take longer than the start worker timeout. If it does, then this indicates a very slow machine.", new Object[0]).isLessThan(50L);
                    Assertions.assertThat(join).isInstanceOf(RegistrationResponse.Success.class);
                });
            }
        };
    }

    @Test
    void testWorkerRegistrationTimeoutRecoveredFromPreviousAttempt() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.16
            {
                ResourceID generate = ResourceID.generate();
                CompletableFuture completableFuture = new CompletableFuture();
                this.flinkConfig.set(ResourceManagerOptions.TASK_MANAGER_REGISTRATION_TIMEOUT, Duration.ofMillis(50L));
                TestingResourceManagerDriver.Builder builder = this.driverBuilder;
                completableFuture.getClass();
                builder.setReleaseResourceConsumer((v1) -> {
                    r1.complete(v1);
                });
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(generate));
                    });
                    FlinkAssertions.assertThatFuture(completableFuture).succeedsWithin(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS).isSameAs(generate);
                });
            }
        };
    }

    @Test
    void testResourceManagerRecoveredAfterAllTMRegistered() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.17
            {
                ResourceID generate = ResourceID.generate();
                ResourceID generate2 = ResourceID.generate();
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().onPreviousAttemptWorkersRecovered(ImmutableSet.of(generate, generate2));
                    });
                    runInMainThread(() -> {
                        getResourceManager().onWorkerRegistered(generate, WorkerResourceSpec.ZERO);
                    });
                    runInMainThread(() -> {
                        getResourceManager().onWorkerRegistered(generate2, WorkerResourceSpec.ZERO);
                    });
                    runInMainThread(() -> {
                        return Assertions.assertThat(getResourceManager().getReadyToServeFuture()).isCompleted();
                    }).get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                });
            }
        };
    }

    @Test
    void testResourceManagerRecoveredAfterReconcileTimeout() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.resourcemanager.active.ActiveResourceManagerTest.18
            {
                ResourceID generate = ResourceID.generate();
                ResourceID generate2 = ResourceID.generate();
                this.flinkConfig.set(ResourceManagerOptions.RESOURCE_MANAGER_PREVIOUS_WORKER_RECOVERY_TIMEOUT, Duration.ofMillis(50L));
                runTest(() -> {
                    runInMainThread(() -> {
                        getResourceManager().onPreviousAttemptWorkersRecovered(ImmutableSet.of(generate, generate2));
                    });
                    runInMainThread(() -> {
                        getResourceManager().onWorkerRegistered(generate, WorkerResourceSpec.ZERO);
                    });
                    getResourceManager().getReadyToServeFuture().get(ActiveResourceManagerTest.TIMEOUT_SEC, TimeUnit.SECONDS);
                });
            }
        };
    }
}
