/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MiniDispatcher;
import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link MiniDispatcher}.
 *
 * <p>Each test creates a {@link MiniDispatcher} wired to testing runner factories, starts it,
 * drives the created job manager runner, and finally terminates the dispatcher endpoint. The job
 * graph, RPC service, configuration, and blob server are created once per class; the high
 * availability services, runner factories, and cleanup futures are recreated before every test.
 */
public class MiniDispatcherTest extends TestLogger {
    /** Timeout passed to gateway calls issued by the tests. */
    private static final Time timeout = Time.seconds(10L);

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource =
            new TestingFatalErrorHandlerResource();

    // Fixtures shared by all tests; initialized in setupClass().
    private static JobGraph jobGraph;
    private static ExecutionGraphInfo executionGraphInfo;
    private static TestingRpcService rpcService;
    private static Configuration configuration;
    private static BlobServer blobServer;

    private final TestingResourceManagerGateway resourceManagerGateway =
            new TestingResourceManagerGateway();
    private final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
    private final ExecutionGraphInfoStore executionGraphInfoStore =
            new MemoryExecutionGraphInfoStore();

    // Fixtures recreated before each test; initialized in setup().
    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory;
    private TestingCleanupRunnerFactory testingCleanupRunnerFactory;

    /** Future returned by the locally cleanable resource registered in the dispatcher. */
    private CompletableFuture<Void> localCleanupResultFuture;

    /** Future returned by the globally cleanable resource registered in the dispatcher. */
    private CompletableFuture<Void> globalCleanupResultFuture;

    /**
     * Creates the class-wide fixtures: a single no-op job graph, a {@code FINISHED} execution
     * graph info for that job, the RPC service, the configuration, and a blob server backed by a
     * fresh temporary folder.
     *
     * @throws IOException if the temporary folder or the blob server cannot be created
     */
    @BeforeClass
    public static void setupClass() throws IOException {
        jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        executionGraphInfo =
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(jobGraph.getJobID())
                                .setState(JobStatus.FINISHED)
                                .build());
        rpcService = new TestingRpcService();
        configuration = new Configuration();
        blobServer =
                new BlobServer(
                        configuration, temporaryFolder.newFolder(), (BlobStore) new VoidBlobStore());
    }

    /**
     * Recreates the per-test fixtures. Both cleanup futures start out already completed; tests
     * that need to control cleanup replace them before creating the dispatcher.
     */
    @Before
    public void setup() throws Exception {
        highAvailabilityServices = new TestingHighAvailabilityServicesBuilder().build();
        testingJobManagerRunnerFactory = new TestingJobMasterServiceLeadershipRunnerFactory();
        testingCleanupRunnerFactory = new TestingCleanupRunnerFactory();
        localCleanupResultFuture = FutureUtils.completedVoidFuture();
        globalCleanupResultFuture = FutureUtils.completedVoidFuture();
    }

    /**
     * Releases the class-wide blob server and RPC service.
     *
     * <p>The RPC service is terminated in a {@code finally} block so that it is still shut down
     * when closing the blob server throws.
     */
    @AfterClass
    public static void teardownClass()
            throws IOException, InterruptedException, ExecutionException, TimeoutException {
        try {
            if (blobServer != null) {
                blobServer.close();
            }
        } finally {
            if (rpcService != null) {
                RpcUtils.terminateRpcService(new RpcService[] {rpcService});
            }
        }
    }

    /**
     * Starts a {@code DETACHED} dispatcher with the shared job graph and verifies that the job
     * manager runner created for it carries that job's ID.
     */
    @Test
    public void testSingleJobRecovery() throws Exception {
        final MiniDispatcher miniDispatcher =
                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
        miniDispatcher.start();

        try {
            final TestingJobManagerRunner testingJobManagerRunner =
                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();

            Assert.assertThat(testingJobManagerRunner.getJobID(), Matchers.is(jobGraph.getJobID()));
        } finally {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {miniDispatcher});
        }
    }

    /**
     * Starts a {@code DETACHED} dispatcher with no job graph but with a recovered dirty job
     * result, and verifies that the cleanup runner created for it carries the dirty job's ID.
     */
    @Test
    public void testDirtyJobResultCleanup() throws Exception {
        final JobID jobId = new JobID();
        final MiniDispatcher miniDispatcher =
                createMiniDispatcher(
                        ClusterEntrypoint.ExecutionMode.DETACHED,
                        null,
                        TestingJobResultStore.createSuccessfulJobResult(jobId));
        miniDispatcher.start();

        try {
            final TestingJobManagerRunner testingCleanupRunner =
                    testingCleanupRunnerFactory.takeCreatedJobManagerRunner();

            Assert.assertThat(testingCleanupRunner.getJobID(), Matchers.is(jobId));
        } finally {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {miniDispatcher});
        }
    }

    /**
     * Verifies that, in {@code DETACHED} mode, the shutdown future is not completed while the
     * global cleanup is still pending, and that waiting on it returns once the cleanup future has
     * been completed.
     */
    @Test
    public void testTerminationAfterJobCompletion() throws Exception {
        // Use an incomplete future so the test decides when global cleanup finishes.
        globalCleanupResultFuture = new CompletableFuture<>();
        final MiniDispatcher miniDispatcher =
                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
        miniDispatcher.start();

        try {
            final TestingJobManagerRunner testingJobManagerRunner =
                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
            testingJobManagerRunner.completeResultFuture(executionGraphInfo);

            // Wait until the job result store reports at least one dirty result.
            CommonTestUtils.waitUntilCondition(
                    () -> !highAvailabilityServices.getJobResultStore().getDirtyResults().isEmpty());

            Assert.assertFalse(
                    "The shutdownFuture should not be completed before the cleanup is triggered.",
                    miniDispatcher.getShutDownFuture().isDone());

            globalCleanupResultFuture.complete(null);

            miniDispatcher.getShutDownFuture().get();
        } finally {
            // Always complete the cleanup future before terminating the endpoint, so a failure
            // earlier in the test does not leave the cleanup pending.
            globalCleanupResultFuture.complete(null);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {miniDispatcher});
        }
    }

    /**
     * Verifies that, in {@code DETACHED} mode, completing the job with the {@code SUSPENDED}
     * status does not complete the dispatcher's shutdown future.
     */
    @Test
    public void testNotTerminationWithoutGloballyTerminalState() throws Exception {
        final MiniDispatcher miniDispatcher =
                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
        miniDispatcher.start();

        try {
            final TestingJobManagerRunner testingJobManagerRunner =
                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();

            testingJobManagerRunner.completeResultFuture(
                    new ExecutionGraphInfo(
                            new ArchivedExecutionGraphBuilder()
                                    .setJobID(jobGraph.getJobID())
                                    .setState(JobStatus.SUSPENDED)
                                    .build()));

            testingJobManagerRunner.getTerminationFuture().get();

            Assertions.assertThat(miniDispatcher.getShutDownFuture()).isNotDone();
        } finally {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {miniDispatcher});
        }
    }

    /**
     * Verifies that, in {@code NORMAL} mode, the dispatcher has not terminated right after the
     * job result is completed and that the job result can be requested through the dispatcher
     * gateway.
     */
    @Test
    public void testJobResultRetrieval() throws Exception {
        final MiniDispatcher miniDispatcher =
                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.NORMAL);
        miniDispatcher.start();

        try {
            final TestingJobManagerRunner testingJobManagerRunner =
                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
            testingJobManagerRunner.completeResultFuture(executionGraphInfo);

            Assert.assertFalse(miniDispatcher.getTerminationFuture().isDone());

            final DispatcherGateway dispatcherGateway =
                    miniDispatcher.getSelfGateway(DispatcherGateway.class);
            final CompletableFuture<JobResult> jobResultFuture =
                    dispatcherGateway.requestJobResult(jobGraph.getJobID(), timeout);
            final JobResult jobResult = jobResultFuture.get();

            Assert.assertThat(jobResult.getJobId(), Matchers.is(jobGraph.getJobID()));
        } finally {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {miniDispatcher});
        }
    }

    /**
     * Verifies that, in {@code NORMAL} mode, cancelling the job through the gateway and then
     * completing it with the {@code CANCELED} status completes the shutdown future with
     * {@link ApplicationStatus#CANCELED}.
     */
    @Test
    public void testShutdownIfJobCancelledInNormalMode() throws Exception {
        final MiniDispatcher miniDispatcher =
                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.NORMAL);
        miniDispatcher.start();

        try {
            final TestingJobManagerRunner testingJobManagerRunner =
                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();

            Assert.assertFalse(miniDispatcher.getTerminationFuture().isDone());

            final DispatcherGateway dispatcherGateway =
                    miniDispatcher.getSelfGateway(DispatcherGateway.class);
            dispatcherGateway.cancelJob(jobGraph.getJobID(), timeout);

            testingJobManagerRunner.completeResultFuture(
                    new ExecutionGraphInfo(
                            new ArchivedExecutionGraphBuilder()
                                    .setJobID(jobGraph.getJobID())
                                    .setState(JobStatus.CANCELED)
                                    .build()));

            final ApplicationStatus applicationStatus = miniDispatcher.getShutDownFuture().get();
            Assert.assertThat(applicationStatus, Matchers.is(ApplicationStatus.CANCELED));
        } finally {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {miniDispatcher});
        }
    }

    /**
     * Creates a dispatcher that recovers the shared job graph and has no dirty job result.
     *
     * @param executionMode execution mode handed to the dispatcher
     * @return the created, not yet started dispatcher
     */
    private MiniDispatcher createMiniDispatcher(ClusterEntrypoint.ExecutionMode executionMode)
            throws Exception {
        return createMiniDispatcher(executionMode, jobGraph, null);
    }

    /**
     * Creates a dispatcher wired to this test's fixtures.
     *
     * <p>The resource cleaner factory registers the runner registry as a locally cleanable
     * resource, plus one globally and one locally cleanable resource that return
     * {@link #globalCleanupResultFuture} and {@link #localCleanupResultFuture} respectively. The
     * fields are read when the cleanup lambdas run, not when the dispatcher is created.
     *
     * @param executionMode execution mode handed to the dispatcher
     * @param recoveredJobGraph job graph passed to the dispatcher as recovered job, or {@code null}
     * @param recoveredDirtyJob job result passed to the dispatcher as dirty job, or {@code null}
     * @return the created, not yet started dispatcher
     */
    private MiniDispatcher createMiniDispatcher(
            ClusterEntrypoint.ExecutionMode executionMode,
            @Nullable JobGraph recoveredJobGraph,
            @Nullable JobResult recoveredDirtyJob)
            throws Exception {
        DefaultJobManagerRunnerRegistry jobManagerRunnerRegistry =
                new DefaultJobManagerRunnerRegistry(2);
        return new MiniDispatcher(
                (RpcService) rpcService,
                DispatcherId.generate(),
                new DispatcherServices(
                        configuration,
                        (HighAvailabilityServices) highAvailabilityServices,
                        () -> CompletableFuture.completedFuture(resourceManagerGateway),
                        blobServer,
                        heartbeatServices,
                        executionGraphInfoStore,
                        (FatalErrorHandler) testingFatalErrorHandlerResource.getFatalErrorHandler(),
                        (HistoryServerArchivist) VoidHistoryServerArchivist.INSTANCE,
                        null,
                        new DispatcherOperationCaches(),
                        UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                        (JobGraphWriter) highAvailabilityServices.getJobGraphStore(),
                        highAvailabilityServices.getJobResultStore(),
                        (JobManagerRunnerFactory) testingJobManagerRunnerFactory,
                        (CleanupRunnerFactory) testingCleanupRunnerFactory,
                        (Executor) ForkJoinPool.commonPool()),
                recoveredJobGraph,
                recoveredDirtyJob,
                (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
                (JobManagerRunnerRegistry) jobManagerRunnerRegistry,
                (ResourceCleanerFactory)
                        TestingResourceCleanerFactory.builder()
                                .withLocallyCleanableResource(
                                        (LocallyCleanableResource) jobManagerRunnerRegistry)
                                .withGloballyCleanableResource(
                                        (jobId, ignoredExecutor) -> globalCleanupResultFuture)
                                .withLocallyCleanableResource(
                                        (jobId, ignoredExecutor) -> localCleanupResultFuture)
                                .build(),
                executionMode);
    }
}

