/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherLeaderProcess;
import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherLeaderProcessFactory;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class DefaultDispatcherRunnerTest
extends TestLogger {
    private TestingLeaderElection leaderElection;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private TestingDispatcherLeaderProcessFactory testingDispatcherLeaderProcessFactory;

    @Before
    public void setup() {
        this.leaderElection = new TestingLeaderElection();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.defaultValue();
    }

    @After
    public void teardown() throws Exception {
        this.leaderElection.close();
        if (this.testingFatalErrorHandler != null) {
            this.testingFatalErrorHandler.rethrowError();
            this.testingFatalErrorHandler = null;
        }
    }

    @Test
    public void testLeaderElectionLifecycle() throws Exception {
        Assert.assertTrue((boolean)this.leaderElection.isStopped());
        try (DispatcherRunner unusedDisptacherRunner = this.createDispatcherRunner();){
            Assert.assertFalse((boolean)this.leaderElection.isStopped());
        }
        Assert.assertTrue((boolean)this.leaderElection.isStopped());
    }

    @Test
    public void closeAsync_doesNotCompleteUncompletedShutDownFuture() throws Exception {
        DispatcherRunner dispatcherRunner = this.createDispatcherRunner();
        CompletableFuture terminationFuture = dispatcherRunner.closeAsync();
        terminationFuture.get();
        CompletableFuture shutDownFuture = dispatcherRunner.getShutDownFuture();
        MatcherAssert.assertThat((Object)shutDownFuture.isDone(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void getShutDownFuture_whileRunning_forwardsDispatcherLeaderProcessShutDownRequest() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<ApplicationStatus>();
        TestingDispatcherLeaderProcess testingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId).setShutDownFuture(shutDownFuture).build();
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess);
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            this.leaderElection.isLeader(leaderSessionId);
            CompletableFuture dispatcherShutDownFuture = dispatcherRunner.getShutDownFuture();
            Assert.assertFalse((boolean)dispatcherShutDownFuture.isDone());
            ApplicationStatus finalApplicationStatus = ApplicationStatus.UNKNOWN;
            shutDownFuture.complete(finalApplicationStatus);
            MatcherAssert.assertThat((Object)((ApplicationStatus)dispatcherShutDownFuture.get()), (Matcher)CoreMatchers.is((Object)finalApplicationStatus));
        }
    }

    @Test
    public void getShutDownFuture_afterClose_ignoresDispatcherLeaderProcessShutDownRequest() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<ApplicationStatus>();
        TestingDispatcherLeaderProcess testingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId).setShutDownFuture(shutDownFuture).build();
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess);
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            this.leaderElection.isLeader(leaderSessionId);
            CompletableFuture dispatcherShutDownFuture = dispatcherRunner.getShutDownFuture();
            Assert.assertFalse((boolean)dispatcherShutDownFuture.isDone());
            dispatcherRunner.closeAsync();
            ApplicationStatus finalApplicationStatus = ApplicationStatus.UNKNOWN;
            shutDownFuture.complete(finalApplicationStatus);
            try {
                dispatcherShutDownFuture.get(10L, TimeUnit.MILLISECONDS);
                Assert.fail((String)"The dispatcher runner should no longer react to the dispatcher leader process's shut down request if it has been terminated.");
            }
            catch (TimeoutException timeoutException) {
                // empty catch block
            }
        }
    }

    @Test
    public void getShutDownFuture_newLeader_ignoresOldDispatcherLeaderProcessShutDownRequest() throws Exception {
        UUID firstLeaderSessionId = UUID.randomUUID();
        UUID secondLeaderSessionId = UUID.randomUUID();
        CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<ApplicationStatus>();
        TestingDispatcherLeaderProcess firstTestingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(firstLeaderSessionId).setShutDownFuture(shutDownFuture).build();
        TestingDispatcherLeaderProcess secondTestingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(secondLeaderSessionId).build();
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(firstTestingDispatcherLeaderProcess, secondTestingDispatcherLeaderProcess);
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            this.leaderElection.isLeader(firstLeaderSessionId);
            CompletableFuture dispatcherShutDownFuture = dispatcherRunner.getShutDownFuture();
            Assert.assertFalse((boolean)dispatcherShutDownFuture.isDone());
            this.leaderElection.isLeader(secondLeaderSessionId);
            ApplicationStatus finalApplicationStatus = ApplicationStatus.UNKNOWN;
            shutDownFuture.complete(finalApplicationStatus);
            Assert.assertFalse((boolean)dispatcherShutDownFuture.isDone());
        }
    }

    @Test
    public void revokeLeadership_withExistingLeader_stopsLeaderProcess() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        OneShotLatch startLatch = new OneShotLatch();
        OneShotLatch stopLatch = new OneShotLatch();
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(TestingDispatcherLeaderProcess.newBuilder(leaderSessionId).setStartConsumer(ignored -> startLatch.trigger()).setCloseAsyncSupplier(() -> {
            stopLatch.trigger();
            return FutureUtils.completedVoidFuture();
        }).build());
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            this.leaderElection.isLeader(leaderSessionId);
            startLatch.await();
            this.leaderElection.notLeader();
            stopLatch.await();
        }
    }

    @Test
    public void grantLeadership_withExistingLeader_waitsForTerminationOfFirstLeader() throws Exception {
        UUID firstLeaderSessionId = UUID.randomUUID();
        UUID secondLeaderSessionId = UUID.randomUUID();
        StartStopDispatcherLeaderProcess firstTestingDispatcherLeaderProcess = StartStopDispatcherLeaderProcess.create(firstLeaderSessionId);
        StartStopDispatcherLeaderProcess secondTestingDispatcherLeaderProcess = StartStopDispatcherLeaderProcess.create(secondLeaderSessionId);
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(firstTestingDispatcherLeaderProcess.asTestingDispatcherLeaderProcess(), secondTestingDispatcherLeaderProcess.asTestingDispatcherLeaderProcess());
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            this.leaderElection.isLeader(firstLeaderSessionId);
            MatcherAssert.assertThat((Object)firstTestingDispatcherLeaderProcess.isStarted(), (Matcher)CoreMatchers.is((Object)true));
            this.leaderElection.isLeader(secondLeaderSessionId);
            MatcherAssert.assertThat((Object)secondTestingDispatcherLeaderProcess.isStarted(), (Matcher)CoreMatchers.is((Object)false));
            firstTestingDispatcherLeaderProcess.terminateProcess();
            MatcherAssert.assertThat((Object)secondTestingDispatcherLeaderProcess.isStarted(), (Matcher)CoreMatchers.is((Object)true));
            secondTestingDispatcherLeaderProcess.terminateProcess();
        }
    }

    @Test
    public void grantLeadership_validLeader_confirmsLeaderSession() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            LeaderInformation leaderInformation = this.leaderElection.isLeader(leaderSessionId).join();
            MatcherAssert.assertThat((Object)leaderInformation.getLeaderSessionID(), (Matcher)CoreMatchers.is((Object)leaderSessionId));
        }
    }

    @Test
    public void grantLeadership_oldLeader_doesNotConfirmLeaderSession() throws Exception {
        UUID leaderSessionId = UUID.randomUUID();
        CompletableFuture<String> contenderConfirmationFuture = new CompletableFuture<String>();
        TestingDispatcherLeaderProcess testingDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId).setConfirmLeaderSessionFuture(contenderConfirmationFuture).build();
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(testingDispatcherLeaderProcess);
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            this.leaderElection.isLeader(leaderSessionId);
            this.leaderElection.notLeader();
            contenderConfirmationFuture.complete("leader address");
            MatcherAssert.assertThat((Object)this.leaderElection.hasLeadershipAsync(leaderSessionId).get(), (Matcher)CoreMatchers.is((Object)false));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void grantLeadership_multipleLeaderChanges_lastDispatcherLeaderProcessWaitsForOthersToTerminateBeforeItStarts() throws Exception {
        UUID firstLeaderSession = UUID.randomUUID();
        UUID secondLeaderSession = UUID.randomUUID();
        UUID thirdLeaderSession = UUID.randomUUID();
        CompletableFuture<Object> firstDispatcherLeaderProcessTerminationFuture = new CompletableFuture<Object>();
        TestingDispatcherLeaderProcess firstDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(firstLeaderSession).setCloseAsyncSupplier(() -> firstDispatcherLeaderProcessTerminationFuture).build();
        CompletableFuture<Object> secondDispatcherLeaderProcessTerminationFuture = new CompletableFuture<Object>();
        TestingDispatcherLeaderProcess secondDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(secondLeaderSession).setCloseAsyncSupplier(() -> secondDispatcherLeaderProcessTerminationFuture).build();
        CompletableFuture thirdDispatcherLeaderProcessHasBeenStartedFuture = new CompletableFuture();
        TestingDispatcherLeaderProcess thirdDispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(thirdLeaderSession).setStartConsumer(thirdDispatcherLeaderProcessHasBeenStartedFuture::complete).build();
        this.testingDispatcherLeaderProcessFactory = TestingDispatcherLeaderProcessFactory.from(firstDispatcherLeaderProcess, secondDispatcherLeaderProcess, thirdDispatcherLeaderProcess);
        DispatcherRunner dispatcherRunner = this.createDispatcherRunner();
        try {
            this.leaderElection.isLeader(firstLeaderSession);
            this.leaderElection.isLeader(secondLeaderSession);
            this.leaderElection.isLeader(thirdLeaderSession);
            firstDispatcherLeaderProcessTerminationFuture.complete(null);
            MatcherAssert.assertThat((Object)thirdDispatcherLeaderProcessHasBeenStartedFuture.isDone(), (Matcher)CoreMatchers.is((Object)false));
            secondDispatcherLeaderProcessTerminationFuture.complete(null);
            MatcherAssert.assertThat((Object)thirdDispatcherLeaderProcessHasBeenStartedFuture.isDone(), (Matcher)CoreMatchers.is((Object)true));
        }
        finally {
            firstDispatcherLeaderProcessTerminationFuture.complete(null);
            secondDispatcherLeaderProcessTerminationFuture.complete(null);
            dispatcherRunner.close();
        }
    }

    private DispatcherRunner createDispatcherRunner() throws Exception {
        return DefaultDispatcherRunner.create((LeaderElection)this.leaderElection, (FatalErrorHandler)this.testingFatalErrorHandler, (DispatcherLeaderProcessFactory)this.testingDispatcherLeaderProcessFactory);
    }

    private static final class StartStopDispatcherLeaderProcess {
        private final TestingDispatcherLeaderProcess testingDispatcherLeaderProcess;
        private final CompletableFuture<Void> startFuture;
        private final CompletableFuture<Void> terminationFuture;

        private StartStopDispatcherLeaderProcess(TestingDispatcherLeaderProcess testingDispatcherLeaderProcess, CompletableFuture<Void> startFuture, CompletableFuture<Void> terminationFuture) {
            this.testingDispatcherLeaderProcess = testingDispatcherLeaderProcess;
            this.startFuture = startFuture;
            this.terminationFuture = terminationFuture;
        }

        private TestingDispatcherLeaderProcess asTestingDispatcherLeaderProcess() {
            return this.testingDispatcherLeaderProcess;
        }

        private boolean isStarted() {
            return this.startFuture.isDone();
        }

        private void terminateProcess() {
            this.terminationFuture.complete(null);
        }

        private static StartStopDispatcherLeaderProcess create(UUID leaderSessionId) {
            CompletableFuture<Void> processStartFuture = new CompletableFuture<Void>();
            CompletableFuture<Void> processTerminationFuture = new CompletableFuture<Void>();
            TestingDispatcherLeaderProcess dispatcherLeaderProcess = TestingDispatcherLeaderProcess.newBuilder(leaderSessionId).setStartConsumer(processStartFuture::complete).setCloseAsyncSupplier(() -> processTerminationFuture).build();
            return new StartStopDispatcherLeaderProcess(dispatcherLeaderProcess, processStartFuture, processTerminationFuture);
        }
    }
}

