/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.entrypoint;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerFactory;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider;
import org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenReceiver;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;

public class ClusterEntrypointTest
extends TestLogger {
    private static final long TIMEOUT_MS = 10000L;
    private Configuration flinkConfig;
    @ClassRule
    public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE = new TestExecutorResource(Executors::newSingleThreadExecutor);
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Before
    public void before() {
        this.flinkConfig = new Configuration();
        ExceptionThrowingDelegationTokenProvider.reset();
        ExceptionThrowingDelegationTokenReceiver.reset();
    }

    @After
    public void after() {
        ExceptionThrowingDelegationTokenProvider.reset();
        ExceptionThrowingDelegationTokenReceiver.reset();
    }

    @Test(expected=IllegalConfigurationException.class)
    public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
        this.flinkConfig.set(JobManagerOptions.SCHEDULER_MODE, (Object)SchedulerExecutionMode.REACTIVE);
        new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).build();
        Assert.fail((String)"Entrypoint initialization is supposed to fail");
    }

    @Test
    public void testClusterStartShouldObtainTokens() throws Exception {
        ExceptionThrowingDelegationTokenProvider.addToken.set(true);
        TestingHighAvailabilityServices testingHaService = new TestingHighAvailabilityServicesBuilder().build();
        TestingEntryPoint testingEntryPoint = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setHighAvailabilityServices(testingHaService).build();
        CompletableFuture<ApplicationStatus> appStatusFuture = this.startClusterEntrypoint(testingEntryPoint);
        testingEntryPoint.closeAsync();
        MatcherAssert.assertThat((Object)appStatusFuture.get(10000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)ApplicationStatus.UNKNOWN));
        MatcherAssert.assertThat((Object)ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), (Matcher)CoreMatchers.is((Object)1));
    }

    @Test
    public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<Void>();
        TestingHighAvailabilityServices testingHaService = new TestingHighAvailabilityServicesBuilder().setCloseFuture(closeFuture).setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture).build();
        TestingEntryPoint testingEntryPoint = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setHighAvailabilityServices(testingHaService).build();
        CompletableFuture<ApplicationStatus> appStatusFuture = this.startClusterEntrypoint(testingEntryPoint);
        testingEntryPoint.closeAsync();
        MatcherAssert.assertThat((Object)appStatusFuture.get(10000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)ApplicationStatus.UNKNOWN));
        MatcherAssert.assertThat((Object)closeFuture.isDone(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)closeAndCleanupAllDataFuture.isDone(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
        CompletableFuture deregisterFuture = new CompletableFuture();
        TestingResourceManagerFactory testingResourceManagerFactory = new TestingResourceManagerFactory.Builder().setInternalDeregisterApplicationConsumer((TriConsumer<UUID, ApplicationStatus, String>)((TriConsumer)(ignored1, ignored2, ignore3) -> deregisterFuture.complete(null))).build();
        TestingEntryPoint testingEntryPoint = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setResourceManagerFactory(testingResourceManagerFactory).build();
        CompletableFuture<ApplicationStatus> appStatusFuture = this.startClusterEntrypoint(testingEntryPoint);
        testingEntryPoint.closeAsync();
        MatcherAssert.assertThat((Object)appStatusFuture.get(10000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)ApplicationStatus.UNKNOWN));
        MatcherAssert.assertThat((Object)deregisterFuture.isDone(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws Exception {
        CompletableFuture deregisterFuture = new CompletableFuture();
        CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<Void>();
        CompletableFuture<ApplicationStatus> dispatcherShutDownFuture = new CompletableFuture<ApplicationStatus>();
        TestingHighAvailabilityServices testingHaService = new TestingHighAvailabilityServicesBuilder().setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture).build();
        TestingResourceManagerFactory testingResourceManagerFactory = new TestingResourceManagerFactory.Builder().setInternalDeregisterApplicationConsumer((TriConsumer<UUID, ApplicationStatus, String>)((TriConsumer)(ignored1, ignored2, ignore3) -> deregisterFuture.complete(null))).setInitializeConsumer(ignore -> dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED)).build();
        TestingDispatcherRunnerFactory testingDispatcherRunnerFactory = new TestingDispatcherRunnerFactory.Builder().setShutDownFuture(dispatcherShutDownFuture).build();
        TestingEntryPoint testingEntryPoint = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setResourceManagerFactory(testingResourceManagerFactory).setDispatcherRunnerFactory(testingDispatcherRunnerFactory).setHighAvailabilityServices(testingHaService).build();
        CompletableFuture<ApplicationStatus> appStatusFuture = this.startClusterEntrypoint(testingEntryPoint);
        MatcherAssert.assertThat((Object)appStatusFuture.get(10000L, TimeUnit.MILLISECONDS), (Matcher)CoreMatchers.is((Object)ApplicationStatus.SUCCEEDED));
        MatcherAssert.assertThat((Object)deregisterFuture.isDone(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)closeAndCleanupAllDataFuture.isDone(), (Matcher)CoreMatchers.is((Object)true));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCloseAsyncShouldBeExecutedInShutdownHook() throws Exception {
        Assume.assumeTrue((OperatingSystem.isLinux() || OperatingSystem.isMac() ? 1 : 0) != 0);
        File markerFile = new File(TEMPORARY_FOLDER.getRoot(), UUID.randomUUID() + ".marker");
        TestingClusterEntrypointProcess clusterEntrypointProcess = new TestingClusterEntrypointProcess(markerFile);
        clusterEntrypointProcess.startProcess();
        boolean success = false;
        try {
            long pid = clusterEntrypointProcess.getProcessId();
            Assert.assertTrue((String)"Cannot determine process ID", (pid != -1L ? 1 : 0) != 0);
            TestJvmProcess.waitForMarkerFile(markerFile, 30000L);
            TestJvmProcess.killProcessWithSigTerm(pid);
            boolean exited = clusterEntrypointProcess.waitFor(10000L, TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat((String)String.format("Process %s does not exit within %s ms", pid, 10000L), (Object)exited, (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((String)"markerFile should be deleted in closeAsync shutdownHook", (Object)markerFile.exists(), (Matcher)CoreMatchers.is((Object)false));
            success = true;
        }
        finally {
            if (!success) {
                clusterEntrypointProcess.printProcessLog();
            }
            clusterEntrypointProcess.destroy();
        }
    }

    @Test
    public void testWorkingDirectoryIsSetupWhenStartingTheClusterEntrypoint() throws Exception {
        File workingDirBase = TEMPORARY_FOLDER.newFolder();
        ResourceID resourceId = new ResourceID("foobar");
        ClusterEntrypointTest.configureWorkingDirectory(this.flinkConfig, workingDirBase, resourceId);
        File workingDir = ClusterEntrypointUtils.generateJobManagerWorkingDirectoryFile((Configuration)this.flinkConfig, (ResourceID)resourceId);
        try (TestingEntryPoint testingEntryPoint = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).build();){
            testingEntryPoint.startCluster();
            Assert.assertTrue((boolean)workingDir.exists());
        }
    }

    private static void configureWorkingDirectory(Configuration configuration, File workingDirBase, ResourceID resourceId) {
        configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, (Object)workingDirBase.getAbsolutePath());
        configuration.set(JobManagerOptions.JOB_MANAGER_RESOURCE_ID, (Object)resourceId.toString());
    }

    @Test
    public void testWorkingDirectoryIsNotDeletedWhenStoppingClusterEntrypoint() throws Exception {
        File workingDirBase = TEMPORARY_FOLDER.newFolder();
        ResourceID resourceId = new ResourceID("foobar");
        ClusterEntrypointTest.configureWorkingDirectory(this.flinkConfig, workingDirBase, resourceId);
        File workingDir = ClusterEntrypointUtils.generateJobManagerWorkingDirectoryFile((Configuration)this.flinkConfig, (ResourceID)resourceId);
        try (TestingEntryPoint testingEntryPoint = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).build();){
            testingEntryPoint.startCluster();
        }
        Assert.assertTrue((String)"The working directory has been deleted when the cluster entrypoint shut down. This should not happen.", (boolean)workingDir.exists());
    }

    @Test
    public void testWorkingDirectoryIsDeletedIfApplicationCompletes() throws Exception {
        File workingDirBase = TEMPORARY_FOLDER.newFolder();
        ResourceID resourceId = new ResourceID("foobar");
        ClusterEntrypointTest.configureWorkingDirectory(this.flinkConfig, workingDirBase, resourceId);
        File workingDir = ClusterEntrypointUtils.generateJobManagerWorkingDirectoryFile((Configuration)this.flinkConfig, (ResourceID)resourceId);
        CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<ApplicationStatus>();
        TestingEntryPoint testingEntryPoint = new TestingEntryPoint.Builder().setConfiguration(this.flinkConfig).setDispatcherRunnerFactory(new TestingDispatcherRunnerFactory.Builder().setShutDownFuture(shutDownFuture).build()).build();
        testingEntryPoint.startCluster();
        shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
        testingEntryPoint.getTerminationFuture().join();
        Assert.assertFalse((String)"The working directory has not been deleted when the application completed successfully.", (boolean)workingDir.exists());
    }

    private CompletableFuture<ApplicationStatus> startClusterEntrypoint(TestingEntryPoint testingEntryPoint) throws Exception {
        testingEntryPoint.startCluster();
        return FutureUtils.supplyAsync(() -> (ApplicationStatus)testingEntryPoint.getTerminationFuture().get(), (Executor)TEST_EXECUTOR_RESOURCE.getExecutor());
    }

    private static class TestingDispatcherRunnerFactory
    implements DispatcherRunnerFactory {
        private final CompletableFuture<ApplicationStatus> shutDownFuture;

        private TestingDispatcherRunnerFactory(CompletableFuture<ApplicationStatus> shutDownFuture) {
            this.shutDownFuture = shutDownFuture;
        }

        public DispatcherRunner createDispatcherRunner(LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception {
            return TestingDispatcherRunner.newBuilder().setShutDownFuture(this.shutDownFuture).build();
        }

        public static final class Builder {
            private CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture();

            public Builder setShutDownFuture(CompletableFuture<ApplicationStatus> shutDownFuture) {
                this.shutDownFuture = shutDownFuture;
                return this;
            }

            public TestingDispatcherRunnerFactory build() {
                return new TestingDispatcherRunnerFactory(this.shutDownFuture);
            }
        }
    }

    private static class TestingEntryPoint
    extends ClusterEntrypoint {
        private final HighAvailabilityServices haService;
        private final ResourceManagerFactory<ResourceID> resourceManagerFactory;
        private final DispatcherRunnerFactory dispatcherRunnerFactory;

        private TestingEntryPoint(Configuration configuration, HighAvailabilityServices haService, ResourceManagerFactory<ResourceID> resourceManagerFactory, DispatcherRunnerFactory dispatcherRunnerFactory) {
            super(configuration);
            SignalHandler.register((Logger)LOG);
            this.haService = haService;
            this.resourceManagerFactory = resourceManagerFactory;
            this.dispatcherRunnerFactory = dispatcherRunnerFactory;
        }

        protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException {
            return new DefaultDispatcherResourceManagerComponentFactory(this.dispatcherRunnerFactory, this.resourceManagerFactory, (RestEndpointFactory)SessionRestEndpointFactory.INSTANCE);
        }

        protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(Configuration configuration, ScheduledExecutor scheduledExecutor) {
            return new MemoryExecutionGraphInfoStore();
        }

        protected HighAvailabilityServices createHaServices(Configuration configuration, Executor executor, RpcSystemUtils rpcSystemUtils) {
            return this.haService;
        }

        protected boolean supportsReactiveMode() {
            return false;
        }

        public static final class Builder {
            private HighAvailabilityServices haService = new TestingHighAvailabilityServicesBuilder().build();
            private ResourceManagerFactory<ResourceID> resourceManagerFactory = StandaloneResourceManagerFactory.getInstance();
            private DispatcherRunnerFactory dispatcherRunnerFactory = new TestingDispatcherRunnerFactory.Builder().build();
            private Configuration configuration = new Configuration();

            public Builder setHighAvailabilityServices(HighAvailabilityServices haService) {
                this.haService = haService;
                return this;
            }

            public Builder setResourceManagerFactory(ResourceManagerFactory<ResourceID> resourceManagerFactory) {
                this.resourceManagerFactory = resourceManagerFactory;
                return this;
            }

            public Builder setConfiguration(Configuration configuration) {
                this.configuration = configuration;
                return this;
            }

            public Builder setDispatcherRunnerFactory(DispatcherRunnerFactory dispatcherRunnerFactory) {
                this.dispatcherRunnerFactory = dispatcherRunnerFactory;
                return this;
            }

            public TestingEntryPoint build() {
                return new TestingEntryPoint(this.configuration, this.haService, this.resourceManagerFactory, this.dispatcherRunnerFactory);
            }
        }
    }
}

