/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={TestLoggerExtension.class})
public class JobDispatcherITCase {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    private Supplier<DispatcherResourceManagerComponentFactory> createJobModeDispatcherResourceManagerComponentFactorySupplier(Configuration configuration) {
        return () -> {
            try {
                return new DefaultDispatcherResourceManagerComponentFactory((DispatcherRunnerFactory)new DefaultDispatcherRunnerFactory((DispatcherLeaderProcessFactoryFactory)JobDispatcherLeaderProcessFactoryFactory.create((JobGraphRetriever)FileJobGraphRetriever.createFrom((Configuration)configuration, null))), (ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance(), (RestEndpointFactory)JobRestEndpointFactory.INSTANCE);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Test
    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership(@TempDir Path tmpPath) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, (Object)HighAvailabilityMode.ZOOKEEPER.name());
        TestingMiniClusterConfiguration clusterConfiguration = TestingMiniClusterConfiguration.newBuilder().setConfiguration(configuration).build();
        EmbeddedHaServicesWithLeadershipControl haServices = new EmbeddedHaServicesWithLeadershipControl((Executor)EXECUTOR_RESOURCE.getExecutor());
        Configuration newConfiguration = new Configuration((Configuration)clusterConfiguration.getConfiguration());
        long checkpointInterval = 100L;
        JobID jobID = this.generateAndPersistJobGraph(newConfiguration, 100L, tmpPath);
        TestingMiniCluster.Builder clusterBuilder = TestingMiniCluster.newBuilder(clusterConfiguration).setHighAvailabilityServicesSupplier(() -> haServices).setDispatcherResourceManagerComponentFactorySupplier(this.createJobModeDispatcherResourceManagerComponentFactorySupplier(newConfiguration));
        AtLeastOneCheckpointInvokable.reset();
        try (TestingMiniCluster cluster = clusterBuilder.build();){
            cluster.start();
            AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await();
            CompletableFuture firstJobResult = cluster.requestJobResult(jobID);
            haServices.revokeDispatcherLeadership();
            Assertions.assertEquals((Object)ApplicationStatus.UNKNOWN, (Object)((JobResult)firstJobResult.get()).getApplicationStatus());
            haServices.grantDispatcherLeadership();
            JobDispatcherITCase.awaitJobStatus(cluster, jobID, JobStatus.RUNNING);
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> ((ArchivedExecutionGraph)cluster.getArchivedExecutionGraph(jobID).get()).getCheckpointStatsSnapshot().getLatestRestoredCheckpoint() != null));
        }
    }

    private JobID generateAndPersistJobGraph(Configuration configuration, long checkpointInterval, Path tmpPath) throws Exception {
        JobVertex jobVertex = new JobVertex("jobVertex");
        jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class);
        jobVertex.setParallelism(1);
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(checkpointInterval).build();
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
        JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).setJobCheckpointingSettings(checkpointingSettings).build();
        Path jobGraphPath = tmpPath.resolve((String)FileJobGraphRetriever.JOB_GRAPH_FILE_PATH.defaultValue());
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(jobGraphPath, StandardOpenOption.CREATE));){
            objectOutputStream.writeObject(jobGraph);
        }
        configuration.setString(FileJobGraphRetriever.JOB_GRAPH_FILE_PATH.key(), jobGraphPath.toString());
        return jobGraph.getJobID();
    }

    private static void awaitJobStatus(MiniCluster cluster, JobID jobId, JobStatus status) throws Exception {
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> {
            try {
                return cluster.getJobStatus(jobId).get() == status;
            }
            catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent()) {
                    return false;
                }
                throw e;
            }
        }));
    }

    public static class AtLeastOneCheckpointInvokable
    extends AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable {
        private static volatile CountDownLatch atLeastOneCheckpointCompleted;

        private static void reset() {
            atLeastOneCheckpointCompleted = new CountDownLatch(1);
        }

        public AtLeastOneCheckpointInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            atLeastOneCheckpointCompleted.countDown();
            return CompletableFuture.completedFuture(null);
        }

        public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {
            return CompletableFuture.completedFuture(null);
        }
    }
}

