/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.VoidMetricQueryServiceRetriever;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ProcessFailureCancelingITCase
extends TestLogger {
    private static final String TASK_DEPLOYED_MARKER = "deployed";
    private static final Duration TIMEOUT = Duration.ofMinutes(2L);
    @Rule
    public final BlobServerResource blobServerResource = new BlobServerResource();
    @Rule
    public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testCancelingOnProcessFailure() throws Throwable {
        Assume.assumeTrue((String)"---- Skipping Process Failure test : Could not find java executable ----", (CommonTestUtils.getJavaCommandPath() != null ? 1 : 0) != 0);
        TestProcessBuilder.TestProcess taskManagerProcess = null;
        TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
        final Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, (Object)Duration.ofSeconds(100L));
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.zooKeeperResource.getConnectString());
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.newFolder().getAbsolutePath());
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"4m"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, (Object)MemorySize.parse((String)"3200k"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, (Object)MemorySize.parse((String)"3200k"));
        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, (Object)MemorySize.parse((String)"128m"));
        config.set(TaskManagerOptions.CPU_CORES, (Object)1.0);
        config.setInteger(RestOptions.PORT, 0);
        RpcService rpcService = RpcSystem.load().remoteServiceBuilder(config, "localhost", "0").createAndStart();
        int jobManagerPort = rpcService.getPort();
        config.setInteger(JobManagerOptions.PORT, jobManagerPort);
        DefaultDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory((ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance());
        DispatcherResourceManagerComponent dispatcherResourceManagerComponent = null;
        ScheduledExecutorService ioExecutor = TestingUtils.defaultExecutor();
        HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices((Configuration)config, (Executor)ioExecutor, (AddressResolution)AddressResolution.NO_ADDRESS_RESOLUTION, (RpcSystemUtils)RpcSystem.load(), (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);
        final AtomicReference programException = new AtomicReference();
        try {
            dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(config, (Executor)ioExecutor, rpcService, haServices, this.blobServerResource.getBlobServer(), new HeartbeatServices(100L, 10000L, 2), NoOpMetricRegistry.INSTANCE, (ExecutionGraphInfoStore)new MemoryExecutionGraphInfoStore(), (MetricQueryServiceRetriever)VoidMetricQueryServiceRetriever.INSTANCE, (FatalErrorHandler)fatalErrorHandler);
            TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName());
            taskManagerProcessBuilder.addConfigAsMainClassArgs(config);
            taskManagerProcess = taskManagerProcessBuilder.start();
            Runnable programRunner = new Runnable(){

                @Override
                public void run() {
                    try {
                        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)1337, (Configuration)config, (String[])new String[0]);
                        env.setParallelism(2);
                        env.setRestartStrategy(RestartStrategies.noRestart());
                        env.generateSequence(0L, Long.MAX_VALUE).map((MapFunction)new MapFunction<Long, Long>(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            public Long map(Long value) throws Exception {
                                1 var2_2 = this;
                                synchronized (var2_2) {
                                    System.out.println(ProcessFailureCancelingITCase.TASK_DEPLOYED_MARKER);
                                    this.wait();
                                }
                                return 0L;
                            }
                        }).output((OutputFormat)new DiscardingOutputFormat());
                        env.execute();
                    }
                    catch (Throwable t) {
                        programException.set(t);
                    }
                }
            };
            Thread programThread = new Thread(programRunner);
            programThread.start();
            ProcessFailureCancelingITCase.waitUntilAtLeastOneTaskHasBeenDeployed(taskManagerProcess);
            taskManagerProcess.destroy();
            taskManagerProcess = null;
            programThread.join(TIMEOUT.toMillis());
            Assert.assertFalse((String)"The program did not cancel in time", (boolean)programThread.isAlive());
            Throwable error = (Throwable)programException.get();
            Assert.assertNotNull((String)"The program did not fail properly", (Object)error);
            Assert.assertTrue((boolean)(error instanceof ProgramInvocationException));
        }
        catch (Error | Exception e) {
            if (taskManagerProcess != null) {
                ProcessFailureCancelingITCase.printOutput("TaskManager OUT", taskManagerProcess.getProcessOutput().toString());
                ProcessFailureCancelingITCase.printOutput("TaskManager ERR", taskManagerProcess.getErrorOutput().toString());
            }
            throw ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)((Throwable)programException.get()));
        }
        finally {
            if (taskManagerProcess != null) {
                taskManagerProcess.destroy();
            }
            if (dispatcherResourceManagerComponent != null) {
                dispatcherResourceManagerComponent.stopApplication(ApplicationStatus.SUCCEEDED, null);
            }
            fatalErrorHandler.rethrowError();
            RpcUtils.terminateRpcService((RpcService)rpcService, (Time)Time.seconds((long)100L));
            haServices.closeAndCleanupAllData();
        }
    }

    private static void waitUntilAtLeastOneTaskHasBeenDeployed(TestProcessBuilder.TestProcess taskManagerProcess) throws InterruptedException, TimeoutException {
        org.apache.flink.core.testutils.CommonTestUtils.waitUtil(() -> taskManagerProcess.getProcessOutput().toString().contains(TASK_DEPLOYED_MARKER), (Duration)Duration.ofMinutes(2L), null);
    }

    private static void printOutput(String processName, String logContents) {
        if (logContents == null || logContents.length() == 0) {
            return;
        }
        System.out.println("-----------------------------------------");
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
        System.out.println("-----------------------------------------");
        System.out.println(logContents);
        System.out.println("-----------------------------------------");
        System.out.println("\t\tEND SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }
}

