/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.utils.JobResultUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests that revoke and re-grant leadership of individual cluster components (dispatcher, job
 * master, resource manager) on a shared {@link TestingMiniCluster} whose high-availability
 * services are an {@link EmbeddedHaServicesWithLeadershipControl}.
 *
 * <p>The mini cluster is started once for the whole class; a fresh job graph consisting of a
 * single {@link BlockingOperator} vertex is created before every test.
 */
class LeaderChangeClusterComponentsTest {
    /** Timeout used when retrieving leader information. */
    private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L);

    /** Number of slots configured per task manager of the mini cluster. */
    private static final int SLOTS_PER_TM = 2;

    /** Number of task managers the mini cluster is started with. */
    private static final int NUM_TMS = 2;

    /** Parallelism of the blocking vertex in the job graph created for each test. */
    public static final int PARALLELISM = 4;

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    private static TestingMiniCluster miniCluster;
    private static EmbeddedHaServicesWithLeadershipControl highAvailabilityServices;

    private JobGraph jobGraph;
    private JobID jobId;

    /**
     * Creates the leadership-controllable HA services and starts a mini cluster that uses them.
     *
     * @throws Exception if building or starting the mini cluster fails
     */
    @BeforeAll
    static void setupClass() throws Exception {
        highAvailabilityServices =
                new EmbeddedHaServicesWithLeadershipControl(EXECUTOR_RESOURCE.getExecutor());

        miniCluster =
                TestingMiniCluster.newBuilder(
                                TestingMiniClusterConfiguration.newBuilder()
                                        .setNumTaskManagers(NUM_TMS)
                                        .setNumSlotsPerTaskManager(SLOTS_PER_TM)
                                        .build())
                        .setHighAvailabilityServicesSupplier(() -> highAvailabilityServices)
                        .build();

        miniCluster.start();
    }

    /** Builds a fresh blocking job graph (which also re-arms the blocking flag) for each test. */
    @BeforeEach
    void setup() {
        jobGraph = createJobGraph(PARALLELISM);
        jobId = jobGraph.getJobID();
    }

    /**
     * Closes the mini cluster if it was created.
     *
     * @throws Exception if closing the mini cluster fails
     */
    @AfterAll
    static void teardownClass() throws Exception {
        if (miniCluster != null) {
            miniCluster.close();
        }
    }

    /**
     * Submits the blocking job, revokes dispatcher leadership and expects the pending job result
     * to complete with {@link ApplicationStatus#UNKNOWN}. After leadership is granted again and
     * the operator is unblocked, the same job graph is resubmitted and must succeed.
     */
    @Test
    void testReelectionOfDispatcher() throws Exception {
        miniCluster.submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobId);

        // Complete one status round-trip before revoking leadership.
        // NOTE(review): presumably this ensures the result request above has already been
        // processed by the cluster - confirm against MiniCluster semantics.
        miniCluster.getJobStatus(jobId).get();

        highAvailabilityServices.revokeDispatcherLeadership().get();

        JobResult jobResult = jobResultFuture.get();
        Assertions.assertThat(jobResult.getApplicationStatus()).isEqualTo(ApplicationStatus.UNKNOWN);

        highAvailabilityServices.grantDispatcherLeadership();

        // Let the resubmitted job's tasks return immediately from invoke().
        BlockingOperator.isBlocking = false;

        miniCluster.submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture2 = miniCluster.requestJobResult(jobId);
        jobResult = jobResultFuture2.get();

        JobResultUtils.assertSuccess(jobResult);
    }

    /**
     * Submits the blocking job, revokes the job master's leadership and checks via
     * {@link JobResultUtils#assertIncomplete} that the result future is still pending. After the
     * operator is unblocked and leadership is granted again, the original result future must
     * complete successfully.
     */
    @Test
    void testReelectionOfJobMaster() throws Exception {
        miniCluster.submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobId);

        // Leadership is only revoked once the job manager reports itself as initialized.
        CommonTestUtils.waitUntilJobManagerIsInitialized(
                () -> miniCluster.getJobStatus(jobId).get());

        highAvailabilityServices.revokeJobMasterLeadership(jobId).get();

        JobResultUtils.assertIncomplete(jobResultFuture);

        // Unblock before re-granting so tasks started afterwards finish right away.
        BlockingOperator.isBlocking = false;

        highAvailabilityServices.grantJobMasterLeadership(jobId);

        final JobResult jobResult = jobResultFuture.get();
        JobResultUtils.assertSuccess(jobResult);
    }

    /**
     * Revokes and re-grants resource manager leadership and verifies that a leader session id is
     * retrievable again and that all task executors are reported as connected afterwards.
     */
    @Test
    void testTaskExecutorsReconnectToClusterWithLeadershipChange() throws Exception {
        waitUntilTaskExecutorsHaveConnected(NUM_TMS);

        highAvailabilityServices.revokeResourceManagerLeadership().get();
        highAvailabilityServices.grantResourceManagerLeadership();

        Assertions.assertThat(
                        LeaderRetrievalUtils.retrieveLeaderInformation(
                                        highAvailabilityServices.getResourceManagerLeaderRetriever(),
                                        TESTING_TIMEOUT)
                                .getLeaderSessionID())
                .isNotNull();

        waitUntilTaskExecutorsHaveConnected(NUM_TMS);
    }

    /**
     * Polls the cluster overview until it reports exactly {@code numTaskExecutors} connected
     * task managers.
     *
     * @param numTaskExecutors expected number of connected task managers
     * @throws Exception if requesting the overview or waiting for the condition fails
     */
    private void waitUntilTaskExecutorsHaveConnected(int numTaskExecutors) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () ->
                        miniCluster.requestClusterOverview().get().getNumTaskManagersConnected()
                                == numTaskExecutors,
                10L);
    }

    /**
     * Creates a streaming job graph with a single {@link BlockingOperator} vertex and re-arms the
     * operator's blocking flag.
     *
     * @param parallelism parallelism to set on the blocking vertex
     * @return the job graph containing the blocking vertex
     */
    private JobGraph createJobGraph(int parallelism) {
        BlockingOperator.isBlocking = true;

        final JobVertex vertex = new JobVertex("blocking operator");
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(BlockingOperator.class);

        return JobGraphTestUtils.streamingJobGraph(vertex);
    }

    /**
     * Invokable that waits indefinitely on its own monitor when {@link #isBlocking} is set at the
     * time {@link #invoke()} is entered, and returns immediately otherwise.
     */
    public static class BlockingOperator extends AbstractInvokable {
        /**
         * Whether newly invoked operators should block. Declared {@code volatile} because it is
         * written by the test methods and read inside {@link #invoke()}, which presumably runs on
         * a different (task) thread; without it the update is not guaranteed to become visible.
         */
        static volatile boolean isBlocking = true;

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        /**
         * Blocks forever if the blocking flag is set on entry; the flag is read only once, so
         * clearing it later does not release an operator that is already waiting. The wait ends
         * only through an exception thrown by {@link Object#wait()}, e.g. on interruption.
         */
        @Override
        public void invoke() throws Exception {
            if (isBlocking) {
                synchronized (this) {
                    // Loop so that spurious wake-ups never let the task finish.
                    while (true) {
                        wait();
                    }
                }
            }
        }
    }
}

