/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginConfig;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.security.token.DefaultDelegationTokenManager;
import org.apache.flink.runtime.security.token.DelegationTokenContainer;
import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnApplicationFileUploader;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnLocalResourceDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint;
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YarnClusterDescriptor
implements ClusterDescriptor<ApplicationId> {
    /** Logger shared by all instances of this descriptor. */
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
    /** JVM flag literal; package-private so that tests can reference it. */
    @VisibleForTesting
    static final String IGNORE_UNRECOGNIZED_VM_OPTIONS = "-XX:+IgnoreUnrecognizedVMOptions";
    /** Hadoop/YARN configuration handed to the constructor; never null. */
    private final YarnConfiguration yarnConfiguration;
    /** Client used to talk to YARN; never null. */
    private final YarnClient yarnClient;
    /** Source of cluster-wide information such as the maximum number of vcores; never null. */
    private final YarnClusterInformationRetriever yarnClusterInformationRetriever;
    /** When true, {@link #close()} leaves {@link #yarnClient} running instead of stopping it. */
    private final boolean sharedYarnClient;
    /** Files to ship; filled through {@link #addShipFiles(List)}. */
    private final List<Path> shipFiles = new LinkedList<Path>();
    /** Archives to ship; filled through {@code addShipArchives}. */
    private final List<Path> shipArchives = new LinkedList<Path>();
    /** Value of {@code YarnConfigOptions.APPLICATION_QUEUE} read in the constructor. */
    private final String yarnQueue;
    /** Path of the Flink jar; set via {@link #setLocalJarPath(Path)} and required for deployment. */
    private Path flinkJarPath;
    /** Flink configuration handed to the constructor; never null. */
    private final Configuration flinkConfiguration;
    /** Value of {@code YarnConfigOptions.APPLICATION_NAME} read in the constructor. */
    private final String customName;
    /** Value of {@code YarnConfigOptions.NODE_LABEL} read in the constructor. */
    private final String nodeLabel;
    /** Value of {@code YarnConfigOptions.APPLICATION_TYPE} read in the constructor. */
    private final String applicationType;
    /** User jar inclusion mode derived from the Flink configuration in the constructor. */
    private YarnConfigOptions.UserJarInclusion userJarInclusion;

    public YarnClusterDescriptor(Configuration flinkConfiguration, YarnConfiguration yarnConfiguration, YarnClient yarnClient, YarnClusterInformationRetriever yarnClusterInformationRetriever, boolean sharedYarnClient) {
        this.yarnConfiguration = (YarnConfiguration)Preconditions.checkNotNull((Object)yarnConfiguration);
        this.yarnClient = (YarnClient)Preconditions.checkNotNull((Object)yarnClient);
        this.yarnClusterInformationRetriever = (YarnClusterInformationRetriever)Preconditions.checkNotNull((Object)yarnClusterInformationRetriever);
        this.sharedYarnClient = sharedYarnClient;
        this.flinkConfiguration = (Configuration)Preconditions.checkNotNull((Object)flinkConfiguration);
        this.userJarInclusion = YarnClusterDescriptor.getUserJarInclusionMode(flinkConfiguration);
        YarnClusterDescriptor.adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_LEVEL, "ROOT_LOG_LEVEL");
        YarnClusterDescriptor.adaptEnvSetting(flinkConfiguration, CoreOptions.FLINK_LOG_MAX, "MAX_LOG_FILE_NUMBER");
        this.getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
        this.decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES).ifPresent(this::addShipFiles);
        this.decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_ARCHIVES).ifPresent(this::addShipArchives);
        this.yarnQueue = (String)flinkConfiguration.get(YarnConfigOptions.APPLICATION_QUEUE);
        this.customName = (String)flinkConfiguration.get(YarnConfigOptions.APPLICATION_NAME);
        this.applicationType = (String)flinkConfiguration.get(YarnConfigOptions.APPLICATION_TYPE);
        this.nodeLabel = (String)flinkConfiguration.get(YarnConfigOptions.NODE_LABEL);
    }

    /**
     * Copies an explicitly configured option into the environment settings of both the master
     * and the task manager containers. Does nothing when the option is not set.
     *
     * @param config configuration that is read and updated
     * @param configOption option whose value is forwarded
     * @param envKey name of the environment variable to populate
     */
    private static <T> void adaptEnvSetting(Configuration config, ConfigOption<T> configOption, String envKey) {
        Optional<T> configured = config.getOptional(configOption);
        if (!configured.isPresent()) {
            return;
        }
        T value = configured.get();
        config.setString("containerized.master.env." + envKey, String.valueOf(value));
        config.setString("containerized.taskmanager.env." + envKey, String.valueOf(value));
    }

    /**
     * Reads a list-of-strings option and converts each entry to a {@link Path} via
     * {@link #createPathWithSchema(String)}.
     *
     * @param configuration configuration to read from; must not be null
     * @param configOption option holding the path strings; must not be null
     * @return the decoded paths, or an empty Optional when the decoded list is empty
     */
    private Optional<List<Path>> decodeFilesToShipToCluster(Configuration configuration, ConfigOption<List<String>> configOption) {
        Preconditions.checkNotNull(configuration);
        Preconditions.checkNotNull(configOption);
        List<Path> decoded = ConfigUtils.decodeListFromConfig(configuration, configOption, this::createPathWithSchema);
        if (decoded.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(decoded);
    }

    /**
     * Turns a path string into a {@link Path}. Strings without a URI scheme are converted with
     * {@code Utils.getPathFromLocalFilePathStr}; all others are used as given.
     */
    private Path createPathWithSchema(String path) {
        Path parsed = new Path(path);
        if (isWithoutSchema(parsed)) {
            return Utils.getPathFromLocalFilePathStr(path);
        }
        return parsed;
    }

    /** Returns true when the URI of the given path has a null or whitespace-only scheme. */
    private boolean isWithoutSchema(Path path) {
        String scheme = path.toUri().getScheme();
        return StringUtils.isNullOrWhitespaceOnly(scheme);
    }

    /**
     * Determines the path of the Flink distribution jar.
     *
     * <p>The value of {@code YarnConfigOptions.FLINK_DIST_JAR} wins when it is set. Otherwise the
     * location this class was loaded from is used, provided it ends with {@code .jar}.
     *
     * @param configuration configuration to read the explicit jar path from
     * @return the jar path, or an empty Optional when none could be determined
     */
    private Optional<Path> getLocalFlinkDistPath(Configuration configuration) {
        String configuredJar = configuration.get(YarnConfigOptions.FLINK_DIST_JAR);
        if (configuredJar != null) {
            return Optional.of(new Path(configuredJar));
        }

        LOG.info("No path for the flink jar passed. Using the location of " + this.getClass() + " to locate the jar");
        String codeLocation = getDecodedJarPath();
        if (!codeLocation.endsWith(".jar")) {
            return Optional.empty();
        }
        return Optional.of(Utils.getPathFromLocalFilePathStr(codeLocation));
    }

    /**
     * Returns the URL-decoded path of the code source location this class was loaded from.
     *
     * @return the decoded location path
     * @throws RuntimeException if the platform default charset name is rejected by the decoder;
     *     the original {@link UnsupportedEncodingException} is attached as the cause
     */
    private String getDecodedJarPath() {
        final String encodedJarPath = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
        try {
            return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
        }
        catch (UnsupportedEncodingException e) {
            // Chain the cause so the underlying failure stays visible in the stack trace.
            throw new RuntimeException("Couldn't decode the encoded Flink dist jar path: " + encodedJarPath + " You can supply a path manually via the command line.", e);
        }
    }

    /** Exposes the internal (mutable) list of ship files; package-private for tests. */
    @VisibleForTesting
    List<Path> getShipFiles() {
        return shipFiles;
    }

    /** Exposes the internal (mutable) list of ship archives; package-private for tests. */
    @VisibleForTesting
    List<Path> getShipArchives() {
        return shipArchives;
    }

    /** Returns the YARN client this descriptor was created with. */
    public YarnClient getYarnClient() {
        return yarnClient;
    }

    /**
     * Returns the fully qualified class name of the entrypoint used for session clusters.
     * Subclasses may override this to supply a different entrypoint.
     */
    protected String getYarnSessionClusterEntrypoint() {
        final Class<?> entrypoint = YarnSessionClusterEntrypoint.class;
        return entrypoint.getName();
    }

    /**
     * Returns the fully qualified class name of the entrypoint used for per-job clusters.
     * Subclasses may override this to supply a different entrypoint.
     */
    protected String getYarnJobClusterEntrypoint() {
        final Class<?> entrypoint = YarnJobClusterEntrypoint.class;
        return entrypoint.getName();
    }

    /** Returns the Flink configuration instance held by this descriptor (not a copy). */
    public Configuration getFlinkConfiguration() {
        return flinkConfiguration;
    }

    /**
     * Sets the path of the Flink jar.
     *
     * @param localJarPath path whose string form must end with {@code jar}
     * @throws IllegalArgumentException if the path does not end with {@code jar}
     */
    public void setLocalJarPath(Path localJarPath) {
        boolean endsWithJar = localJarPath.toString().endsWith("jar");
        if (endsWithJar) {
            this.flinkJarPath = localJarPath;
            return;
        }
        throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
    }

    /**
     * Registers additional files to ship.
     *
     * @param shipFiles paths to add; rejected when {@code isUsrLibDirIncludedInShipFiles} reports
     *     that they include the {@code usrlib} directory
     * @throws IllegalArgumentException if the check above fails
     */
    public void addShipFiles(List<Path> shipFiles) {
        boolean containsUsrLib = isUsrLibDirIncludedInShipFiles(shipFiles, this.yarnConfiguration);
        Preconditions.checkArgument(!containsUsrLib, "User-shipped directories configured via : %s should not include %s.", YarnConfigOptions.SHIP_FILES.key(), "usrlib");
        this.shipFiles.addAll(shipFiles);
    }

    /**
     * Registers additional archives to ship.
     *
     * @param shipArchives paths to add; every entry must be a regular file with a recognized
     *     archive extension
     * @throws IllegalArgumentException if a directory or non-archive file is included
     */
    private void addShipArchives(List<Path> shipArchives) {
        boolean archivesOnly = isArchiveOnlyIncludedInShipArchiveFiles(shipArchives, this.yarnConfiguration);
        Preconditions.checkArgument(archivesOnly, "Directories or non-archive files are included.");
        this.shipArchives.addAll(shipArchives);
    }

    /**
     * Checks that every given path is a regular file whose name ends with one of the accepted
     * archive extensions ({@code .tar.gz}, {@code .tar}, {@code .tgz}, {@code .dst},
     * {@code .jar}, {@code .zip}), ignoring case.
     *
     * <p>File names are lower-cased with {@link java.util.Locale#ROOT} so the comparison does not
     * depend on the JVM default locale (for example, the Turkish locale maps {@code I} to a
     * dotless {@code ı}, which would make {@code FILE.ZIP} fail the check).
     *
     * @param shipFiles paths to inspect
     * @param yarnConfiguration configuration used to resolve each path's file system
     * @return true if all paths are archive files; also true for an empty list
     */
    private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<Path> shipFiles, YarnConfiguration yarnConfiguration) {
        long archivedFileCount = shipFiles.stream()
                .map(FunctionUtils.uncheckedFunction(path -> YarnClusterDescriptor.getFileStatus(path, yarnConfiguration)))
                .filter(FileStatus::isFile)
                .map(status -> status.getPath().getName().toLowerCase(java.util.Locale.ROOT))
                .filter(name -> name.endsWith(".tar.gz") || name.endsWith(".tar") || name.endsWith(".tgz") || name.endsWith(".dst") || name.endsWith(".jar") || name.endsWith(".zip"))
                .count();
        return archivedFileCount == (long)shipFiles.size();
    }

    /**
     * Looks up the status of a path on the file system that owns it.
     *
     * @param path path to inspect
     * @param yarnConfiguration configuration used to resolve the path's file system
     * @return the file status reported by that file system
     * @throws IOException if the file system cannot be obtained or the lookup fails
     */
    private static FileStatus getFileStatus(Path path, YarnConfiguration yarnConfiguration) throws IOException {
        FileSystem owningFs = path.getFileSystem(yarnConfiguration);
        return owningFs.getFileStatus(path);
    }

    /**
     * Verifies the preconditions for a deployment.
     *
     * <p>Fails when the Flink jar path or configuration is missing, or when the requested vcores
     * for the application master or per task manager exceed the cluster maximum. Only warns when
     * neither {@code HADOOP_CONF_DIR} nor {@code YARN_CONF_DIR} is set.
     *
     * @param clusterSpecification specification providing the default vcores (slots per task
     *     manager)
     * @throws Exception on any failed precondition or failure to query the cluster
     */
    private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {
        if (this.flinkJarPath == null) {
            throw new YarnDeploymentException("The Flink jar path is null");
        }
        if (this.flinkConfiguration == null) {
            throw new YarnDeploymentException("Flink configuration object has not been set");
        }

        final int maxVcores = this.yarnClusterInformationRetriever.getMaxVcores();

        final int amVcores = this.flinkConfiguration.get(YarnConfigOptions.APP_MASTER_VCORES);
        if (amVcores > maxVcores) {
            throw new IllegalConfigurationException(String.format("The number of requested virtual cores for application master %d exceeds the maximum number of virtual cores %d available in the Yarn Cluster.", amVcores, maxVcores));
        }

        // The vcores per task manager default to the number of slots unless configured.
        final int tmVcores = this.flinkConfiguration.get(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
        if (tmVcores > maxVcores) {
            throw new IllegalConfigurationException(String.format("The number of requested virtual cores per node %d exceeds the maximum number of virtual cores %d available in the Yarn Cluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with '%s.'", tmVcores, maxVcores, YarnConfigOptions.VCORES.key()));
        }

        final boolean hadoopConfDirMissing = System.getenv("HADOOP_CONF_DIR") == null;
        if (hadoopConfDirMissing && System.getenv("YARN_CONF_DIR") == null) {
            LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
        }
    }

    /** Returns the node label read from {@code YarnConfigOptions.NODE_LABEL} at construction. */
    public String getNodeLabel() {
        return nodeLabel;
    }

    /** Stops the YARN client unless it is shared. */
    public void close() {
        if (this.sharedYarnClient) {
            return;
        }
        this.yarnClient.stop();
    }

    /**
     * Retrieves a client provider for an already running YARN application.
     *
     * <p>The application report is fetched from YARN; an application whose final status is no
     * longer {@code UNDEFINED} is treated as finished and rejected. For a running application the
     * entrypoint information from the report is written into the Flink configuration.
     *
     * @param applicationId id of the YARN application to connect to
     * @return a provider that creates a {@link RestClusterClient} for the application
     * @throws ClusterRetrieveException if the report cannot be fetched or the application is no
     *     longer running
     */
    public ClusterClientProvider<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
        try {
            if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
                // Message aligned with the identical warning in isReadyForDeployment
                // (a space was missing between the two sentences).
                LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
            }

            final ApplicationReport report = this.yarnClient.getApplicationReport(applicationId);
            if (report.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}", applicationId, report.getFinalApplicationStatus());
                // Wrapped into ClusterRetrieveException by the catch block below.
                throw new RuntimeException("The Yarn application " + applicationId + " doesn't run anymore.");
            }

            this.setClusterEntrypointInfoToConfig(report);
            return () -> {
                try {
                    return new RestClusterClient<>(this.flinkConfiguration, report.getApplicationId());
                }
                catch (Exception e) {
                    throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
                }
            };
        }
        catch (Exception e) {
            throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
        }
    }

    /**
     * Deploys a Flink session cluster on YARN.
     *
     * @param clusterSpecification requested cluster resources
     * @return a provider for clients of the deployed cluster
     * @throws ClusterDeploymentException if the deployment fails for any reason
     */
    public ClusterClientProvider<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        final ClusterClientProvider<ApplicationId> provider;
        try {
            // Session clusters carry no job graph and are never started detached.
            provider = deployInternal(clusterSpecification, "Flink session cluster", getYarnSessionClusterEntrypoint(), null, false);
        }
        catch (Exception cause) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", cause);
        }
        return provider;
    }

    /**
     * Deploys a Flink application cluster on YARN.
     *
     * <p>The configured deployment target must be {@code YarnDeploymentTarget.APPLICATION}. The
     * application configuration is applied to the Flink configuration, and for non-Python
     * programs exactly one pipeline jar must be configured.
     *
     * @param clusterSpecification requested cluster resources; must not be null
     * @param applicationConfiguration application settings; must not be null
     * @return a provider for clients of the deployed cluster
     * @throws ClusterDeploymentException if the deployment target is wrong or deployment fails
     * @throws IllegalArgumentException if a non-Python program does not have exactly one jar
     */
    public ClusterClientProvider<ApplicationId> deployApplicationCluster(ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {
        Preconditions.checkNotNull(clusterSpecification);
        Preconditions.checkNotNull(applicationConfiguration);

        final YarnDeploymentTarget configuredTarget = YarnDeploymentTarget.fromConfig(this.flinkConfiguration);
        if (configuredTarget != YarnDeploymentTarget.APPLICATION) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster. Expected deployment.target=" + YarnDeploymentTarget.APPLICATION.getName() + " but actual one was \"" + configuredTarget.getName() + "\"");
        }

        applicationConfiguration.applyToConfiguration(this.flinkConfiguration);

        // Python programs are exempt from the single-jar requirement.
        final boolean pythonProgram = PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()) || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments());
        if (!pythonProgram) {
            final List<String> pipelineJars = this.flinkConfiguration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
            Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
        }

        try {
            return deployInternal(clusterSpecification, "Flink Application Cluster", YarnApplicationClusterEntryPoint.class.getName(), null, false);
        }
        catch (Exception cause) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", cause);
        }
    }

    /**
     * Deploys a per-job cluster on YARN. Logs a deprecation warning on every call.
     *
     * @param clusterSpecification requested cluster resources
     * @param jobGraph job to run on the cluster
     * @param detached whether the cluster is started in detached mode
     * @return a provider for clients of the deployed cluster
     * @throws ClusterDeploymentException if the deployment fails for any reason
     */
    public ClusterClientProvider<ApplicationId> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) throws ClusterDeploymentException {
        LOG.warn("Job Clusters are deprecated since Flink 1.15. Please use an Application Cluster/Application Mode instead.");
        final ClusterClientProvider<ApplicationId> provider;
        try {
            provider = deployInternal(clusterSpecification, "Flink per-job cluster", getYarnJobClusterEntrypoint(), jobGraph, detached);
        }
        catch (Exception cause) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", cause);
        }
        return provider;
    }

    /**
     * Kills the given YARN application and deletes its application directory below the staging
     * directory.
     *
     * @param applicationId id of the application to kill
     * @throws FlinkException if killing the application or accessing the file system fails
     */
    public void killCluster(ApplicationId applicationId) throws FlinkException {
        try {
            this.yarnClient.killApplication(applicationId);

            try (FileSystem fs = FileSystem.get(this.yarnConfiguration)) {
                final Path stagingDir = getStagingDir(fs);
                final Path applicationDir = YarnApplicationFileUploader.getApplicationDirPath(stagingDir, applicationId);
                final String applicationDirUri = applicationDir.toUri().toString();
                Utils.deleteApplicationFiles(applicationDirUri);
            }
        }
        catch (IOException | YarnException cause) {
            throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', cause);
        }
    }

    /**
     * Shared deployment routine behind the session, application and per-job deployments.
     *
     * <p>Steps, in order: validate Kerberos settings (only when Kerberos is enabled for the
     * current user), run the readiness checks, check the configured queue, create a new YARN
     * application, validate the requested resources against the cluster, record the execution
     * mode in the Flink configuration, start the application master, and finally store the
     * entrypoint information of the started application in the configuration.
     *
     * <p>If the free resources cannot be determined or the resource validation fails,
     * {@code failSessionDuringDeployment} is invoked for the created application before the
     * exception is propagated.
     *
     * @param clusterSpecification requested cluster resources
     * @param applicationName name passed on to the application master start
     * @param yarnClusterEntrypoint class name of the cluster entrypoint
     * @param jobGraph job graph to deploy with the cluster, or null
     * @param detached whether the cluster runs in detached execution mode
     * @return a provider that creates a {@link RestClusterClient} for the started application
     * @throws Exception if any step of the deployment fails
     */
    private ClusterClientProvider<ApplicationId> deployInternal(ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached) throws Exception {
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            final boolean useTicketCache = this.flinkConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials or delegation tokens!");
            }

            final boolean fetchToken = this.flinkConfiguration.get(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            final boolean yarnAccessFSEnabled = !CollectionUtil.isNullOrEmpty(this.flinkConfiguration.get(SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS));
            if (yarnAccessFSEnabled && !fetchToken) {
                throw new IllegalConfigurationException(String.format("When %s is disabled, %s must be disabled as well.", SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(), SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS.key()));
            }
        }

        isReadyForDeployment(clusterSpecification);
        checkYarnQueues(this.yarnClient);

        final YarnClientApplication yarnApplication = this.yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
        final Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(this.yarnClient);
        }
        catch (IOException | YarnException e) {
            failSessionDuringDeployment(this.yarnClient, yarnApplication);
            throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
        }

        final String minAllocationKey = "yarn.scheduler.minimum-allocation-mb";
        final int yarnMinAllocationMB = this.yarnConfiguration.getInt(minAllocationKey, 1024);
        if (yarnMinAllocationMB <= 0) {
            throw new YarnDeploymentException("The minimum allocation memory (" + yarnMinAllocationMB + " MB) configured via '" + minAllocationKey + "' should be greater than 0.");
        }

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification = validateClusterResources(clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        }
        catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(this.yarnClient, yarnApplication);
            throw yde;
        }
        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode;
        if (detached) {
            executionMode = ClusterEntrypoint.ExecutionMode.DETACHED;
        } else {
            executionMode = ClusterEntrypoint.ExecutionMode.NORMAL;
        }
        this.flinkConfiguration.set(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());

        final ApplicationReport report = startAppMaster(this.flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, this.yarnClient, yarnApplication, validClusterSpecification);

        if (detached) {
            logDetachedClusterInformation(report.getApplicationId(), LOG);
        }

        setClusterEntrypointInfoToConfig(report);

        return () -> {
            try {
                return new RestClusterClient<>(this.flinkConfiguration, report.getApplicationId());
            }
            catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }

    /**
     * Validates the requested JobManager and TaskManager memory against the cluster.
     *
     * <p>The JobManager memory is raised to the YARN minimum allocation when it is smaller.
     * Requests above the maximum resource capability are rejected; requests above the current
     * container limit only produce a warning.
     *
     * @param clusterSpecification requested resources
     * @param yarnMinAllocationMB YARN minimum allocation in MB
     * @param maximumResourceCapability maximum resources a single container may receive
     * @param freeClusterResources description of the currently free cluster resources
     * @return a new specification with the (possibly raised) JobManager memory and the unchanged
     *     TaskManager memory and slot count
     * @throws YarnDeploymentException if a component requests more than the maximum capability
     */
    private ClusterSpecification validateClusterResources(ClusterSpecification clusterSpecification, int yarnMinAllocationMB, Resource maximumResourceCapability, ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
        final int requestedJobManagerMb = clusterSpecification.getMasterMemoryMB();
        final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();

        // Logging uses the values exactly as requested, before any adjustment.
        logIfComponentMemNotIntegerMultipleOfYarnMinAllocation("JobManager", requestedJobManagerMb, yarnMinAllocationMB);
        logIfComponentMemNotIntegerMultipleOfYarnMinAllocation("TaskManager", taskManagerMemoryMb, yarnMinAllocationMB);

        final int jobManagerMemoryMb = requestedJobManagerMb < yarnMinAllocationMB ? yarnMinAllocationMB : requestedJobManagerMb;

        final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
        final long maxMemoryMb = maximumResourceCapability.getMemorySize();
        if (jobManagerMemoryMb > maxMemoryMb) {
            throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\nMaximum Memory: " + maxMemoryMb + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
        }
        if (taskManagerMemoryMb > maxMemoryMb) {
            throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\nMaximum Memory: " + maxMemoryMb + " Requested: " + taskManagerMemoryMb + "MB. " + note);
        }

        final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.";
        if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
            LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
        }
        if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
            LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
        }

        final ClusterSpecification.ClusterSpecificationBuilder builder = new ClusterSpecification.ClusterSpecificationBuilder();
        builder.setMasterMemoryMB(jobManagerMemoryMb);
        builder.setTaskManagerMemoryMB(taskManagerMemoryMb);
        builder.setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager());
        return builder.createClusterSpecification();
    }

    /**
     * Logs an info message when the component memory is not an integer multiple of the YARN
     * minimum allocation, stating the amount YARN will actually allocate.
     *
     * @param componentName name used in the log message
     * @param componentMemoryMB configured memory of the component in MB
     * @param yarnMinAllocationMB YARN minimum allocation in MB; callers must pass a value greater
     *     than zero because it is used as a divisor
     */
    private void logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(String componentName, int componentMemoryMB, int yarnMinAllocationMB) {
        // Round up to the next multiple of the minimum allocation.
        final int allocationUnits = (componentMemoryMB + (yarnMinAllocationMB - 1)) / yarnMinAllocationMB;
        final int roundedUpMB = allocationUnits * yarnMinAllocationMB;
        final int normalizedMemMB = roundedUpMB <= 0 ? yarnMinAllocationMB : roundedUpMB;

        if (componentMemoryMB == normalizedMemMB) {
            return;
        }
        LOG.info("The configured {} memory is {} MB. YARN will allocate {} MB to make up an integer multiple of its minimum allocation memory ({} MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra {} MB may not be used by Flink.", componentName, componentMemoryMB, normalizedMemMB, yarnMinAllocationMB, normalizedMemMB - componentMemoryMB);
    }

    /**
     * Best-effort check that the configured YARN queue exists.
     *
     * <p>When queues are reported and a queue is configured, the queue is looked up by its name
     * and by its name prefixed with {@code root.}; a warning listing the available queues is
     * logged if it is not found. Any failure while querying YARN is logged and swallowed on
     * purpose, so this check never aborts a deployment.
     *
     * @param yarnClient client used to list the queues
     */
    private void checkYarnQueues(YarnClient yarnClient) {
        try {
            // Typed list: the raw List produced by decompilation cannot be iterated as QueueInfo.
            final List<QueueInfo> queues = yarnClient.getAllQueues();
            if (queues.size() > 0 && this.yarnQueue != null) {
                boolean queueFound = false;
                for (QueueInfo queue : queues) {
                    if (queue.getQueueName().equals(this.yarnQueue) || queue.getQueueName().equals("root." + this.yarnQueue)) {
                        queueFound = true;
                        break;
                    }
                }
                if (!queueFound) {
                    String queueNames = StringUtils.toQuotedListString(queues.toArray());
                    LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. Available queues: " + queueNames);
                }
            } else {
                LOG.debug("The YARN cluster does not have any queues configured");
            }
        }
        catch (Throwable e) {
            // Deliberately broad: queue validation must not fail the deployment.
            LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error details", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers all local resources for the application master, builds its container launch
     * context and submission context, submits the application through {@code yarnClient} and then
     * polls the application report until the application is RUNNING or FINISHED.
     *
     * <p>Side effects visible in this method: {@code configuration} is mutated (HA cluster id set
     * if absent, job graph file path set, bind-host/host entries removed, krb5 path possibly set),
     * temporary files are created and deleted locally, and a {@link DeploymentFailureHook} is
     * installed as a JVM shutdown hook for the duration of the submission.
     *
     * @param configuration Flink configuration that is adjusted, written to a temporary file and
     *     registered as a local resource
     * @param applicationName application name used when {@code this.customName} is {@code null}
     * @param yarnClusterEntrypoint entrypoint class name placed into the AM start command; also
     *     compared against {@code YarnApplicationClusterEntryPoint} to decide whether pipeline
     *     jars and the Python jar are registered
     * @param jobGraph job graph to ship, or {@code null} when no job graph is shipped
     * @param yarnClient client used to submit the application and to poll its report
     * @param yarnApplication application handle that supplies the submission context
     * @param clusterSpecification only {@code getMasterMemoryMB()} is read here
     * @return the last fetched report, whose state is RUNNING or FINISHED
     * @throws Exception on any failure; in particular a {@code YarnDeploymentException} when the
     *     report cannot be fetched or the application reaches FAILED or KILLED
     */
    private ApplicationReport startAppMaster(Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication, ClusterSpecification clusterSpecification) throws Exception {
        ApplicationReport report;
        List jarUrls;
        org.apache.flink.core.fs.FileSystem.initialize((Configuration)configuration, (PluginManager)PluginUtils.createPluginManagerFromRootFolder((Configuration)configuration));
        FileSystem fs = FileSystem.get((org.apache.hadoop.conf.Configuration)this.yarnConfiguration);
        // A "file" scheme (other than for GoogleHadoopFileSystem) only triggers a warning; deployment continues.
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && fs.getScheme().startsWith("file")) {
            LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values.The Flink YARN client needs to store its files in a distributed file system");
        }
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
        List<Path> providedLibDirs = Utils.getQualifiedRemoteProvidedLibDirs(configuration, this.yarnConfiguration);
        Optional<Path> providedUsrLibDir = Utils.getQualifiedRemoteProvidedUsrLib(configuration, this.yarnConfiguration);
        // The staging directory may live on a different file system than the default one, so its
        // FileSystem is resolved from the path itself.
        Path stagingDirPath = this.getStagingDir(fs);
        FileSystem stagingDirFs = stagingDirPath.getFileSystem((org.apache.hadoop.conf.Configuration)this.yarnConfiguration);
        YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from(stagingDirFs, stagingDirPath, providedLibDirs, appContext.getApplicationId(), this.getFileReplication());
        // Work on a copy so that this.shipFiles itself is not modified.
        HashSet<Path> systemShipFiles = new HashSet<Path>(this.shipFiles);
        String logConfigFilePath = (String)configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(Utils.getPathFromLocalFilePathStr(logConfigFilePath));
        }
        ApplicationId appId = appContext.getApplicationId();
        this.setHAClusterIdIfNotSet(configuration, appId);
        // Default number of attempts is 2 with HA activated and 1 otherwise.
        if (HighAvailabilityMode.isHighAvailabilityModeActivated((Configuration)configuration)) {
            appContext.setMaxAppAttempts(configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 2));
            this.activateHighAvailabilitySupport(appContext);
        } else {
            appContext.setMaxAppAttempts(configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }
        // Collect user jars: from the job graph (if any) and, for the application-cluster
        // entrypoint only, from PipelineOptions.JARS.
        HashSet<Path> userJarFiles = new HashSet<Path>();
        if (jobGraph != null) {
            userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(Path::new).collect(Collectors.toSet()));
        }
        if ((jarUrls = ConfigUtils.decodeListFromConfig((ReadableConfig)configuration, (ConfigOption)PipelineOptions.JARS, URI::create)) != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }
        // Upload user artifacts that are not already remote and record their remote paths in the job graph.
        if (jobGraph != null) {
            for (Map.Entry entry : jobGraph.getUserArtifacts().entrySet()) {
                if (Utils.isRemotePath(((DistributedCache.DistributedCacheEntry)entry.getValue()).filePath)) continue;
                Path localPath = new Path(((DistributedCache.DistributedCacheEntry)entry.getValue()).filePath);
                Tuple2<Path, Long> remoteFileInfo = fileUploader.uploadLocalFileToRemote(localPath, (String)entry.getKey());
                jobGraph.setUserArtifactRemotePath((String)entry.getKey(), ((Path)remoteFileInfo.f0).toString());
            }
            jobGraph.writeUserArtifactEntriesToConfiguration();
        }
        // The local lib folder is only shipped when no provided lib dirs are configured.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            this.addLibFoldersToShipFiles(systemShipFiles);
        }
        List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        List<String> uploadedDependencies = fileUploader.registerMultipleLocalResources(systemShipFiles, ".", LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);
        // Plugins are registered as resources, but the returned class paths are discarded.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            HashSet<Path> shipOnlyFiles = new HashSet<Path>();
            this.addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(shipOnlyFiles, ".", LocalResourceType.FILE);
        }
        if (!this.shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(this.shipArchives, ".", LocalResourceType.ARCHIVE);
        }
        // For a Python main class in application mode, the Python jar is registered under "opt".
        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint) && PackagedProgramUtils.isPython((String)((String)configuration.get(ApplicationConfiguration.APPLICATION_MAIN_CLASS))).booleanValue()) {
            fileUploader.registerMultipleLocalResources(Collections.singletonList(new Path(PackagedProgramUtils.getPythonJar().toURI())), "opt", LocalResourceType.FILE);
        }
        // User jars go to "usrlib" when inclusion is DISABLED, otherwise next to the system files.
        List<String> userClassPaths = fileUploader.registerMultipleLocalResources(userJarFiles, this.userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED ? "usrlib" : ".", LocalResourceType.FILE);
        if (providedUsrLibDir.isPresent()) {
            List<String> usrLibClassPaths = fileUploader.registerMultipleLocalResources(Collections.singletonList(providedUsrLibDir.get()), ".", LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        } else if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
            HashSet<File> usrLibShipFiles = new HashSet<File>();
            this.addUsrLibFolderToShipFiles(usrLibShipFiles);
            // NOTE(review): the Iterator<String> declaration and the cast below look like decompiler
            // artefacts; the same call is assigned to a List<String> elsewhere in this method.
            Iterator<String> usrLibClassPaths = fileUploader.registerMultipleLocalResources(usrLibShipFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()), ".", LocalResourceType.FILE);
            userClassPaths.addAll((Collection<String>)((Object)usrLibClassPaths));
        }
        // Classpath assembly: ORDER merges user entries into the sorted system list, FIRST puts
        // them in front, LAST appends them after the configuration file entry further below.
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);
        StringBuilder classPathBuilder = new StringBuilder();
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }
        YarnLocalResourceDescriptor localResourceDescFlinkJar = fileUploader.uploadFlinkDist(this.flinkJarPath);
        classPathBuilder.append(localResourceDescFlinkJar.getResourceKey()).append(File.pathSeparator);
        // Serialize the job graph into a temporary file and register it as "job.graph";
        // the temporary file is deleted in the finally block.
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                     ObjectOutputStream obOutput = new ObjectOutputStream(output);){
                    obOutput.writeObject(jobGraph);
                }
                // Unused local: the literal "job.graph" is repeated inline below.
                String jobGraphFilename = "job.graph";
                configuration.set(FileJobGraphRetriever.JOB_GRAPH_FILE_PATH, (Object)"job.graph");
                fileUploader.registerSingleLocalResource("job.graph", new Path(tmpJobGraphFile.toURI()), "", LocalResourceType.FILE, true, false);
                classPathBuilder.append("job.graph").append(File.pathSeparator);
            }
            catch (Exception e2) {
                LOG.warn("Add job graph to local resource fail.");
                throw e2;
            }
            finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", (Object)tmpJobGraphFile.toPath());
                }
            }
        }
        // Write the effective configuration (after removing localhost bind-host settings and the
        // task manager host) into a temporary file and register it under the Flink conf file name.
        File tmpConfigurationFile = null;
        try {
            String flinkConfigFileName = GlobalConfiguration.getFlinkConfFilename();
            tmpConfigurationFile = File.createTempFile(appId + "-" + (String)flinkConfigFileName, null);
            this.removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
            this.removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
            configuration.removeConfig(TaskManagerOptions.HOST);
            BootstrapTools.writeConfiguration((Configuration)configuration, (File)tmpConfigurationFile);
            fileUploader.registerSingleLocalResource(flinkConfigFileName, new Path(tmpConfigurationFile.toURI()), "", LocalResourceType.FILE, true, true);
            classPathBuilder.append(flinkConfigFileName).append(File.pathSeparator);
        }
        finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", (Object)tmpConfigurationFile.toPath());
            }
        }
        if (this.userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        // Only when the IN_TESTS environment variable is set: ship yarn-site.xml from YARN_CONF_DIR
        // and take the krb5 path from the java.security.krb5.conf system property.
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f2 = new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml");
            LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", (Object)f2.getAbsolutePath());
            Path yarnSitePath = new Path(f2.getAbsolutePath());
            remoteYarnSiteXmlPath = fileUploader.registerSingleLocalResource("yarn-site.xml", yarnSitePath, "", LocalResourceType.FILE, false, false).getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(SecurityOptions.KERBEROS_KRB5_PATH, (Object)System.getProperty("java.security.krb5.conf"));
            }
        }
        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = (String)configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly((String)krb5Config)) {
            File krb5 = new File(krb5Config);
            LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", (Object)krb5.getAbsolutePath());
            Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path = fileUploader.registerSingleLocalResource("krb5.conf", krb5ConfPath, "", LocalResourceType.FILE, false, false).getPath();
            hasKrb5 = true;
        }
        // Keytab handling: the keytab is only uploaded when SHIP_LOCAL_KEYTAB is true; the
        // localized keytab path is read from this.flinkConfiguration in both cases.
        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = (String)configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab = (Boolean)this.flinkConfiguration.get(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath = (String)this.flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                LOG.info("Adding keytab {} to the AM container local resource bucket", (Object)keytab);
                remotePathKeytab = fileUploader.registerSingleLocalResource(localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false).getPath();
            } else {
                // Redundant: re-reads the same option that was already assigned above.
                localizedKeytabPath = (String)this.flinkConfiguration.get(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }
        JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap((Configuration)this.flinkConfiguration, (ConfigOption)JobManagerOptions.TOTAL_PROCESS_MEMORY);
        ContainerLaunchContext amContainer = this.setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
        boolean fetchToken = (Boolean)configuration.get(SecurityOptions.DELEGATION_TOKENS_ENABLED);
        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration);
        if (kerberosLoginProvider.isLoginPossible(true)) {
            this.setTokensFor(amContainer, fetchToken);
        } else {
            LOG.info("Cannot use kerberos delegation token manager, no valid kerberos credentials provided.");
        }
        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        // NOTE(review): fileUploader is still queried after close() (generateApplicationMasterEnv and
        // getApplicationDir below) - presumably close() leaves those accessors usable; confirm.
        fileUploader.close();
        Utils.setAclsFor(amContainer, this.flinkConfiguration);
        Map<String, String> appMasterEnv = this.generateApplicationMasterEnv(fileUploader, classPathBuilder.toString(), localResourceDescFlinkJar.toString(), appId.toString());
        if (localizedKeytabPath != null) {
            appMasterEnv.put("_LOCAL_KEYTAB_PATH", localizedKeytabPath);
            String principal = (String)configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put("_KEYTAB_PRINCIPAL", principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put("_REMOTE_KEYTAB_PATH", remotePathKeytab.toString());
            }
        }
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put("_YARN_SITE_XML_PATH", remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put("_KRB5_PATH", remoteKrb5Path.toString());
        }
        amContainer.setEnvironment(appMasterEnv);
        // Resource request for the AM container: memory from the cluster specification, vcores from config.
        Resource capability = (Resource)Records.newRecord(Resource.class);
        capability.setMemorySize((long)clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(((Integer)this.flinkConfiguration.get(YarnConfigOptions.APP_MASTER_VCORES)).intValue());
        String customApplicationName = this.customName != null ? this.customName : applicationName;
        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(this.applicationType != null ? this.applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);
        // A negative configured priority leaves the submission context's priority untouched.
        int priorityNum = (Integer)this.flinkConfiguration.get(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance((int)priorityNum);
            appContext.setPriority(priority);
        }
        if (this.yarnQueue != null) {
            appContext.setQueue(this.yarnQueue);
        }
        this.setApplicationNodeLabel(appContext);
        this.setApplicationTags(appContext);
        // The hook kills the application and deletes the uploaded files if the JVM shuts down
        // during submission; it is removed again after the polling loop ends normally.
        DeploymentFailureHook deploymentFailureHook = new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);
        LOG.info("Waiting for the cluster to be allocated");
        long startTime = System.currentTimeMillis();
        long lastLogTime = System.currentTimeMillis();
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        // Poll every 250 ms until RUNNING/FINISHED (leave the loop) or FAILED/KILLED (throw).
        block37: while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            }
            catch (IOException e3) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e3);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", (Object)appState);
            switch (appState) {
                case FAILED: 
                case KILLED: {
                    throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + appState + " during deployment. \nDiagnostics from YARN: " + report.getDiagnostics() + "\nIf log aggregation is enabled on your cluster, use this command to further investigate the issue:\nyarn logs -applicationId " + appId);
                }
                case RUNNING: {
                    LOG.info("YARN application has been deployed successfully.");
                    break block37;
                }
                case FINISHED: {
                    LOG.info("YARN application has been finished successfully.");
                    break block37;
                }
                default: {
                    // Log state transitions once, and a reminder at most every 60 seconds.
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - lastLogTime > 60000L) {
                        lastLogTime = System.currentTimeMillis();
                        LOG.info("Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster", (Object)((lastLogTime - startTime) / 1000L));
                    }
                    lastAppState = appState;
                    Thread.sleep(250L);
                    continue block37;
                }
            }
            break;
        }
        ShutdownHookUtil.removeShutdownHook((Thread)deploymentFailureHook, (String)this.getClass().getSimpleName(), (Logger)LOG);
        return report;
    }

    /**
     * Drops {@code option} from {@code configuration} when its current value equals
     * {@code "localhost"}; any other value, or an unset option, is left untouched.
     *
     * @param configuration configuration to inspect and possibly modify
     * @param option bind-host option to check
     */
    private void removeLocalhostBindHostSetting(Configuration configuration, ConfigOption<?> option) {
        Optional<?> configuredHost = configuration.getOptional(option);
        if (!configuredHost.isPresent() || !configuredHost.get().equals("localhost")) {
            return;
        }
        LOG.info("Removing 'localhost' {} setting from effective configuration; using '0.0.0.0' instead.", (Object)option);
        configuration.removeConfig(option);
    }

    /**
     * Collects delegation tokens and attaches them, serialized, to the given container launch
     * context.
     *
     * <p>Tokens already held by the current user are always added. When {@code fetchToken} is
     * {@code true}, additional tokens are obtained through a {@code DefaultDelegationTokenManager};
     * of those, only entries whose key is listed in
     * {@code YarnConfigOptions.APP_MASTER_TOKEN_SERVICES} are added.
     *
     * @param containerLaunchContext launch context that receives the serialized credentials
     * @param fetchToken whether to obtain fresh delegation tokens in addition to the local ones
     * @throws Exception if reading, obtaining or (de)serializing tokens fails
     */
    @VisibleForTesting
    void setTokensFor(ContainerLaunchContext containerLaunchContext, boolean fetchToken) throws Exception {
        Credentials amCredentials = new Credentials();
        LOG.info("Loading delegation tokens available locally to add to the AM container");
        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        for (Token localToken : currentUser.getCredentials().getAllTokens()) {
            LOG.info("Adding user token " + localToken.getService() + " with " + localToken);
            amCredentials.addToken(localToken.getService(), localToken);
        }

        if (fetchToken) {
            LOG.info("Fetching delegation tokens to add to the AM container.");
            DefaultDelegationTokenManager tokenManager = new DefaultDelegationTokenManager(this.flinkConfiguration, null, null, null);
            DelegationTokenContainer fetchedTokens = new DelegationTokenContainer();
            tokenManager.obtainDelegationTokens(fetchedTokens);
            for (Map.Entry fetched : fetchedTokens.getTokens().entrySet()) {
                List allowedServices = (List)this.flinkConfiguration.get(YarnConfigOptions.APP_MASTER_TOKEN_SERVICES);
                if (allowedServices.contains(fetched.getKey())) {
                    byte[] serializedTokens = (byte[])fetched.getValue();
                    amCredentials.addAll(HadoopDelegationTokenConverter.deserialize(serializedTokens));
                }
            }
        }

        byte[] serializedCredentials = HadoopDelegationTokenConverter.serialize(amCredentials);
        containerLaunchContext.setTokens(ByteBuffer.wrap(serializedCredentials));
        LOG.info("Delegation tokens added to the AM container.");
    }

    /**
     * Determines the directory into which application files are staged.
     *
     * @param defaultFileSystem file system whose home directory is used when no staging directory
     *     is configured, and whose Hadoop configuration is used to resolve a configured one
     * @return the configured staging directory qualified against its own file system, or the home
     *     directory of {@code defaultFileSystem} when {@code YarnConfigOptions.STAGING_DIRECTORY}
     *     is not set
     * @throws IOException if the file system of the configured directory cannot be obtained
     */
    @VisibleForTesting
    Path getStagingDir(FileSystem defaultFileSystem) throws IOException {
        String configuredStagingDir = (String)this.flinkConfiguration.get(YarnConfigOptions.STAGING_DIRECTORY);
        if (configuredStagingDir != null) {
            Path stagingDir = new Path(configuredStagingDir);
            FileSystem owningFs = stagingDir.getFileSystem(defaultFileSystem.getConf());
            return owningFs.makeQualified(stagingDir);
        }
        return defaultFileSystem.getHomeDirectory();
    }

    /**
     * Returns the replication factor for uploaded files: the value of
     * {@code YarnConfigOptions.FILE_REPLICATION} when it is positive, otherwise the
     * {@code dfs.replication} setting of the YARN configuration (3 if that is unset).
     */
    private int getFileReplication() {
        int hadoopReplication = this.yarnConfiguration.getInt("dfs.replication", 3);
        int configuredReplication = (Integer)this.flinkConfiguration.get(YarnConfigOptions.FILE_REPLICATION);
        if (configuredReplication > 0) {
            return configuredReplication;
        }
        return hadoopReplication;
    }

    /**
     * Joins the {@code toString()} form of every descriptor with {@code ';'} as separator.
     *
     * @param resources descriptors to encode
     * @return the joined string; empty when {@code resources} is empty
     */
    private static String encodeYarnLocalResourceDescriptorListToString(List<YarnLocalResourceDescriptor> resources) {
        return resources.stream()
                .map(YarnLocalResourceDescriptor::toString)
                .collect(Collectors.joining(";"));
    }

    /**
     * Asks YARN to kill the application that is being deployed. This is best effort: any
     * exception raised while killing is logged at debug level and otherwise ignored.
     *
     * @param yarnClient client used to issue the kill request
     * @param yarnApplication application whose id is taken from its new-application response
     */
    private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
        LOG.info("Killing YARN application");
        try {
            ApplicationId applicationToKill = yarnApplication.getNewApplicationResponse().getApplicationId();
            yarnClient.killApplication(applicationToKill);
        }
        catch (Exception killFailure) {
            // Deliberately swallowed: the deployment is already being aborted.
            LOG.debug("Error while killing YARN application", (Throwable)killFailure);
        }
    }

    /**
     * Summarizes the free memory of all node managers that are currently in state RUNNING.
     *
     * <p>Per node, free memory is the node's capability minus its used memory (used memory counts
     * as 0 when the report has no usage information).
     *
     * @param yarnClient client used to fetch the node reports
     * @return description holding the summed free memory (narrowed to {@code int}), the largest
     *     free amount on a single node, and the free amount per node
     * @throws YarnException if the node reports cannot be fetched
     * @throws IOException if the node reports cannot be fetched
     */
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
        List<NodeReport> runningNodes = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
        long[] freePerNodeManager = new long[runningNodes.size()];
        int summedFreeMemory = 0;
        long largestFreeOnNode = 0L;
        int nodeIndex = 0;
        for (NodeReport node : runningNodes) {
            long capacity = node.getCapability().getMemorySize();
            Resource usedResource = node.getUsed();
            long used = usedResource == null ? 0L : usedResource.getMemorySize();
            long nodeFree = capacity - used;
            freePerNodeManager[nodeIndex++] = nodeFree;
            // Compound assignment narrows the long sum back to int, as the original cast did.
            summedFreeMemory += nodeFree;
            largestFreeOnNode = Math.max(largestFreeOnNode, nodeFree);
        }
        return new ClusterResourceDescription(summedFreeMemory, largestFreeOnNode, freePerNodeManager);
    }

    /**
     * Builds a human-readable description of the YARN cluster: the number of node managers, one
     * table section per RUNNING node (id, memory, vcores, health report, containers), a summary
     * of total memory and cores, and one line per queue.
     *
     * @return the formatted description
     * @throws RuntimeException wrapping any exception raised while querying YARN
     */
    public String getClusterDescription() {
        try {
            final String rowFormat = "|%-16s |%-16s %n";
            final String rule = "+---------------------------------------+";
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            PrintStream out = new PrintStream(buffer);

            YarnClusterMetrics clusterMetrics = this.yarnClient.getYarnClusterMetrics();
            out.append("NodeManagers in the ClusterClient " + clusterMetrics.getNumNodeManagers());

            List<NodeReport> runningNodes = this.yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
            out.printf("|Property         |Value          %n");
            out.println(rule);

            long memorySum = 0L;
            int coreSum = 0;
            for (NodeReport node : runningNodes) {
                Resource capability = node.getCapability();
                memorySum += capability.getMemorySize();
                coreSum += capability.getVirtualCores();
                out.format(rowFormat, "NodeID", node.getNodeId());
                out.format(rowFormat, "Memory", this.getDisplayMemory(capability.getMemorySize()));
                out.format(rowFormat, "vCores", capability.getVirtualCores());
                out.format(rowFormat, "HealthReport", node.getHealthReport());
                out.format(rowFormat, "Containers", node.getNumContainers());
                out.println(rule);
            }
            out.println("Summary: totalMemory " + this.getDisplayMemory(memorySum) + " totalCores " + coreSum);

            List<QueueInfo> queues = this.yarnClient.getAllQueues();
            for (QueueInfo queue : queues) {
                out.println("Queue: " + queue.getQueueName() + ", Current Capacity: " + queue.getCurrentCapacity() + " Max Capacity: " + queue.getMaximumCapacity() + " Applications: " + queue.getApplications().size());
            }
            return buffer.toString();
        }
        catch (Exception e) {
            throw new RuntimeException("Couldn't get cluster description", e);
        }
    }

    /**
     * Configures the submission context for high availability: containers are kept across
     * application attempts, and the attempt-failures validity interval is taken from
     * {@code YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL}.
     *
     * @param appContext submission context to configure
     * @throws InvocationTargetException if a reflective setter call fails
     * @throws IllegalAccessException if a reflective setter is not accessible
     */
    private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
        contextReflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
        Long failuresValidityInterval = (Long)this.flinkConfiguration.get(YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL);
        contextReflector.setAttemptFailuresValidityInterval(appContext, failuresValidityInterval);
    }

    /**
     * Parses the comma-separated {@code YarnConfigOptions.APPLICATION_TAGS} value and applies the
     * resulting tags to the submission context. Each tag is trimmed; empty tags are dropped.
     *
     * @param appContext submission context that receives the tags
     * @throws InvocationTargetException if the reflective setter call fails
     * @throws IllegalAccessException if the reflective setter is not accessible
     */
    private void setApplicationTags(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        String configuredTags = (String)this.flinkConfiguration.get(YarnConfigOptions.APPLICATION_TAGS);
        HashSet<String> parsedTags = Arrays.stream(configuredTags.split(","))
                .map(String::trim)
                .filter(candidate -> !candidate.isEmpty())
                .collect(Collectors.toCollection(HashSet::new));
        ApplicationSubmissionContextReflector.getInstance().setApplicationTags(appContext, parsedTags);
    }

    /**
     * Applies {@code this.nodeLabel} to the submission context; does nothing when no node label
     * is set.
     *
     * @param appContext submission context that receives the node label
     * @throws InvocationTargetException if the reflective setter call fails
     * @throws IllegalAccessException if the reflective setter is not accessible
     */
    private void setApplicationNodeLabel(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
        if (this.nodeLabel == null) {
            return;
        }
        ApplicationSubmissionContextReflector.getInstance().setApplicationNodeLabel(appContext, this.nodeLabel);
    }

    /**
     * Adds the directory named by the {@code FLINK_LIB_DIR} environment variable to the given
     * ship files.
     *
     * <p>When the variable is not set, nothing is added; a warning is logged only if
     * {@code this.shipFiles} is empty as well.
     *
     * @param effectiveShipFiles collection that receives the lib directory path
     * @throws YarnDeploymentException if {@code FLINK_LIB_DIR} is set but is not a directory
     */
    @VisibleForTesting
    void addLibFoldersToShipFiles(Collection<Path> effectiveShipFiles) {
        String libDir = System.getenv().get("FLINK_LIB_DIR");
        if (libDir == null) {
            if (this.shipFiles.isEmpty()) {
                LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. Not shipping any library files.", (Object)"FLINK_LIB_DIR");
            }
            return;
        }

        File libDirectory = new File(libDir);
        if (!libDirectory.isDirectory()) {
            throw new YarnDeploymentException("The environment variable 'FLINK_LIB_DIR' is set to '" + libDir + "' but the directory doesn't exist.");
        }
        effectiveShipFiles.add(Utils.getPathFromLocalFile(libDirectory));
    }

    /**
     * Adds the user lib directory found by {@code ClusterEntrypointUtils.tryFindUserLibDirectory()}
     * to the given collection and logs it; does nothing when no such directory is found.
     *
     * @param effectiveShipFiles collection that receives the user lib directory
     */
    @VisibleForTesting
    void addUsrLibFolderToShipFiles(Collection<File> effectiveShipFiles) {
        Optional<File> usrLibDirectory = ClusterEntrypointUtils.tryFindUserLibDirectory();
        if (!usrLibDirectory.isPresent()) {
            return;
        }
        File usrLibDirFile = usrLibDirectory.get();
        effectiveShipFiles.add(usrLibDirFile);
        LOG.info("usrlib: {} will be shipped automatically.", (Object)usrLibDirFile.getAbsolutePath());
    }

    /**
     * Adds the plugins directory reported by {@code PluginConfig.getPluginsDir()} to the given
     * ship files; does nothing when no plugins directory is available.
     *
     * @param effectiveShipFiles collection that receives the plugins directory path
     */
    @VisibleForTesting
    void addPluginsFoldersToShipFiles(Collection<Path> effectiveShipFiles) {
        Optional<File> pluginsDirectory = PluginConfig.getPluginsDir();
        if (pluginsDirectory.isPresent()) {
            effectiveShipFiles.add(Utils.getPathFromLocalFile(pluginsDirectory.get()));
        }
    }

    /**
     * Creates the container launch context for the application master and sets its single start
     * command.
     *
     * <p>The command is produced by filling the template from
     * {@code YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE} with the values for
     * {@code java}, {@code jvmmem}, {@code jvmopts}, {@code logging}, {@code class},
     * {@code redirects} and {@code args}.
     *
     * @param yarnClusterEntrypoint entrypoint class name substituted for {@code class}
     * @param hasKrb5 passed on to {@code Utils.generateJvmOptsString}
     * @param processSpec job manager process spec used for JVM memory parameters and dynamic configs
     * @return the launch context with the start command set
     */
    ContainerLaunchContext setupApplicationMasterContainer(String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
        List<ConfigOption<String>> jvmOptionKeys = Arrays.asList(
                CoreOptions.FLINK_DEFAULT_JVM_OPTIONS,
                CoreOptions.FLINK_JVM_OPTIONS,
                CoreOptions.FLINK_DEFAULT_JM_JVM_OPTIONS,
                CoreOptions.FLINK_JM_JVM_OPTIONS);
        String jvmOptions = Utils.generateJvmOptsString(this.flinkConfiguration, jvmOptionKeys, hasKrb5);
        ContainerLaunchContext launchContext = (ContainerLaunchContext)Records.newRecord(ContainerLaunchContext.class);

        HashMap<String, String> templateValues = new HashMap<String, String>();
        templateValues.put("java", "$JAVA_HOME/bin/java");
        templateValues.put("jvmmem", JobManagerProcessUtils.generateJvmParametersStr((JobManagerProcessSpec)processSpec, (Configuration)this.flinkConfiguration));
        templateValues.put("jvmopts", jvmOptions);
        templateValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(this.flinkConfiguration));
        templateValues.put("class", yarnClusterEntrypoint);
        templateValues.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");
        templateValues.put("args", JobManagerProcessUtils.generateDynamicConfigsStr((JobManagerProcessSpec)processSpec));

        String template = (String)this.flinkConfiguration.get(YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE);
        String startCommand = Utils.getStartCommand(template, templateValues);
        launchContext.setCommands(Collections.singletonList(startCommand));
        LOG.debug("Application Master start command: " + startCommand);
        return launchContext;
    }

    /**
     * Reads the user-jar inclusion mode from {@code YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR}.
     *
     * @param config configuration to read from
     * @return the configured inclusion mode
     */
    private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(Configuration config) {
        Object configuredMode = config.get(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
        return (YarnConfigOptions.UserJarInclusion)configuredMode;
    }

    /**
     * Checks whether any of the given ship files is a directory named {@code usrlib}, ignoring
     * case.
     *
     * <p>Case folding uses {@link java.util.Locale#ROOT} so that the result does not depend on the
     * JVM's default locale (under a Turkish locale, for example, {@code "USRLIB"} would not fold
     * to {@code "usrlib"}).
     *
     * @param shipFiles paths to inspect
     * @param yarnConfig YARN configuration used to look up each path's file status
     * @return {@code true} if at least one directory among {@code shipFiles} is named
     *     {@code usrlib} (case-insensitively)
     */
    private static boolean isUsrLibDirIncludedInShipFiles(List<Path> shipFiles, YarnConfiguration yarnConfig) {
        return shipFiles.stream()
                .map(FunctionUtils.uncheckedFunction(path -> YarnClusterDescriptor.getFileStatus(path, yarnConfig)))
                .filter(FileStatus::isDirectory)
                .map(status -> status.getPath().getName().toLowerCase(java.util.Locale.ROOT))
                .anyMatch(name -> name.equals("usrlib"));
    }

    /**
     * Copies the host, RPC port and application id from the given report into
     * {@code this.flinkConfiguration}. The same host and port are written to both the job manager
     * and the REST options; the HA cluster id is set to the application id if it is not yet set.
     *
     * @param report application report to read from; must not be {@code null}
     */
    private void setClusterEntrypointInfoToConfig(ApplicationReport report) {
        Preconditions.checkNotNull(report);

        final ApplicationId applicationId = report.getApplicationId();
        final String entrypointHost = report.getHost();
        final int entrypointPort = report.getRpcPort();
        LOG.info("Found Web Interface {}:{} of application '{}'.", entrypointHost, entrypointPort, applicationId);

        this.flinkConfiguration.set(JobManagerOptions.ADDRESS, entrypointHost);
        this.flinkConfiguration.set(JobManagerOptions.PORT, entrypointPort);
        this.flinkConfiguration.set(RestOptions.ADDRESS, entrypointHost);
        this.flinkConfiguration.set(RestOptions.PORT, entrypointPort);
        this.flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));

        this.setHAClusterIdIfNotSet(this.flinkConfiguration, applicationId);
    }

    /**
     * Sets {@code HighAvailabilityOptions.HA_CLUSTER_ID} to the string form of {@code appId},
     * unless the configuration already contains a value for that option.
     *
     * @param configuration configuration to update
     * @param appId application id used as the cluster id
     */
    private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) {
        if (configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
            return;
        }
        String clusterId = ConverterUtils.toString(appId);
        configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
    }

    /**
     * Logs, at info level, how to stop or kill a session cluster that was started in detached
     * mode.
     *
     * @param yarnApplicationId id of the application, inserted into both suggested commands
     * @param logger logger that receives the message
     */
    public static void logDetachedClusterInformation(ApplicationId yarnApplicationId, Logger logger) {
        final String stopInstructions = "The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:\n$ echo \"stop\" | ./bin/yarn-session.sh -id {}\nIf this should not be possible, then you can also kill Flink via YARN's web interface or via:\n$ yarn application -kill {}\nNote that killing Flink might not clean up all job artifacts and temporary files.";
        logger.info(stopInstructions, (Object)yarnApplicationId, (Object)yarnApplicationId);
    }

    /**
     * Assembles the environment variables for the application master container.
     *
     * <p>The map starts with all configuration entries prefixed with
     * {@code containerized.master.env.}; the fixed Flink/YARN entries are put afterwards and
     * therefore override user-supplied entries with the same key. Finally the YARN class path is
     * added through {@code Utils.setupYarnClassPath}.
     *
     * @param fileUploader source of the home dir, the env ship resource list and the application dir
     * @param classPathStr value for {@code _FLINK_CLASSPATH}
     * @param localFlinkJarStr value for {@code _FLINK_DIST_JAR}
     * @param appIdStr value for {@code _APP_ID}
     * @return a mutable map with the environment
     * @throws IOException if the current user cannot be determined
     */
    @VisibleForTesting
    Map<String, String> generateApplicationMasterEnv(YarnApplicationFileUploader fileUploader, String classPathStr, String localFlinkJarStr, String appIdStr) throws IOException {
        HashMap<String, String> amEnvironment = new HashMap<String, String>();
        amEnvironment.putAll(ConfigurationUtils.getPrefixedKeyValuePairs((String)"containerized.master.env.", (Configuration)this.flinkConfiguration));

        amEnvironment.put("_FLINK_CLASSPATH", classPathStr);
        amEnvironment.put("FLINK_LIB_DIR", "./lib");
        amEnvironment.put("FLINK_OPT_DIR", "./opt");
        amEnvironment.put("_FLINK_DIST_JAR", localFlinkJarStr);
        amEnvironment.put("_APP_ID", appIdStr);

        String clientHomeDir = fileUploader.getHomeDir().toString();
        amEnvironment.put("_CLIENT_HOME_DIR", clientHomeDir);
        String encodedShipFiles = YarnClusterDescriptor.encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList());
        amEnvironment.put("_CLIENT_SHIP_FILES", encodedShipFiles);
        String applicationDirUri = fileUploader.getApplicationDir().toUri().toString();
        amEnvironment.put("_FLINK_YARN_FILES", applicationDirUri);
        String submittingUser = UserGroupInformation.getCurrentUser().getUserName();
        amEnvironment.put("HADOOP_USER_NAME", submittingUser);

        Utils.setupYarnClassPath((org.apache.hadoop.conf.Configuration)this.yarnConfiguration, amEnvironment);
        return amEnvironment;
    }

    /**
     * Formats an amount of memory given in mebibytes as a human-readable string.
     *
     * @param memoryMB amount of memory in mebibytes
     * @return the human-readable representation produced by {@code MemorySize}
     */
    private String getDisplayMemory(long memoryMB) {
        MemorySize memory = MemorySize.ofMebiBytes(memoryMB);
        return memory.toHumanReadableString();
    }

    /**
     * Shutdown hook that aborts a deployment in progress: it kills the YARN application and then
     * recursively deletes the application's files directory.
     */
    private class DeploymentFailureHook
    extends Thread {
        private final YarnClient yarnClient;
        private final YarnClientApplication yarnApplication;
        private final Path yarnFilesDir;

        /**
         * Creates the hook with its own, initialized (but not yet started) YARN client.
         *
         * @param yarnApplication application to kill when the hook runs; must not be {@code null}
         * @param yarnFilesDir directory to delete when the hook runs; must not be {@code null}
         */
        DeploymentFailureHook(YarnClientApplication yarnApplication, Path yarnFilesDir) {
            this.yarnApplication = (YarnClientApplication)Preconditions.checkNotNull((Object)yarnApplication);
            this.yarnFilesDir = (Path)Preconditions.checkNotNull((Object)yarnFilesDir);
            this.yarnClient = YarnClient.createYarnClient();
            this.yarnClient.init((org.apache.hadoop.conf.Configuration)YarnClusterDescriptor.this.yarnConfiguration);
        }

        /**
         * Kills the application (best effort) and deletes {@code yarnFilesDir}. Failures while
         * deleting are logged and not propagated.
         */
        @Override
        public void run() {
            LOG.info("Cancelling deployment from Deployment Failure Hook");
            this.yarnClient.start();
            YarnClusterDescriptor.this.failSessionDuringDeployment(this.yarnClient, this.yarnApplication);
            this.yarnClient.stop();
            LOG.info("Deleting files in {}.", (Object)this.yarnFilesDir);
            // Resolve the file system from the directory itself: the staging directory is not
            // necessarily on the default file system. try-with-resources also closes the file
            // system when the delete fails, which the previous explicit close() skipped.
            try (FileSystem fs = this.yarnFilesDir.getFileSystem((org.apache.hadoop.conf.Configuration)YarnClusterDescriptor.this.yarnConfiguration)) {
                if (!fs.delete(this.yarnFilesDir, true)) {
                    throw new IOException("Deleting files in " + this.yarnFilesDir + " was unsuccessful");
                }
            }
            catch (IOException e) {
                LOG.error("Failed to delete Flink Jar and configuration files in HDFS", (Throwable)e);
            }
        }
    }

    /** Unchecked exception type carrying a message and, optionally, a cause. */
    private static class YarnDeploymentException extends RuntimeException {
        private static final long serialVersionUID = -812040641215388943L;

        /**
         * Creates an exception with a message and the underlying cause.
         *
         * @param message the detail message
         * @param cause the throwable that triggered this exception
         */
        public YarnDeploymentException(String message, Throwable cause) {
            super(message, cause);
        }

        /**
         * Creates an exception with a message only; the cause is left uninitialised.
         *
         * @param message the detail message
         */
        public YarnDeploymentException(String message) {
            super(message);
        }
    }

    /**
     * Reflective access to setters of {@link ApplicationSubmissionContext} that the class found on
     * the classpath may or may not provide.
     *
     * <p>Each setter is looked up once, when the singleton is created. A setter that cannot be found
     * is remembered as {@code null}, and the corresponding call on this class then only logs a debug
     * message instead of invoking anything.
     */
    private static class ApplicationSubmissionContextReflector {
        private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
        private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval";
        private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts";
        private static final String NODE_LABEL_EXPRESSION_NAME = "setNodeLabelExpression";

        // LOG must be initialised before 'instance': the constructor logs through it.
        private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
        private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);

        /** {@code null} when the target class has no matching public method. */
        @Nullable
        private final Method applicationTagsMethod;
        /** {@code null} when the target class has no matching public method. */
        @Nullable
        private final Method attemptFailuresValidityIntervalMethod;
        /** {@code null} when the target class has no matching public method. */
        @Nullable
        private final Method keepContainersMethod;
        /** {@code null} when the target class has no matching public method. */
        @Nullable
        private final Method nodeLabelExpressionMethod;

        /**
         * Returns the shared reflector instance.
         *
         * @return the singleton created during class initialisation
         */
        public static ApplicationSubmissionContextReflector getInstance() {
            return instance;
        }

        /**
         * Resolves all optional setters on the given class.
         *
         * @param clazz the class to inspect
         */
        private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
            this.applicationTagsMethod = findMethod(clazz, APPLICATION_TAGS_METHOD_NAME, Set.class);
            this.attemptFailuresValidityIntervalMethod = findMethod(clazz, ATTEMPT_FAILURES_METHOD_NAME, Long.TYPE);
            this.keepContainersMethod = findMethod(clazz, KEEP_CONTAINERS_METHOD_NAME, Boolean.TYPE);
            this.nodeLabelExpressionMethod = findMethod(clazz, NODE_LABEL_EXPRESSION_NAME, String.class);
        }

        /**
         * Looks up a public single-argument method and logs at debug level whether it exists.
         *
         * @param clazz the class to inspect
         * @param methodName the method name to look for
         * @param parameterType the type of the method's only parameter
         * @return the method, or {@code null} if the class does not declare or inherit it
         */
        @Nullable
        private static Method findMethod(Class<?> clazz, String methodName, Class<?> parameterType) {
            try {
                final Method method = clazz.getMethod(methodName, parameterType);
                LOG.debug("{} supports method {}.", clazz.getCanonicalName(), methodName);
                return method;
            } catch (NoSuchMethodException e) {
                LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), methodName);
                return null;
            }
        }

        /**
         * Invokes {@code method} on {@code appContext} with one argument, or only logs when the
         * method was not found at construction time.
         *
         * @param method the resolved method, or {@code null} if unsupported
         * @param methodName the name reported in the "unsupported" log message
         * @param appContext the object to invoke the method on
         * @param argument the single argument to pass (primitives arrive boxed)
         * @throws InvocationTargetException if the invoked method throws
         * @throws IllegalAccessException if the method is not accessible
         */
        private static void invokeIfSupported(@Nullable Method method, String methodName, ApplicationSubmissionContext appContext, @Nullable Object argument) throws InvocationTargetException, IllegalAccessException {
            if (method == null) {
                LOG.debug("{} does not support method {}. Doing nothing.", appContext.getClass().getCanonicalName(), methodName);
                return;
            }
            LOG.debug("Calling method {} of {}.", method.getName(), appContext.getClass().getCanonicalName());
            method.invoke(appContext, argument);
        }

        /**
         * Calls {@code setApplicationTags} on the context if that method is available.
         *
         * @param appContext the submission context to modify
         * @param applicationTags the tags to pass to the setter
         * @throws InvocationTargetException if the setter throws
         * @throws IllegalAccessException if the setter is not accessible
         */
        public void setApplicationTags(ApplicationSubmissionContext appContext, Set<String> applicationTags) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.applicationTagsMethod, APPLICATION_TAGS_METHOD_NAME, appContext, applicationTags);
        }

        /**
         * Calls {@code setNodeLabelExpression} on the context if that method is available.
         *
         * @param appContext the submission context to modify
         * @param nodeLabel the node label expression to pass to the setter
         * @throws InvocationTargetException if the setter throws
         * @throws IllegalAccessException if the setter is not accessible
         */
        public void setApplicationNodeLabel(ApplicationSubmissionContext appContext, String nodeLabel) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.nodeLabelExpressionMethod, NODE_LABEL_EXPRESSION_NAME, appContext, nodeLabel);
        }

        /**
         * Calls {@code setAttemptFailuresValidityInterval} on the context if that method is available.
         *
         * @param appContext the submission context to modify
         * @param validityInterval the value to pass to the setter
         * @throws InvocationTargetException if the setter throws
         * @throws IllegalAccessException if the setter is not accessible
         */
        public void setAttemptFailuresValidityInterval(ApplicationSubmissionContext appContext, long validityInterval) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.attemptFailuresValidityIntervalMethod, ATTEMPT_FAILURES_METHOD_NAME, appContext, validityInterval);
        }

        /**
         * Calls {@code setKeepContainersAcrossApplicationAttempts} on the context if that method is
         * available.
         *
         * @param appContext the submission context to modify
         * @param keepContainers the flag to pass to the setter
         * @throws InvocationTargetException if the setter throws
         * @throws IllegalAccessException if the setter is not accessible
         */
        public void setKeepContainersAcrossApplicationAttempts(ApplicationSubmissionContext appContext, boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
            invokeIfSupported(this.keepContainersMethod, KEEP_CONTAINERS_METHOD_NAME, appContext, keepContainers);
        }
    }

    /**
     * Plain value holder for three cluster resource figures.
     *
     * <p>All fields are assigned once from the constructor arguments. The array is stored by
     * reference, not copied.
     */
    private static class ClusterResourceDescription {
        /** Per-entry free figures, exactly the array handed to the constructor. */
        public final long[] nodeManagersFree;
        /** Container limit as handed to the constructor. */
        public final long containerLimit;
        /** Total free memory as handed to the constructor. */
        public final long totalFreeMemory;

        /**
         * Stores the given values unchanged.
         *
         * @param totalFreeMemory value for {@link #totalFreeMemory}
         * @param containerLimit value for {@link #containerLimit}
         * @param nodeManagersFree array for {@link #nodeManagersFree}; kept by reference
         */
        public ClusterResourceDescription(long totalFreeMemory, long containerLimit, long[] nodeManagersFree) {
            this.nodeManagersFree = nodeManagersFree;
            this.containerLimit = containerLimit;
            this.totalFreeMemory = totalFreeMemory;
        }
    }
}

