/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ZooKeeperDefaultDispatcherRunnerTest {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDefaultDispatcherRunnerTest.class);
    private static final Duration TESTING_TIMEOUT = Duration.ofSeconds(10L);
    @RegisterExtension
    public static AllCallbackWrapper<ZooKeeperExtension> zooKeeperExtensionWrapper = new AllCallbackWrapper((CustomExtension)new ZooKeeperExtension());
    @TempDir
    public static Path temporaryFolder;
    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION;
    @RegisterExtension
    public static AllCallbackWrapper<TestingRpcServiceExtension> testingRpcServiceExtensionWrapper;
    private BlobServer blobServer;
    private TestingFatalErrorHandler fatalErrorHandler;
    private File clusterHaStorageDir;
    private Configuration configuration;

    ZooKeeperDefaultDispatcherRunnerTest() {
    }

    @BeforeEach
    void setup() throws IOException {
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.configuration = new Configuration();
        this.configuration.set(HighAvailabilityOptions.HA_MODE, (Object)"zookeeper");
        this.configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, (Object)((ZooKeeperExtension)zooKeeperExtensionWrapper.getCustomExtension()).getConnectString());
        this.configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)TempDirUtils.newFolder((Path)temporaryFolder).getAbsolutePath());
        this.clusterHaStorageDir = new File(HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath((Configuration)this.configuration).toString());
        this.blobServer = new BlobServer(this.configuration, TempDirUtils.newFolder((Path)temporaryFolder), (BlobStore)BlobUtils.createBlobStoreFromConfig((Configuration)this.configuration));
    }

    @AfterEach
    void teardown() throws Exception {
        if (this.blobServer != null) {
            this.blobServer.close();
        }
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
        }
    }

    @Test
    void testResourceCleanupUnderLeadershipChange() throws Exception {
        TestingRpcService rpcService = ((TestingRpcServiceExtension)testingRpcServiceExtensionWrapper.getCustomExtension()).getTestingRpcService();
        TestingLeaderElection dispatcherLeaderElection = new TestingLeaderElection();
        try (final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework((Configuration)this.configuration, (FatalErrorHandler)this.fatalErrorHandler);
             TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder().setDispatcherLeaderElection(dispatcherLeaderElection).setJobMasterLeaderRetrieverFunction(jobId -> ZooKeeperUtils.createLeaderRetrievalService((CuratorFramework)curatorFrameworkWrapper.asCuratorFramework())).build();){
            PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(this.configuration, (HighAvailabilityServices)highAvailabilityServices, CompletableFuture::new, this.blobServer, (HeartbeatServices)new TestingHeartbeatServices(), UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup, (ExecutionGraphInfoStore)new MemoryExecutionGraphInfoStore(), (FatalErrorHandler)this.fatalErrorHandler, (HistoryServerArchivist)VoidHistoryServerArchivist.INSTANCE, null, (Executor)ForkJoinPool.commonPool(), new DispatcherOperationCaches(), Collections.emptySet());
            DefaultDispatcherRunnerFactory defaultDispatcherRunnerFactory = DefaultDispatcherRunnerFactory.createSessionRunner((DispatcherFactory)SessionDispatcherFactory.INSTANCE);
            try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner(rpcService, dispatcherLeaderElection, new JobPersistenceComponentFactory(){

                public ExecutionPlanStore createExecutionPlanStore() {
                    return ZooKeeperDefaultDispatcherRunnerTest.this.createZooKeeperExecutionPlanStore(curatorFrameworkWrapper.asCuratorFramework());
                }

                public JobResultStore createJobResultStore() {
                    return new EmbeddedJobResultStore();
                }
            }, partialDispatcherServices, (DispatcherRunnerFactory)defaultDispatcherRunnerFactory);){
                DispatcherGateway dispatcherGateway = this.grantLeadership(dispatcherLeaderElection);
                JobGraph jobGraph = this.createJobGraphWithBlobs();
                LOG.info("Initial job submission {}.", (Object)jobGraph.getJobID());
                dispatcherGateway.submitJob((ExecutionPlan)jobGraph, TESTING_TIMEOUT).get();
                dispatcherLeaderElection.notLeader();
                LOG.info("Re-grant leadership first time.");
                dispatcherGateway = this.grantLeadership(dispatcherLeaderElection);
                LOG.info("Cancel recovered job {}.", (Object)jobGraph.getJobID());
                CompletableFuture jobResultFuture = dispatcherGateway.requestJobResult(jobGraph.getJobID(), TESTING_TIMEOUT);
                dispatcherGateway.cancelJob(jobGraph.getJobID(), TESTING_TIMEOUT).get();
                JobResult jobResult = (JobResult)jobResultFuture.get();
                Assertions.assertThat((Comparable)jobResult.getApplicationStatus()).isEqualTo((Object)ApplicationStatus.CANCELED);
                dispatcherLeaderElection.notLeader();
                ExecutionPlanStore submittedExecutionPlanStore = this.createZooKeeperExecutionPlanStore(curatorFrameworkWrapper.asCuratorFramework());
                CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> submittedExecutionPlanStore.getJobIds().isEmpty()), 20L);
            }
        }
        Assertions.assertThat((File)this.clusterHaStorageDir).isEmptyDirectory();
    }

    private DispatcherRunner createDispatcherRunner(TestingRpcService rpcService, LeaderElection dispatcherLeaderElection, JobPersistenceComponentFactory jobPersistenceComponentFactory, PartialDispatcherServices partialDispatcherServices, DispatcherRunnerFactory dispatcherRunnerFactory) throws Exception {
        return dispatcherRunnerFactory.createDispatcherRunner(dispatcherLeaderElection, (FatalErrorHandler)this.fatalErrorHandler, jobPersistenceComponentFactory, (Executor)EXECUTOR_EXTENSION.getExecutor(), (RpcService)rpcService, partialDispatcherServices);
    }

    private ExecutionPlanStore createZooKeeperExecutionPlanStore(CuratorFramework client) {
        try {
            return ZooKeeperUtils.createExecutionPlans((CuratorFramework)client, (Configuration)this.configuration);
        }
        catch (Exception e) {
            ExceptionUtils.rethrow((Throwable)e);
            return null;
        }
    }

    private DispatcherGateway grantLeadership(TestingLeaderElection dispatcherLeaderElection) throws InterruptedException, ExecutionException {
        return (DispatcherGateway)((CompletableFuture)dispatcherLeaderElection.isLeader(UUID.randomUUID()).thenCompose(leaderInformation -> ((TestingRpcServiceExtension)testingRpcServiceExtensionWrapper.getCustomExtension()).getTestingRpcService().connect(leaderInformation.getLeaderAddress(), DispatcherId.fromUuid((UUID)leaderInformation.getLeaderSessionID()), DispatcherGateway.class))).get();
    }

    private JobGraph createJobGraphWithBlobs() throws IOException {
        JobVertex vertex = new JobVertex("test vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(1);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
        PermanentBlobKey permanentBlobKey = this.blobServer.putPermanent(jobGraph.getJobID(), new byte[256]);
        jobGraph.addUserJarBlobKey(permanentBlobKey);
        return jobGraph;
    }

    static {
        EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
        testingRpcServiceExtensionWrapper = new AllCallbackWrapper((CustomExtension)new TestingRpcServiceExtension());
    }
}

