/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.SingleJobJobGraphStore;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultDispatcherRunnerITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunnerITCase.class);
    private static final Time TIMEOUT = Time.seconds((long)10L);
    @ClassRule
    public static TestingRpcServiceResource rpcServiceResource = new TestingRpcServiceResource();
    @ClassRule
    public static BlobServerResource blobServerResource = new BlobServerResource();
    private JobGraph jobGraph;
    private TestingLeaderElectionService dispatcherLeaderElectionService;
    private TestingFatalErrorHandler fatalErrorHandler;
    private JobGraphStore jobGraphStore;
    private PartialDispatcherServices partialDispatcherServices;
    private DefaultDispatcherRunnerFactory dispatcherRunnerFactory;

    @Before
    public void setup() {
        this.dispatcherRunnerFactory = DefaultDispatcherRunnerFactory.createSessionRunner((DispatcherFactory)SessionDispatcherFactory.INSTANCE);
        this.jobGraph = DefaultDispatcherRunnerITCase.createJobGraph();
        this.dispatcherLeaderElectionService = new TestingLeaderElectionService();
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.jobGraphStore = TestingJobGraphStore.newBuilder().build();
        this.partialDispatcherServices = new PartialDispatcherServices(new Configuration(), (HighAvailabilityServices)new TestingHighAvailabilityServicesBuilder().build(), CompletableFuture::new, blobServerResource.getBlobServer(), (HeartbeatServices)new TestingHeartbeatServices(), UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup, (ExecutionGraphInfoStore)new MemoryExecutionGraphInfoStore(), (FatalErrorHandler)this.fatalErrorHandler, (HistoryServerArchivist)VoidHistoryServerArchivist.INSTANCE, null, (Executor)ForkJoinPool.commonPool());
    }

    @After
    public void teardown() throws Exception {
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
        }
    }

    @Test
    public void leaderChange_afterJobSubmission_recoversSubmittedJob() throws Exception {
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            UUID firstLeaderSessionId = UUID.randomUUID();
            DispatcherGateway firstDispatcherGateway = this.electLeaderAndRetrieveGateway(firstLeaderSessionId);
            firstDispatcherGateway.submitJob(this.jobGraph, TIMEOUT).get();
            this.dispatcherLeaderElectionService.notLeader();
            UUID secondLeaderSessionId = UUID.randomUUID();
            DispatcherGateway secondDispatcherGateway = this.electLeaderAndRetrieveGateway(secondLeaderSessionId);
            Collection jobIds = (Collection)secondDispatcherGateway.listJobs(TIMEOUT).get();
            Assert.assertThat((Object)jobIds, (Matcher)Matchers.contains((Object[])new JobID[]{this.jobGraph.getJobID()}));
        }
    }

    private DispatcherGateway electLeaderAndRetrieveGateway(UUID firstLeaderSessionId) throws InterruptedException, ExecutionException {
        this.dispatcherLeaderElectionService.isLeader(firstLeaderSessionId);
        LeaderConnectionInfo leaderConnectionInfo = this.dispatcherLeaderElectionService.getConfirmationFuture().get();
        return rpcServiceResource.getTestingRpcService().connect(leaderConnectionInfo.getAddress(), DispatcherId.fromUuid((UUID)leaderConnectionInfo.getLeaderSessionId()), DispatcherGateway.class).get();
    }

    @Test
    public void leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = new TestingJobManagerRunnerFactory(1);
        this.dispatcherRunnerFactory = DefaultDispatcherRunnerFactory.createSessionRunner((DispatcherFactory)new TestingDispatcherFactory(jobManagerRunnerFactory));
        this.jobGraphStore = new SingleJobJobGraphStore(this.jobGraph);
        try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner();){
            this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
            TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
            this.dispatcherLeaderElectionService.notLeader();
            LOG.info("Re-grant leadership first time.");
            this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
            Thread.sleep(1L);
            this.dispatcherLeaderElectionService.notLeader();
            LOG.info("Re-grant leadership second time.");
            UUID leaderSessionId = UUID.randomUUID();
            CompletableFuture<UUID> leaderFuture = this.dispatcherLeaderElectionService.isLeader(leaderSessionId);
            Assert.assertThat((Object)leaderFuture.isDone(), (Matcher)Matchers.is((Object)false));
            LOG.info("Complete the termination of the first job manager runner.");
            testingJobManagerRunner.completeTerminationFuture();
            Assert.assertThat((Object)leaderFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)leaderSessionId)));
            DispatcherGateway leaderGateway = rpcServiceResource.getTestingRpcService().connect(this.dispatcherLeaderElectionService.getAddress(), DispatcherId.fromUuid((UUID)leaderSessionId), DispatcherGateway.class).get();
            Assert.assertEquals((Object)this.jobGraph.getJobID(), (Object)Iterables.getOnlyElement((Iterable)((Iterable)leaderGateway.listJobs(TIMEOUT).get())));
        }
    }

    private static JobGraph createJobGraph() {
        return JobGraphTestUtils.singleNoOpJobGraph();
    }

    private DispatcherRunner createDispatcherRunner() throws Exception {
        return this.dispatcherRunnerFactory.createDispatcherRunner((LeaderElectionService)this.dispatcherLeaderElectionService, (FatalErrorHandler)this.fatalErrorHandler, () -> this.jobGraphStore, (Executor)TestingUtils.defaultExecutor(), (RpcService)rpcServiceResource.getTestingRpcService(), this.partialDispatcherServices);
    }

    private static class TestingDispatcherFactory
    implements DispatcherFactory {
        private final JobManagerRunnerFactory jobManagerRunnerFactory;

        private TestingDispatcherFactory(JobManagerRunnerFactory jobManagerRunnerFactory) {
            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
        }

        public Dispatcher createDispatcher(RpcService rpcService, DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, DispatcherBootstrapFactory dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {
            return new StandaloneDispatcher(rpcService, fencingToken, recoveredJobs, dispatcherBootstrapFactory, DispatcherServices.from((PartialDispatcherServicesWithJobGraphStore)partialDispatcherServicesWithJobGraphStore, (JobManagerRunnerFactory)this.jobManagerRunnerFactory));
        }
    }
}

