/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn.cli;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.DynamicPropertiesUtil;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WritableConfig;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.cli.AbstractYarnCli;
import org.apache.flink.yarn.cli.YarnApplicationStatusMonitor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkYarnSessionCli
extends AbstractYarnCli {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
    // Polling interval in milliseconds (per the name).
    private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;
    // File-name prefix of the YARN properties file; getYarnPropertiesLocation appends the current user name.
    private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
    // Key in the YARN properties file under which the application id is stored.
    private static final String YARN_APPLICATION_ID_KEY = "applicationID";
    // Key in the YARN properties file under which the encoded dynamic properties are stored.
    private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
    // Separator placed between the individual key=value pairs of the encoded dynamic properties.
    private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
    // Help text printed by the interactive prompt (see repStep).
    private static final String YARN_SESSION_HELP = "Available commands:\nhelp - show these commands\nstop - stop the YARN session";
    // Command line options owned by this CLI; all of them are created in the constructor
    // from the given short/long prefixes.
    private final Option query;
    private final Option queue;
    private final Option shipPath;
    private final Option flinkJar;
    private final Option jmMemory;
    private final Option tmMemory;
    private final Option slots;
    private final Option zookeeperNamespace;
    private final Option nodeLabel;
    private final Option help;
    private final Option name;
    private final Option applicationType;
    // Collection of every option registered by this CLI; used for parsing and by isYarnPropertiesFileMode.
    private final Options allOptions;
    private final Option dynamicproperties;
    // Whether the interactive prompt reads commands from the console (passed to runInteractiveCli).
    private final boolean acceptInteractiveInput;
    // Written to DeploymentOptionsInternal.CONF_DIR by applyDescriptorOptionToConfig.
    private final String configurationDirectory;
    // Contents of the YARN properties file; stays empty when the file does not exist.
    private final Properties yarnPropertiesFile;
    // Application id parsed from the YARN properties file in the constructor.
    private final ApplicationId yarnApplicationIdFromYarnProperties;
    // Configured value of YarnConfigOptions.PROPERTIES_FILE_LOCATION; getYarnPropertiesLocation
    // falls back to java.io.tmpdir when it is null.
    private final String yarnPropertiesFileLocation;
    private final ClusterClientServiceLoader clusterClientServiceLoader;
    // Dynamic properties encoded by the most recent toConfiguration call; null until then.
    @Nullable
    private String dynamicPropertiesEncoded = null;

    /**
     * Creates a CLI that uses a {@link DefaultClusterClientServiceLoader} and accepts
     * interactive console input.
     *
     * @param configuration base Flink configuration
     * @param configurationDirectory path of the Flink configuration directory
     * @param shortPrefix prefix prepended to every short option name
     * @param longPrefix prefix prepended to every long option name
     * @throws FlinkException if an existing YARN properties file cannot be read or is invalid
     */
    public FlinkYarnSessionCli(Configuration configuration, String configurationDirectory, String shortPrefix, String longPrefix) throws FlinkException {
        this(
                configuration,
                new DefaultClusterClientServiceLoader(),
                configurationDirectory,
                shortPrefix,
                longPrefix,
                true);
    }

    /**
     * Creates a CLI that uses a {@link DefaultClusterClientServiceLoader}.
     *
     * @param configuration base Flink configuration
     * @param configurationDirectory path of the Flink configuration directory
     * @param shortPrefix prefix prepended to every short option name
     * @param longPrefix prefix prepended to every long option name
     * @param acceptInteractiveInput whether the interactive prompt reads commands from the console
     * @throws FlinkException if an existing YARN properties file cannot be read or is invalid
     */
    public FlinkYarnSessionCli(Configuration configuration, String configurationDirectory, String shortPrefix, String longPrefix, boolean acceptInteractiveInput) throws FlinkException {
        this(
                configuration,
                new DefaultClusterClientServiceLoader(),
                configurationDirectory,
                shortPrefix,
                longPrefix,
                acceptInteractiveInput);
    }

    /**
     * Creates the CLI, registers all of its command line options and loads the YARN properties
     * file if one exists at the configured location.
     *
     * <p>When the properties file exists, the application id stored in it is parsed and kept in
     * {@code yarnApplicationIdFromYarnProperties}; when it does not exist that field is
     * {@code null}.
     *
     * @param configuration base Flink configuration
     * @param clusterClientServiceLoader loader used by {@code run} to find the cluster client factory
     * @param configurationDirectory path of the Flink configuration directory
     * @param shortPrefix prefix prepended to every short option name
     * @param longPrefix prefix prepended to every long option name
     * @param acceptInteractiveInput whether the interactive prompt reads commands from the console
     * @throws FlinkException if the properties file cannot be read, contains no application id,
     *     or contains an application id that cannot be parsed
     */
    public FlinkYarnSessionCli(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader, String configurationDirectory, String shortPrefix, String longPrefix, boolean acceptInteractiveInput) throws FlinkException {
        super(configuration, shortPrefix, longPrefix);
        this.clusterClientServiceLoader = Preconditions.checkNotNull(clusterClientServiceLoader);
        this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
        this.acceptInteractiveInput = acceptInteractiveInput;

        this.query = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
        this.queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
        this.shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
        this.flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
        this.jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container with optional unit (default: MB)");
        this.tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container with optional unit (default: MB)");
        this.slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
        this.dynamicproperties = Option.builder(shortPrefix + "D").argName("property=value").numberOfArgs(2).valueSeparator().desc("use value for given property").build();
        this.name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
        this.applicationType = new Option(shortPrefix + "at", longPrefix + "applicationType", true, "Set a custom application type for the application on YARN");
        this.zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
        this.nodeLabel = new Option(shortPrefix + "nl", longPrefix + "nodeLabel", true, "Specify YARN node label for the YARN application");
        this.help = new Option(shortPrefix + "h", longPrefix + "help", false, "Help for the Yarn session CLI.");

        this.allOptions = new Options();
        this.allOptions.addOption(this.flinkJar);
        this.allOptions.addOption(this.jmMemory);
        this.allOptions.addOption(this.tmMemory);
        this.allOptions.addOption(this.queue);
        this.allOptions.addOption(this.query);
        this.allOptions.addOption(this.shipPath);
        this.allOptions.addOption(this.slots);
        this.allOptions.addOption(this.dynamicproperties);
        this.allOptions.addOption(CliFrontendParser.DETACHED_OPTION);
        this.allOptions.addOption(CliFrontendParser.YARN_DETACHED_OPTION);
        this.allOptions.addOption(this.name);
        this.allOptions.addOption(this.applicationId);
        this.allOptions.addOption(this.applicationType);
        this.allOptions.addOption(this.zookeeperNamespace);
        this.allOptions.addOption(this.nodeLabel);
        this.allOptions.addOption(this.help);

        this.yarnPropertiesFileLocation = configuration.get(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
        File yarnPropertiesLocation = FlinkYarnSessionCli.getYarnPropertiesLocation(this.yarnPropertiesFileLocation);
        this.yarnPropertiesFile = new Properties();
        if (yarnPropertiesLocation.exists()) {
            LOG.info("Found Yarn properties file under {}.", yarnPropertiesLocation.getAbsolutePath());
            try (FileInputStream is = new FileInputStream(yarnPropertiesLocation)) {
                this.yarnPropertiesFile.load(is);
            }
            catch (IOException ioe) {
                throw new FlinkException("Could not read the Yarn properties file " + yarnPropertiesLocation + ". Please delete the file at " + yarnPropertiesLocation.getAbsolutePath() + '.', ioe);
            }
            String yarnApplicationIdString = this.yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
            if (yarnApplicationIdString == null) {
                throw new FlinkException("Yarn properties file found but doesn't contain a Yarn application id. Please delete the file at " + yarnPropertiesLocation.getAbsolutePath());
            }
            try {
                this.yarnApplicationIdFromYarnProperties = ApplicationId.fromString(yarnApplicationIdString);
            }
            catch (Exception e) {
                throw new FlinkException("YARN properties contain an invalid entry for application id: " + yarnApplicationIdString + ". Please delete the file at " + yarnPropertiesLocation.getAbsolutePath(), e);
            }
        } else {
            // Only reset the id when there is no properties file; assigning null unconditionally
            // would discard the id parsed above and assign the final field twice.
            this.yarnApplicationIdFromYarnProperties = null;
        }
    }

    /**
     * Reads the Flink jar option from the command line and turns it into a {@code file://} path.
     *
     * @param cmd parsed command line
     * @return the path given by the jar option, prefixed with {@code file://} if it did not
     *     already start with it, or {@code null} if the option is absent
     */
    private Path getLocalFlinkDistPathFromCmd(CommandLine cmd) {
        final String jarOpt = this.flinkJar.getOpt();
        if (cmd.hasOption(jarOpt)) {
            final String given = cmd.getOptionValue(jarOpt);
            final String withScheme = given.startsWith("file://") ? given : "file://" + given;
            return new Path(withScheme);
        }
        return null;
    }

    /**
     * Validates the files given through the ship option and stores them in the configuration
     * under {@code YarnConfigOptions.SHIP_FILES}. Does nothing when the option is absent.
     *
     * @param configuration configuration to write the ship files into
     * @param cmd parsed command line
     * @throws ConfigurationException if any of the given ship files does not exist
     */
    private void encodeFilesToShipToCluster(Configuration configuration, CommandLine cmd) throws ConfigurationException {
        Preconditions.checkNotNull(cmd);
        Preconditions.checkNotNull(configuration);
        if (cmd.hasOption(this.shipPath.getOpt())) {
            // getOptionValues returns String[]; typing the array as Object[] breaks the
            // String-typed loop variable and the identity mapper below.
            String[] shipFiles = cmd.getOptionValues(this.shipPath.getOpt());
            for (String path : shipFiles) {
                File shipFile = new File(path);
                if (!shipFile.exists()) {
                    throw new ConfigurationException("Ship file " + path + " does not exist");
                }
            }
            ConfigUtils.encodeArrayToConfig(configuration, YarnConfigOptions.SHIP_FILES, shipFiles, f -> f);
        }
    }

    /**
     * Reports whether this CLI should handle the given command line: either the parent class
     * considers it active, or the command line qualifies for YARN-properties-file mode and an
     * application id was loaded from that file.
     */
    @Override
    public boolean isActive(CommandLine commandLine) {
        return super.isActive(commandLine)
                || (this.isYarnPropertiesFileMode(commandLine) && this.yarnApplicationIdFromYarnProperties != null);
    }

    /**
     * Adds the parent's run options followed by every option registered by this CLI.
     *
     * @param baseOptions option set to extend
     */
    public void addRunOptions(Options baseOptions) {
        super.addRunOptions(baseOptions);
        for (Object registered : this.allOptions.getOptions()) {
            final Option yarnOption = (Option) registered;
            baseOptions.addOption(yarnOption);
        }
    }

    /**
     * Translates the parsed command line into a fresh {@link Configuration}.
     *
     * <p>As a side effect the dynamic properties found on the command line are encoded and
     * remembered in {@code dynamicPropertiesEncoded}.
     *
     * @param commandLine parsed command line
     * @return the configuration derived from the command line; when the command line qualifies
     *     for YARN-properties-file mode the entries of that file are applied on top
     * @throws FlinkException if the command line cannot be translated
     */
    public Configuration toConfiguration(CommandLine commandLine) throws FlinkException {
        final Configuration result = new Configuration();
        this.applyDescriptorOptionToConfig(commandLine, result);

        final ApplicationId appId = this.getApplicationId(commandLine);
        if (appId == null) {
            result.set(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
        } else {
            final String zkNamespaceOpt = this.zookeeperNamespace.getOpt();
            final String clusterId;
            if (commandLine.hasOption(zkNamespaceOpt)) {
                clusterId = commandLine.getOptionValue(zkNamespaceOpt);
            } else {
                clusterId = result.get(HighAvailabilityOptions.HA_CLUSTER_ID, appId.toString());
            }
            result.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
            result.set(YarnConfigOptions.APPLICATION_ID, appId.toString());
            result.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
        }

        final String jmOpt = this.jmMemory.getOpt();
        if (commandLine.hasOption(jmOpt)) {
            result.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, parseMemoryDefaultingToMegabytes(commandLine.getOptionValue(jmOpt)));
        }
        final String tmOpt = this.tmMemory.getOpt();
        if (commandLine.hasOption(tmOpt)) {
            result.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, parseMemoryDefaultingToMegabytes(commandLine.getOptionValue(tmOpt)));
        }
        final String slotsOpt = this.slots.getOpt();
        if (commandLine.hasOption(slotsOpt)) {
            result.set(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slotsOpt)));
        }

        this.dynamicPropertiesEncoded = this.encodeDynamicProperties(commandLine);
        if (!this.dynamicPropertiesEncoded.isEmpty()) {
            FlinkYarnSessionCli.getDynamicProperties(this.dynamicPropertiesEncoded).forEach(result::setString);
        }

        return this.isYarnPropertiesFileMode(commandLine) ? this.applyYarnProperties(result) : result;
    }

    /** Parses a memory value, appending the unit {@code m} when the text carries no unit. */
    private static MemorySize parseMemoryDefaultingToMegabytes(String text) {
        final String withUnit = MemorySize.MemoryUnit.hasUnit(text) ? text : text + "m";
        return MemorySize.parse(withUnit);
    }

    /**
     * Resolves the YARN application id to use, in order of precedence: the application id
     * command line option, the {@code YarnConfigOptions.APPLICATION_ID} entry of the base
     * configuration, and finally the id loaded from the YARN properties file (only when the
     * command line qualifies for YARN-properties-file mode).
     *
     * @param commandLine parsed command line
     * @return the resolved application id, or {@code null} if none of the sources provides one
     */
    private ApplicationId getApplicationId(CommandLine commandLine) {
        final String appIdOpt = this.applicationId.getOpt();
        if (commandLine.hasOption(appIdOpt)) {
            return ApplicationId.fromString(commandLine.getOptionValue(appIdOpt));
        }
        return this.configuration
                .getOptional(YarnConfigOptions.APPLICATION_ID)
                .map(ApplicationId::fromString)
                .orElseGet(() -> this.isYarnPropertiesFileMode(commandLine) ? this.yarnApplicationIdFromYarnProperties : null);
    }

    /**
     * Copies the cluster-descriptor related command line options (Flink jar, ship files, queue,
     * detached flag, application name/type, ZooKeeper namespace, node label) into the given
     * configuration and records the configuration directory.
     *
     * @param commandLine parsed command line
     * @param configuration configuration to write into
     * @throws ConfigurationException if a ship file given on the command line does not exist
     */
    private void applyDescriptorOptionToConfig(CommandLine commandLine, Configuration configuration) throws ConfigurationException {
        Preconditions.checkNotNull(commandLine);
        Preconditions.checkNotNull(configuration);

        Path localJarPath = this.getLocalFlinkDistPathFromCmd(commandLine);
        if (localJarPath != null) {
            configuration.set(YarnConfigOptions.FLINK_DIST_JAR, localJarPath.toString());
        }
        this.encodeFilesToShipToCluster(configuration, commandLine);

        if (commandLine.hasOption(this.queue.getOpt())) {
            configuration.set(YarnConfigOptions.APPLICATION_QUEUE, commandLine.getOptionValue(this.queue.getOpt()));
        }

        boolean detached = commandLine.hasOption(CliFrontendParser.YARN_DETACHED_OPTION.getOpt())
                || commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt());
        // ATTACHED is read back as a Boolean in run(); store a real boolean instead of 1/0.
        configuration.set(DeploymentOptions.ATTACHED, !detached);

        if (commandLine.hasOption(this.name.getOpt())) {
            configuration.set(YarnConfigOptions.APPLICATION_NAME, commandLine.getOptionValue(this.name.getOpt()));
        }
        if (commandLine.hasOption(this.applicationType.getOpt())) {
            configuration.set(YarnConfigOptions.APPLICATION_TYPE, commandLine.getOptionValue(this.applicationType.getOpt()));
        }

        // The CLI-specific namespace option takes precedence over the inherited one.
        String namespaceOpt = null;
        if (commandLine.hasOption(this.zookeeperNamespace.getOpt())) {
            namespaceOpt = this.zookeeperNamespace.getOpt();
        } else if (commandLine.hasOption(this.zookeeperNamespaceOption.getOpt())) {
            namespaceOpt = this.zookeeperNamespaceOption.getOpt();
        }
        if (namespaceOpt != null) {
            configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, commandLine.getOptionValue(namespaceOpt));
        }

        if (commandLine.hasOption(this.nodeLabel.getOpt())) {
            configuration.set(YarnConfigOptions.NODE_LABEL, commandLine.getOptionValue(this.nodeLabel.getOpt()));
        }
        configuration.set(DeploymentOptionsInternal.CONF_DIR, this.configurationDirectory);
    }

    /**
     * Decides whether the YARN properties file may be applied for this command line.
     *
     * @param commandLine parsed command line
     * @return {@code false} if the address option is present or if any option registered by this
     *     CLI other than the detached options was given; {@code true} otherwise
     */
    private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
        if (commandLine.hasOption(this.addressOption.getOpt())) {
            return false;
        }
        for (Option given : commandLine.getOptions()) {
            final boolean isYarnOption = this.allOptions.hasOption(given.getOpt());
            if (isYarnOption && !this.isDetachedOption(given)) {
                return false;
            }
        }
        return true;
    }

    /** Returns whether the option's short name equals that of one of the two detached options. */
    private boolean isDetachedOption(Option option) {
        if (option.getOpt().equals(CliFrontendParser.YARN_DETACHED_OPTION.getOpt())) {
            return true;
        }
        return option.getOpt().equals(CliFrontendParser.DETACHED_OPTION.getOpt());
    }

    /**
     * Returns a copy of the given configuration with the application id and the dynamic
     * properties from the loaded YARN properties file applied on top.
     *
     * @param configuration configuration to copy; it is not modified
     * @return the enriched copy
     * @throws FlinkException declared by the signature; not thrown directly by this method
     */
    private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
        final Configuration merged = new Configuration(configuration);

        final String storedAppId = this.yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
        if (storedAppId != null) {
            merged.set(YarnConfigOptions.APPLICATION_ID, storedAppId);
        }

        final String storedDynamicProperties = this.yarnPropertiesFile.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
        FlinkYarnSessionCli.getDynamicProperties(storedDynamicProperties).forEach(merged::setString);
        return merged;
    }

    /**
     * Runs the YARN session CLI: prints usage or the cluster description if requested, otherwise
     * retrieves an existing session (application id option) or deploys a new one, and - unless
     * the deployment is detached - runs the interactive prompt and shuts the cluster down when it
     * ends.
     *
     * <p>Decompiler note: WARNING - Removed try catching itself - possible behaviour change.
     *
     * @param args raw command line arguments
     * @return 0 on every path that returns normally
     * @throws CliArgsException declared by the signature; the call to parseCommandLineOptions is
     *     the only argument-handling step in this method
     * @throws FlinkException if the YARN connection information cannot be written after
     *     deploying a new session, or if one of the called helpers fails
     */
    public int run(String[] args) throws CliArgsException, FlinkException {
        block25: {
            CommandLine cmd = this.parseCommandLineOptions(args, true);
            if (cmd.hasOption(this.help.getOpt())) {
                this.printUsage();
                return 0;
            }
            // Command line settings are layered on top of a copy of the base configuration.
            Configuration effectiveConfiguration = new Configuration(this.configuration);
            Configuration commandLineConfiguration = this.toConfiguration(cmd);
            effectiveConfiguration.addAll(commandLineConfiguration);
            LOG.debug("Effective configuration: {}", (Object)effectiveConfiguration);
            ClusterClientFactory yarnClusterClientFactory = this.clusterClientServiceLoader.getClusterClientFactory(effectiveConfiguration);
            // The target is overwritten with the session deployment target after the factory lookup.
            effectiveConfiguration.set(DeploymentOptions.TARGET, (Object)YarnDeploymentTarget.SESSION.getName());
            YarnClusterDescriptor yarnClusterDescriptor = (YarnClusterDescriptor)yarnClusterClientFactory.createClusterDescriptor(effectiveConfiguration);
            try {
                ClusterClientProvider<ApplicationId> clusterClientProvider;
                ApplicationId yarnApplicationId;
                // Query mode: print the cluster description and return.
                if (cmd.hasOption(this.query.getOpt())) {
                    String description = yarnClusterDescriptor.getClusterDescription();
                    System.out.println(description);
                    int n = 0;
                    return n;
                }
                if (cmd.hasOption(this.applicationId.getOpt())) {
                    // Attach to the session identified on the command line.
                    yarnApplicationId = ApplicationId.fromString((String)cmd.getOptionValue(this.applicationId.getOpt()));
                    clusterClientProvider = yarnClusterDescriptor.retrieve(yarnApplicationId);
                } else {
                    // Deploy a new session cluster and record its id in the YARN properties file.
                    ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(effectiveConfiguration);
                    clusterClientProvider = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
                    ClusterClient clusterClient = clusterClientProvider.getClusterClient();
                    yarnApplicationId = (ApplicationId)clusterClient.getClusterId();
                    try {
                        this.writeYarnPropertiesFile(yarnApplicationId, this.dynamicPropertiesEncoded);
                        System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
                    }
                    catch (Exception e) {
                        // Writing the connection information failed: close the client and kill
                        // the freshly deployed cluster (both best effort), then report the failure.
                        try {
                            clusterClient.close();
                        }
                        catch (Exception ex) {
                            LOG.info("Could not properly shutdown cluster client.", (Throwable)ex);
                        }
                        try {
                            yarnClusterDescriptor.killCluster(yarnApplicationId);
                        }
                        catch (FlinkException fe) {
                            LOG.info("Could not properly terminate the Flink cluster.", (Throwable)fe);
                        }
                        throw new FlinkException("Could not write the Yarn connection information.", (Throwable)e);
                    }
                }
                // Detached mode: log how to reach the cluster and leave it running.
                if (!((Boolean)effectiveConfiguration.get(DeploymentOptions.ATTACHED)).booleanValue()) {
                    YarnClusterDescriptor.logDetachedClusterInformation(yarnApplicationId, LOG);
                    break block25;
                }
                ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
                YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(yarnClusterDescriptor.getYarnClient(), yarnApplicationId, (ScheduledExecutor)new ScheduledExecutorServiceAdapter(scheduledExecutorService));
                // The shutdown hook performs the same cluster shutdown as the finally block below.
                Thread shutdownHook = ShutdownHookUtil.addShutdownHook(() -> this.shutdownCluster((ClusterClient<ApplicationId>)clusterClientProvider.getClusterClient(), scheduledExecutorService, yarnApplicationStatusMonitor), (String)((Object)((Object)this)).getClass().getSimpleName(), (Logger)LOG);
                try {
                    FlinkYarnSessionCli.runInteractiveCli(yarnApplicationStatusMonitor, this.acceptInteractiveInput);
                }
                finally {
                    // The prompt ended: shut the cluster down, remove the hook, log the final report.
                    this.shutdownCluster((ClusterClient<ApplicationId>)clusterClientProvider.getClusterClient(), scheduledExecutorService, yarnApplicationStatusMonitor);
                    if (shutdownHook != null) {
                        ShutdownHookUtil.removeShutdownHook((Thread)shutdownHook, (String)((Object)((Object)this)).getClass().getSimpleName(), (Logger)LOG);
                    }
                    this.tryRetrieveAndLogApplicationReport(yarnClusterDescriptor.getYarnClient(), yarnApplicationId);
                }
            }
            finally {
                // The descriptor is closed on every path; a failure to close is only logged.
                try {
                    yarnClusterDescriptor.close();
                }
                catch (Exception e) {
                    LOG.info("Could not properly close the yarn cluster descriptor.", (Throwable)e);
                }
            }
        }
        return 0;
    }

    /**
     * Shuts the session cluster down and releases the client-side resources: closes the status
     * monitor, asks the cluster to shut down, closes the cluster client, stops the executor
     * service and deletes the YARN properties file.
     *
     * <p>Failures while closing the monitor or the client are only logged; a failure of
     * {@code shutDownCluster()} is not caught here.
     *
     * @param clusterClient client of the cluster to shut down
     * @param scheduledExecutorService executor service to stop
     * @param yarnApplicationStatusMonitor monitor to close
     */
    private void shutdownCluster(ClusterClient<ApplicationId> clusterClient, ScheduledExecutorService scheduledExecutorService, YarnApplicationStatusMonitor yarnApplicationStatusMonitor) {
        try {
            yarnApplicationStatusMonitor.close();
        }
        catch (Exception monitorFailure) {
            LOG.info("Could not properly close the Yarn application status monitor.", monitorFailure);
        }

        clusterClient.shutDownCluster();

        try {
            clusterClient.close();
        }
        catch (Exception clientFailure) {
            LOG.info("Could not properly shutdown cluster client.", clientFailure);
        }

        final ExecutorService[] executorsToStop = {scheduledExecutorService};
        ExecutorUtils.gracefulShutdown(1000L, TimeUnit.MILLISECONDS, executorsToStop);
        this.deleteYarnPropertiesFile();
    }

    /**
     * Fetches the application report from YARN and logs it. If the report cannot be fetched the
     * failure is logged and nothing else happens.
     *
     * @param yarnClient client used to fetch the report
     * @param yarnApplicationId id of the application to report on
     */
    private void tryRetrieveAndLogApplicationReport(YarnClient yarnClient, ApplicationId yarnApplicationId) {
        final ApplicationReport finalReport;
        try {
            finalReport = yarnClient.getApplicationReport(yarnApplicationId);
        }
        catch (IOException | YarnException e) {
            LOG.info("Could not log the final application report.", e);
            return;
        }
        if (finalReport == null) {
            return;
        }
        this.logApplicationReport(finalReport);
    }

    /**
     * Logs the final state of the application and, if it failed, its diagnostics together with a
     * hint on how to fetch the aggregated logs.
     *
     * @param appReport report to log
     */
    private void logApplicationReport(ApplicationReport appReport) {
        final String summary = "Application " + appReport.getApplicationId() + " finished with state " + appReport.getYarnApplicationState() + " and final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime();
        LOG.info(summary);

        if (appReport.getYarnApplicationState() != YarnApplicationState.FAILED) {
            return;
        }
        final String diagnostics = "Application failed. Diagnostics " + appReport.getDiagnostics();
        LOG.warn(diagnostics);
        final String logHint = "If log aggregation is activated in the Hadoop cluster, we recommend to retrieve the full application log using this command:" + System.lineSeparator() + "\tyarn logs -applicationId " + appReport.getApplicationId() + System.lineSeparator() + "(It sometimes takes a few seconds until the logs are aggregated)";
        LOG.warn(logHint);
    }

    /**
     * Deletes the YARN properties file if it exists as a regular file. The outcome is logged;
     * any exception is logged as a warning and not propagated.
     */
    private void deleteYarnPropertiesFile() {
        try {
            final File target = FlinkYarnSessionCli.getYarnPropertiesLocation(this.yarnPropertiesFileLocation);
            if (!target.isFile()) {
                return;
            }
            final boolean removed = target.delete();
            if (removed) {
                LOG.info("Deleted Yarn properties file at {}", target.getAbsoluteFile());
            } else {
                LOG.warn("Couldn't delete Yarn properties file at {}", target.getAbsoluteFile());
            }
        }
        catch (Exception e) {
            LOG.warn("Exception while deleting the JobManager address file", e);
        }
    }

    /**
     * Writes the application id and, if given, the encoded dynamic properties to the YARN
     * properties file.
     *
     * @param yarnApplicationId id to store
     * @param dynamicProperties encoded dynamic properties; skipped when {@code null}
     */
    private void writeYarnPropertiesFile(ApplicationId yarnApplicationId, @Nullable String dynamicProperties) {
        final Properties contents = new Properties();
        final File destination = FlinkYarnSessionCli.getYarnPropertiesLocation(this.yarnPropertiesFileLocation);

        contents.setProperty(YARN_APPLICATION_ID_KEY, yarnApplicationId.toString());
        if (null != dynamicProperties) {
            contents.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicProperties);
        }

        FlinkYarnSessionCli.writeYarnProperties(contents, destination);
    }

    /**
     * Encodes the dynamic properties given on the command line into a single string: each
     * property becomes key, the option's value separator, then value, and the pairs are joined
     * with {@code YARN_DYNAMIC_PROPERTIES_SEPARATOR}. Every property is logged, with the value
     * masked for keys that {@code GlobalConfiguration.isSensitive} reports as sensitive.
     *
     * @param cmd parsed command line
     * @return the encoded properties
     */
    private String encodeDynamicProperties(CommandLine cmd) {
        final Properties parsed = cmd.getOptionProperties(this.dynamicproperties.getOpt());
        final String[] encodedPairs = parsed.stringPropertyNames().stream()
                .flatMap(propertyName -> {
                    final String propertyValue = parsed.getProperty(propertyName);
                    final String shown = GlobalConfiguration.isSensitive(propertyName) ? "******" : propertyValue;
                    LOG.info("Dynamic Property set: {}={}", propertyName, shown);
                    return propertyValue == null
                            ? Stream.<String>empty()
                            : Stream.of(propertyName + this.dynamicproperties.getValueSeparator() + propertyValue);
                })
                .toArray(String[]::new);
        return StringUtils.join(encodedPairs, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
    }

    /**
     * Decodes a string produced by joining {@code key=value} pairs with
     * {@code YARN_DYNAMIC_PROPERTIES_SEPARATOR}.
     *
     * <p>Each pair is split at its first {@code =}; key and value are trimmed. Pairs without
     * {@code =} or with an empty key are skipped.
     *
     * @param dynamicPropertiesEncoded encoded properties; may be {@code null} or empty
     * @return the decoded properties, or an empty map for {@code null} or empty input
     */
    public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
        if (dynamicPropertiesEncoded == null || dynamicPropertiesEncoded.length() == 0) {
            return Collections.emptyMap();
        }

        final Map<String, String> decoded = new HashMap<String, String>();
        final String[] pairs = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR);
        for (int i = 0; i < pairs.length; i++) {
            final String pair = pairs[i];
            if (pair == null) {
                continue;
            }
            final int separatorIndex = pair.indexOf("=");
            if (separatorIndex < 0) {
                continue;
            }
            final String propertyName = pair.substring(0, separatorIndex).trim();
            final String propertyValue = pair.substring(separatorIndex + 1).trim();
            if (!propertyName.isEmpty()) {
                decoded.put(propertyName, propertyValue);
            }
        }
        return decoded;
    }

    /**
     * Command line entry point: loads the global configuration, installs the security context,
     * runs the CLI inside it and exits the JVM with the resulting return code.
     *
     * @param args raw command line arguments
     */
    public static void main(String[] args) {
        final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
        final Configuration globalConfig = GlobalConfiguration.loadConfiguration();
        int exitCode;
        try {
            final FlinkYarnSessionCli sessionCli = new FlinkYarnSessionCli(globalConfig, confDir, "", "");
            final CommandLine parsedArgs = CliFrontendParser.parse(sessionCli.allOptions, args, true);

            // Dynamic properties are applied to a clone used only for the security setup.
            final Configuration securityConfig = globalConfig.clone();
            DynamicPropertiesUtil.encodeDynamicProperties(parsedArgs, securityConfig);
            SecurityUtils.install(new SecurityConfiguration(securityConfig));

            exitCode = SecurityUtils.getInstalledContext().runSecured(() -> sessionCli.run(args));
        }
        catch (CliArgsException argsProblem) {
            exitCode = FlinkYarnSessionCli.handleCliArgsException(argsProblem, LOG);
        }
        catch (Throwable failure) {
            final Throwable stripped = ExceptionUtils.stripException(failure, UndeclaredThrowableException.class);
            exitCode = FlinkYarnSessionCli.handleError(stripped, LOG);
        }
        System.exit(exitCode);
    }

    /**
     * Runs the interactive prompt until the user stops it, the application is reported as
     * failed or canceled, or the application status has been unknown for longer than five
     * polling intervals. Any exception ends the loop and is logged as a warning.
     *
     * @param yarnApplicationStatusMonitor source of the current application status
     * @param readConsoleInput whether commands are read from standard input
     */
    private static void runInteractiveCli(YarnApplicationStatusMonitor yarnApplicationStatusMonitor, boolean readConsoleInput) {
        // 5 polling intervals, converted from milliseconds to nanoseconds.
        final long unknownStatusTimeoutNanos = 5L * CLIENT_POLLING_INTERVAL_MS * 1_000_000L;
        try (BufferedReader console = new BufferedReader(new InputStreamReader(System.in))) {
            boolean keepRunning = true;
            boolean lastStatusWasUnknown = true;
            long unknownSinceNanos = System.nanoTime();
            while (keepRunning) {
                final ApplicationStatus current = yarnApplicationStatusMonitor.getApplicationStatusNow();
                switch (current) {
                    case FAILED:
                    case CANCELED: {
                        System.err.println("The Flink Yarn cluster has failed.");
                        keepRunning = false;
                        break;
                    }
                    case UNKNOWN: {
                        if (!lastStatusWasUnknown) {
                            // Restart the timeout window on the transition into UNKNOWN.
                            unknownSinceNanos = System.nanoTime();
                            lastStatusWasUnknown = true;
                        }
                        final long unknownForNanos = System.nanoTime() - unknownSinceNanos;
                        if (unknownForNanos > unknownStatusTimeoutNanos) {
                            System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
                            keepRunning = false;
                        } else {
                            keepRunning = FlinkYarnSessionCli.repStep(console, readConsoleInput);
                        }
                        break;
                    }
                    case SUCCEEDED: {
                        lastStatusWasUnknown = false;
                        keepRunning = FlinkYarnSessionCli.repStep(console, readConsoleInput);
                        break;
                    }
                }
            }
        }
        catch (Exception e) {
            LOG.warn("Exception while running the interactive command line interface.", e);
        }
    }

    /**
     * Performs one step of the interactive prompt: waits up to one polling interval for console
     * input (checking every 200 ms) and, if input is available, reads and handles one command.
     *
     * @param in reader for console input
     * @param readConsoleInput whether console input is consulted at all
     * @return {@code false} if the user entered {@code quit} or {@code stop}; {@code true} otherwise
     * @throws IOException if reading from the console fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private static boolean repStep(BufferedReader in, boolean readConsoleInput) throws IOException, InterruptedException {
        final long waitStartedAt = System.currentTimeMillis();
        while (System.currentTimeMillis() - waitStartedAt < CLIENT_POLLING_INTERVAL_MS && !(readConsoleInput && in.ready())) {
            Thread.sleep(200L);
        }

        if (!readConsoleInput || !in.ready()) {
            return true;
        }

        final String entered = in.readLine();
        switch (entered) {
            case "quit":
            case "stop":
                return false;
            case "help":
                System.err.println(YARN_SESSION_HELP);
                return true;
            default:
                System.err.println("Unknown command '" + entered + "'. Showing help:");
                System.err.println(YARN_SESSION_HELP);
                return true;
        }
    }

    /**
     * Stores the properties in the given file and then marks the file as readable for all users.
     *
     * @param properties properties to store
     * @param propertiesFile destination file
     * @throws RuntimeException if writing the file fails
     */
    private static void writeYarnProperties(Properties properties, File propertiesFile) {
        try (FileOutputStream sink = new FileOutputStream(propertiesFile)) {
            properties.store(sink, "Generated YARN properties file");
        }
        catch (IOException writeFailure) {
            throw new RuntimeException("Error writing the properties file", writeFailure);
        }
        // ownerOnly = false: readable by everyone, not only the owner.
        final boolean readable = true;
        final boolean ownerOnly = false;
        propertiesFile.setReadable(readable, ownerOnly);
    }

    /**
     * Computes the location of the YARN properties file of the current user.
     *
     * @param yarnPropertiesFileLocation directory holding the file, or {@code null} to use the
     *     {@code java.io.tmpdir} system property
     * @return the file named {@code YARN_PROPERTIES_FILE} followed by the value of the
     *     {@code user.name} system property, inside the chosen directory
     */
    public static File getYarnPropertiesLocation(@Nullable String yarnPropertiesFileLocation) {
        final String directory;
        if (yarnPropertiesFileLocation == null) {
            directory = System.getProperty("java.io.tmpdir");
        } else {
            directory = yarnPropertiesFileLocation;
        }
        final String fileName = YARN_PROPERTIES_FILE + System.getProperty("user.name");
        return new File(directory, fileName);
    }
}

