/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemorySpec;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.yarn.YarnLocalResourceDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnResourceManagerDriverConfiguration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Utils {
    /** Logger shared by the static helpers of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
    /** Local resource key under which the Kerberos configuration file is registered for containers. */
    public static final String KRB5_FILE_NAME = "krb5.conf";
    /** Local resource key under which the YARN site configuration file is registered for containers. */
    public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
    /** The only ACL value containing a wildcard that {@code validateAclString} accepts. */
    private static final String WILDCARD_ACL = "*";
    /**
     * Flink configuration keys starting with one of these prefixes are copied into the YARN
     * configuration by {@code getYarnConfiguration}, with the leading {@code "flink."} removed.
     */
    private static final String[] FLINK_CONFIG_PREFIXES = new String[]{"flink.yarn."};
    /** Scheduler class name for which {@code getUnitResource} reads the increment-allocation keys. */
    @VisibleForTesting
    static final String YARN_RM_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";
    /** Second scheduler class name that {@code getUnitResource} treats like the fair scheduler. */
    @VisibleForTesting
    static final String YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler";
    /** Preferred configuration key for the memory increment (MB); checked before the legacy key. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_MB_KEY = "yarn.resource-types.memory-mb.increment-allocation";
    /** Legacy configuration key for the memory increment (MB); used when the preferred key is unset. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY = "yarn.scheduler.increment-allocation-mb";
    /** Default memory increment in MB; equals the fallback value used by {@code getUnitResource}. */
    private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB = 1024;
    /** Preferred configuration key for the vcore increment; checked before the legacy key. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY = "yarn.resource-types.vcores.increment-allocation";
    /** Legacy configuration key for the vcore increment; used when the preferred key is unset. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY = "yarn.scheduler.increment-allocation-vcores";
    /** JVM flag that {@code generateJvmOptsString} always appends after the configured options. */
    @VisibleForTesting
    static final String IGNORE_UNRECOGNIZED_VM_OPTIONS = "-XX:+IgnoreUnrecognizedVMOptions";
    /** Default vcore increment; equals the fallback value used by {@code getUnitResource}. */
    private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES = 1;

    /**
     * Extends the {@code CLASSPATH} entry of the given environment with the Flink classpath and
     * the YARN application classpath.
     *
     * <p>The Flink classpath is read from the {@code _FLINK_CLASSPATH} entry of the same map and
     * added first; afterwards every (trimmed) entry of {@code yarn.application.classpath} is
     * appended, falling back to YARN's default application classpath when the key is unset.
     *
     * @param conf Hadoop configuration providing {@code yarn.application.classpath}
     * @param appMasterEnv environment map that is read and modified in place
     */
    public static void setupYarnClassPath(org.apache.hadoop.conf.Configuration conf, Map<String, String> appMasterEnv) {
        final String classPathVariable = ApplicationConstants.Environment.CLASSPATH.name();
        Utils.addToEnvironment(appMasterEnv, classPathVariable, appMasterEnv.get("_FLINK_CLASSPATH"));

        final String[] yarnClassPathEntries = conf.getStrings("yarn.application.classpath", YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
        for (String entry : yarnClassPathEntries) {
            Utils.addToEnvironment(appMasterEnv, classPathVariable, entry.trim());
        }
    }

    /**
     * Recursively deletes the YARN application files directory.
     *
     * <p>This is a best-effort cleanup: an unsuccessful delete or an {@link IOException} is only
     * logged, never propagated. A null or blank directory is logged at debug level and ignored.
     *
     * @param applicationFilesDir directory to delete; may be null or blank
     */
    public static void deleteApplicationFiles(String applicationFilesDir) {
        if (StringUtils.isNullOrWhitespaceOnly(applicationFilesDir)) {
            LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data.");
            return;
        }

        final Path applicationFilesPath = new Path(applicationFilesDir);
        try {
            final boolean deleted = applicationFilesPath.getFileSystem().delete(applicationFilesPath, true);
            if (!deleted) {
                LOG.error("Deleting yarn application files under {} was unsuccessful.", applicationFilesDir);
            }
        } catch (IOException e) {
            // The trailing throwable is logged with its stack trace by SLF4J.
            LOG.error("Could not properly delete yarn application files directory {}.", applicationFilesDir, e);
        }
    }

    /**
     * Creates a YARN {@link LocalResource} record describing a remote file.
     *
     * @param remoteRsrcPath path of the remote resource; its URI becomes the resource URL
     * @param resourceSize size recorded on the resource
     * @param resourceModificationTime timestamp recorded on the resource
     * @param resourceVisibility visibility recorded on the resource
     * @param resourceType type recorded on the resource
     * @return the populated record
     */
    static LocalResource registerLocalResource(org.apache.hadoop.fs.Path remoteRsrcPath, long resourceSize, long resourceModificationTime, LocalResourceVisibility resourceVisibility, LocalResourceType resourceType) {
        final LocalResource resource = Records.newRecord(LocalResource.class);
        final URI remoteUri = remoteRsrcPath.toUri();
        resource.setResource(URL.fromURI(remoteUri));
        resource.setVisibility(resourceVisibility);
        resource.setType(resourceType);
        resource.setTimestamp(resourceModificationTime);
        resource.setSize(resourceSize);
        return resource;
    }

    /**
     * Creates a {@link LocalResource} with {@code APPLICATION} visibility for a remote file,
     * taking its size and modification time from the file system.
     *
     * @param fs file system used to look up the file status
     * @param remoteRsrcPath path of the remote resource
     * @param resourceType type recorded on the resource
     * @return the populated record
     * @throws IOException if the file status cannot be obtained
     */
    private static LocalResource registerLocalResource(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path remoteRsrcPath, LocalResourceType resourceType) throws IOException {
        final FileStatus status = fs.getFileStatus(remoteRsrcPath);
        final long size = status.getLen();
        final long modificationTime = status.getModificationTime();
        return Utils.registerLocalResource(remoteRsrcPath, size, modificationTime, LocalResourceVisibility.APPLICATION, resourceType);
    }

    /**
     * Sets an environment variable, or appends to it when it already has a value.
     *
     * <p>An existing value is extended with {@link File#pathSeparator} followed by the new value.
     * Both key and value are passed through {@code StringInterner.weakIntern} before being stored.
     *
     * @param environment map to modify in place
     * @param variable name of the variable
     * @param value value to set or append
     */
    public static void addToEnvironment(Map<String, String> environment, String variable, String value) {
        final String existing = environment.get(variable);
        final String combined;
        if (existing == null) {
            combined = value;
        } else {
            combined = existing + File.pathSeparator + value;
        }
        environment.put(StringInterner.weakIntern(variable), StringInterner.weakIntern(combined));
    }

    /**
     * Resolves a keytab path to an absolute path of an existing file.
     *
     * <p>The path is first tried as given; if no such file exists it is tried relative to
     * {@code workingDir}.
     *
     * @param workingDir directory used as the parent for the second lookup
     * @param keytabPath keytab path to resolve; may be null
     * @return the absolute path of the first existing candidate, or {@code null} if
     *     {@code keytabPath} is null or neither candidate exists
     */
    public static String resolveKeytabPath(String workingDir, String keytabPath) {
        if (keytabPath == null) {
            return null;
        }

        File candidate = new File(keytabPath);
        if (!candidate.exists()) {
            // Fall back to interpreting the path relative to the working directory.
            candidate = new File(workingDir, keytabPath);
            if (!candidate.exists()) {
                LOG.warn("Could not resolve keytab path with: {}", keytabPath);
                return null;
            }
        }

        final String resolved = candidate.getAbsolutePath();
        LOG.info("Resolved keytab path: {}", resolved);
        return resolved;
    }

    /** Private constructor of this static-only utility class; always throws so it cannot be instantiated. */
    private Utils() {
        throw new RuntimeException();
    }

    /**
     * Builds the {@link ContainerLaunchContext} used to start a TaskExecutor container.
     *
     * <p>The context carries the launch command, the local resources (Flink dist jar, shipped
     * files and - when configured - yarn-site.xml, krb5.conf and the keytab), the container
     * environment and the application ACLs. If the {@code HADOOP_TOKEN_FILE_LOCATION} environment
     * variable is set, the tokens from that file (except the AM-RM token) are attached as well;
     * a failure while doing so is logged and does not abort the context creation.
     *
     * @param flinkConfig Flink configuration (JVM options, start command template, ACLs)
     * @param yarnConfig YARN configuration used to resolve file systems and the YARN classpath
     * @param configuration driver configuration providing the dist jar, ship files, classpath,
     *     keytab, yarn-site and krb5 settings
     * @param tmParams TaskManager parameters (process spec and environment)
     * @param taskManagerDynamicProperties extra arguments appended to the launch command
     * @param workingDirectory directory probed for {@code logback.xml} / {@code log4j.properties}
     * @param taskManagerMainClass main class named in the launch command
     * @param log logger used for progress and error messages
     * @return the populated launch context
     * @throws Exception if a required setting is missing, a remote resource cannot be inspected
     *     or a resource descriptor cannot be decoded
     */
    static ContainerLaunchContext createTaskExecutorContext(Configuration flinkConfig, YarnConfiguration yarnConfig, YarnResourceManagerDriverConfiguration configuration, ContaineredTaskManagerParameters tmParams, String taskManagerDynamicProperties, String workingDirectory, Class<?> taskManagerMainClass, Logger log) throws Exception {
        String remoteFlinkJarPath = Preconditions.checkNotNull(configuration.getFlinkDistJar(), "Environment variable %s not set", "_FLINK_DIST_JAR");
        String shipListString = Preconditions.checkNotNull(configuration.getClientShipFiles(), "Environment variable %s not set", "_CLIENT_SHIP_FILES");
        String remoteKeytabPath = configuration.getRemoteKeytabPath();
        String localKeytabPath = configuration.getLocalKeytabPath();
        String keytabPrincipal = configuration.getKeytabPrinciple();
        String remoteYarnConfPath = configuration.getYarnSiteXMLPath();
        String remoteKrb5Path = configuration.getKrb5Path();
        if (log.isDebugEnabled()) {
            log.debug("TM:remote keytab path obtained {}", remoteKeytabPath);
            log.debug("TM:local keytab path obtained {}", localKeytabPath);
            log.debug("TM:keytab principal obtained {}", keytabPrincipal);
            log.debug("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
            log.debug("TM:remote krb5 path obtained {}", remoteKrb5Path);
        }
        String classPathString = Preconditions.checkNotNull(configuration.getFlinkClasspath(), "Environment variable %s not set", "_FLINK_CLASSPATH");

        // Optional security / configuration files that are shipped as plain FILE resources.
        LocalResource keytabResource = null;
        if (remoteKeytabPath != null) {
            log.info("TM:Adding keytab {} to the container local resource bucket", remoteKeytabPath);
            keytabResource = Utils.registerRemoteFile(yarnConfig, remoteKeytabPath);
        }
        LocalResource yarnConfResource = null;
        if (remoteYarnConfPath != null) {
            log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
            yarnConfResource = Utils.registerRemoteFile(yarnConfig, remoteYarnConfPath);
        }
        LocalResource krb5ConfResource = null;
        if (remoteKrb5Path != null) {
            log.info("Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
            krb5ConfResource = Utils.registerRemoteFile(yarnConfig, remoteKrb5Path);
        }
        boolean hasKrb5 = krb5ConfResource != null;

        Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
        YarnLocalResourceDescriptor flinkDistLocalResourceDesc = YarnLocalResourceDescriptor.fromString(remoteFlinkJarPath);
        taskManagerLocalResources.put(flinkDistLocalResourceDesc.getResourceKey(), flinkDistLocalResourceDesc.toLocalResource());
        if (yarnConfResource != null) {
            taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
        }
        if (krb5ConfResource != null) {
            taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
        }
        if (keytabResource != null) {
            taskManagerLocalResources.put(localKeytabPath, keytabResource);
        }
        // Shipped files are added last, so a shipped file overrides an earlier entry with the same key.
        for (YarnLocalResourceDescriptor resourceDesc : Utils.decodeYarnLocalResourceDescriptorListFromString(shipListString)) {
            taskManagerLocalResources.put(resourceDesc.getResourceKey(), resourceDesc.toLocalResource());
        }

        log.info("Creating container launch context for TaskManagers");
        boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
        boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
        String launchCommand = Utils.getTaskManagerShellCommand(flinkConfig, tmParams, ".", "<LOG_DIR>", hasLogback, hasLog4j, hasKrb5, taskManagerMainClass, taskManagerDynamicProperties);
        if (log.isDebugEnabled()) {
            log.debug("Starting TaskManagers with command: {}", launchCommand);
        } else {
            log.info("Starting TaskManagers");
        }

        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList(launchCommand));
        ctx.setLocalResources(taskManagerLocalResources);

        Map<String, String> containerEnv = new HashMap<>(tmParams.taskManagerEnv());
        containerEnv.put("_FLINK_CLASSPATH", classPathString);
        Utils.setupYarnClassPath(yarnConfig, containerEnv);
        containerEnv.put("HADOOP_USER_NAME", UserGroupInformation.getCurrentUser().getUserName());
        // Keytab settings are only exported when both the local path and the principal are known;
        // the remote path is optional on top of that.
        if (localKeytabPath != null && keytabPrincipal != null) {
            if (remoteKeytabPath != null) {
                containerEnv.put("_REMOTE_KEYTAB_PATH", remoteKeytabPath);
            }
            containerEnv.put("_LOCAL_KEYTAB_PATH", localKeytabPath);
            containerEnv.put("_KEYTAB_PRINCIPAL", keytabPrincipal);
        }
        ctx.setEnvironment(containerEnv);
        Utils.setAclsFor(ctx, flinkConfig);

        String fileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (fileLocation != null) {
            log.debug("Adding security tokens to TaskExecutor's container launch context.");
            try (DataOutputBuffer dob = new DataOutputBuffer()) {
                Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), HadoopUtils.getHadoopConfiguration(flinkConfig));
                Credentials taskManagerCred = new Credentials();
                // The AM-RM token is deliberately not copied into the TaskExecutor credentials.
                for (Token<?> token : cred.getAllTokens()) {
                    if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                        taskManagerCred.addToken(token.getService(), token);
                    }
                }
                taskManagerCred.writeTokenStorageToStream(dob);
                ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
                ctx.setTokens(securityTokens);
            } catch (Throwable t) {
                // Best effort: the container is still launched without tokens.
                log.error("Failed to add Hadoop's security tokens.", t);
            }
        } else {
            log.info("Could not set security tokens because Hadoop's token file location is unknown.");
        }
        return ctx;
    }

    /** Resolves the file system of {@code remotePath} and registers the file as a {@code FILE} local resource. */
    private static LocalResource registerRemoteFile(YarnConfiguration yarnConfig, String remotePath) throws IOException {
        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(remotePath);
        org.apache.hadoop.fs.FileSystem fs = path.getFileSystem(yarnConfig);
        return Utils.registerLocalResource(fs, path, LocalResourceType.FILE);
    }

    /**
     * Assembles the shell command that starts a TaskManager JVM.
     *
     * <p>The command is produced by filling the configured container start command template with
     * the values for {@code java}, {@code jvmmem}, {@code jvmopts}, {@code logging},
     * {@code class}, {@code args} and {@code redirects}.
     *
     * @param flinkConfig Flink configuration providing JVM options and the command template
     * @param tmParams TaskManager parameters providing the process spec
     * @param configDirectory directory passed as {@code --configDir} and used for logging config files
     * @param logDirectory directory for the log, stdout and stderr files
     * @param hasLogback whether to add the logback configuration property
     * @param hasLog4j whether to add the log4j configuration properties
     * @param hasKrb5 whether to add the krb5 configuration JVM property
     * @param mainClass main class of the TaskManager
     * @param mainArgs additional arguments appended to the generated ones; null or empty adds nothing
     * @return the start command
     */
    public static String getTaskManagerShellCommand(Configuration flinkConfig, ContaineredTaskManagerParameters tmParams, String configDirectory, String logDirectory, boolean hasLogback, boolean hasLog4j, boolean hasKrb5, Class<?> mainClass, String mainArgs) {
        Map<String, String> startCommandValues = new HashMap<>();
        startCommandValues.put("java", "$JAVA_HOME/bin/java");

        TaskExecutorProcessSpec taskExecutorProcessSpec = tmParams.getTaskExecutorProcessSpec();
        startCommandValues.put("jvmmem", ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec));

        List<ConfigOption<String>> jvmOptions = Arrays.asList(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, CoreOptions.FLINK_JVM_OPTIONS, CoreOptions.FLINK_DEFAULT_TM_JVM_OPTIONS, CoreOptions.FLINK_TM_JVM_OPTIONS);
        startCommandValues.put("jvmopts", Utils.generateJvmOptsString(flinkConfig, jvmOptions, hasKrb5));

        String logging = "";
        if (hasLogback || hasLog4j) {
            logging = "-Dlog.file=" + logDirectory + "/taskmanager.log";
            if (hasLogback) {
                logging = logging + " -Dlogback.configurationFile=file:" + configDirectory + "/logback.xml";
            }
            if (hasLog4j) {
                // Both property names are emitted for the same file.
                logging = logging + " -Dlog4j.configuration=file:" + configDirectory + "/log4j.properties";
                logging = logging + " -Dlog4j.configurationFile=file:" + configDirectory + "/log4j.properties";
            }
        }
        startCommandValues.put("logging", logging);
        startCommandValues.put("class", mainClass.getName());
        startCommandValues.put("redirects", "1> " + logDirectory + "/taskmanager.out 2> " + logDirectory + "/taskmanager.err");

        String argsStr = TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec) + " --configDir " + configDirectory;
        // Treat a missing argument string like an empty one instead of failing with an NPE.
        if (mainArgs != null && !mainArgs.isEmpty()) {
            argsStr = argsStr + " " + mainArgs;
        }
        startCommandValues.put("args", argsStr);

        String commandTemplate = flinkConfig.get(YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE);
        String startCommand = Utils.getStartCommand(commandTemplate, startCommandValues);
        // Parameterized so the message is only built when debug logging is enabled.
        LOG.debug("TaskManager start command: {}", startCommand);
        return startCommand;
    }

    /**
     * Fills a start command template with the given values.
     *
     * <p>For every map entry, each occurrence of {@code %key%} is replaced by the entry's value.
     * After each substitution, every pair of consecutive spaces is replaced by a single space
     * once, and the result is trimmed.
     *
     * @param template command template containing {@code %key%} placeholders
     * @param startCommandValues placeholder names mapped to their replacement values
     * @return the template after all substitutions; unchanged if the map is empty
     */
    public static String getStartCommand(String template, Map<String, String> startCommandValues) {
        String command = template;
        for (String key : startCommandValues.keySet()) {
            final String placeholder = "%" + key + "%";
            final String substituted = command.replace(placeholder, startCommandValues.get(key));
            command = substituted.replace("  ", " ").trim();
        }
        return command;
    }

    /**
     * Builds the space-separated JVM options string.
     *
     * <p>The values of the given options are added in list order (null or empty values are
     * skipped), followed by {@code -XX:+IgnoreUnrecognizedVMOptions} and, if requested, the krb5
     * configuration property.
     *
     * @param conf configuration to read the option values from
     * @param jvmOptions options whose values are concatenated
     * @param hasKrb5 whether to append {@code -Djava.security.krb5.conf=krb5.conf}
     * @return the trimmed options string
     */
    public static String generateJvmOptsString(Configuration conf, List<ConfigOption<String>> jvmOptions, boolean hasKrb5) {
        final StringBuilder opts = new StringBuilder();
        jvmOptions.forEach(option -> Utils.concatWithSpace(opts, conf.get(option)));
        Utils.concatWithSpace(opts, IGNORE_UNRECOGNIZED_VM_OPTIONS);
        if (hasKrb5) {
            Utils.concatWithSpace(opts, "-Djava.security.krb5.conf=krb5.conf");
        }
        final String joined = opts.toString();
        return joined.trim();
    }

    /**
     * Tells whether the file system behind the given path reports itself as distributed.
     *
     * @param path path string to check
     * @return the result of {@code isDistributedFS()} on the path's file system
     * @throws IOException if the file system cannot be obtained
     */
    static boolean isRemotePath(String path) throws IOException {
        final FileSystem fileSystem = new Path(path).getFileSystem();
        return fileSystem.isDistributedFS();
    }

    /**
     * Decodes a {@code ;}-separated list of serialized local resource descriptors.
     *
     * @param resources descriptor strings joined by {@code ;}; empty segments are ignored
     * @return the decoded descriptors in their original order
     * @throws Exception if a segment cannot be parsed by {@code YarnLocalResourceDescriptor.fromString}
     */
    private static List<YarnLocalResourceDescriptor> decodeYarnLocalResourceDescriptorListFromString(String resources) throws Exception {
        final List<YarnLocalResourceDescriptor> decoded = new ArrayList<>();
        final String[] segments = resources.split(";");
        for (String segment : segments) {
            if (!segment.isEmpty()) {
                decoded.add(YarnLocalResourceDescriptor.fromString(segment));
            }
        }
        return decoded;
    }

    /**
     * Determines the unit resource (memory in MB and vcores) from the YARN configuration.
     *
     * <p>If the configured scheduler class is one of the two fair scheduler classes known to this
     * class, the unit is the increment allocation: the {@code yarn.resource-types.*} key is
     * preferred, then the legacy {@code yarn.scheduler.increment-allocation-*} key, then the
     * built-in default. For any other scheduler the unit is the scheduler's minimum allocation.
     *
     * @param yarnConfig YARN configuration to read
     * @return the unit resource
     * @throws NumberFormatException if a configured increment value is not a valid integer
     */
    @VisibleForTesting
    static Resource getUnitResource(YarnConfiguration yarnConfig) {
        final int unitMemMB;
        final int unitVcore;
        final String yarnRmSchedulerClazzName = yarnConfig.get(YarnConfiguration.RM_SCHEDULER);
        if (Objects.equals(yarnRmSchedulerClazzName, YARN_RM_FAIR_SCHEDULER_CLAZZ) || Objects.equals(yarnRmSchedulerClazzName, YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ)) {
            final String propMem = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_MB_KEY);
            final String propVcore = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY);
            // Values are trimmed before parsing so that surrounding whitespace is tolerated here
            // in the same way as for the legacy keys read through getInt.
            unitMemMB = propMem != null
                    ? Integer.parseInt(propMem.trim())
                    : yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB);
            unitVcore = propVcore != null
                    ? Integer.parseInt(propVcore.trim())
                    : yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES);
        } else {
            unitMemMB = yarnConfig.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
            unitVcore = yarnConfig.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
        }
        return Resource.newInstance(unitMemMB, unitVcore);
    }

    /**
     * Returns the configured provided lib directories as qualified Hadoop paths.
     *
     * <p>Each configured path string is qualified against the file system that the path itself
     * resolves to under the given YARN configuration.
     *
     * @param configuration Flink configuration holding the provided lib directories
     * @param yarnConfiguration YARN configuration used to resolve file systems
     * @return the qualified paths
     * @throws IOException if a file system cannot be obtained
     * @throws IllegalArgumentException if one of the paths is not on a remote file system
     */
    public static List<org.apache.hadoop.fs.Path> getQualifiedRemoteProvidedLibDirs(Configuration configuration, YarnConfiguration yarnConfiguration) throws IOException {
        final FunctionWithException<String, org.apache.hadoop.fs.Path, IOException> qualifier = pathStr -> {
            final org.apache.hadoop.fs.Path unqualified = new org.apache.hadoop.fs.Path(pathStr);
            final org.apache.hadoop.fs.FileSystem fs = unqualified.getFileSystem(yarnConfiguration);
            return fs.makeQualified(unqualified);
        };
        return Utils.getRemoteSharedLibPaths(configuration, qualifier);
    }

    /**
     * Decodes the configured provided lib directories and verifies that all of them are remote.
     *
     * @param configuration Flink configuration holding the provided lib directories
     * @param strToPathMapper converts each configured string into a Hadoop path
     * @return the mapped paths, in configuration order
     * @throws IOException if the mapper fails or a file system cannot be obtained
     * @throws IllegalArgumentException if one of the paths is not on a distributed file system
     */
    private static List<org.apache.hadoop.fs.Path> getRemoteSharedLibPaths(Configuration configuration, FunctionWithException<String, org.apache.hadoop.fs.Path, IOException> strToPathMapper) throws IOException {
        // Typed list: the raw type cannot be iterated as Path and would be returned unchecked.
        final List<org.apache.hadoop.fs.Path> providedLibDirs = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.PROVIDED_LIB_DIRS, strToPathMapper);
        for (org.apache.hadoop.fs.Path path : providedLibDirs) {
            if (!Utils.isRemotePath(path.toString())) {
                throw new IllegalArgumentException("The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only contain dirs accessible from all worker nodes, while the \"" + path + "\" is local.");
            }
        }
        return providedLibDirs;
    }

    /**
     * Checks whether the given path is a directory whose name is {@code usrlib}.
     *
     * @param fileSystem file system used to look up the path
     * @param path path to check
     * @return {@code true} only if the path is a directory named {@code usrlib}
     * @throws IOException if the file status cannot be obtained
     */
    public static boolean isUsrLibDirectory(org.apache.hadoop.fs.FileSystem fileSystem, org.apache.hadoop.fs.Path path) throws IOException {
        final FileStatus status = fileSystem.getFileStatus(path);
        if (!status.isDirectory()) {
            return false;
        }
        final String directoryName = status.getPath().getName();
        return "usrlib".equals(directoryName);
    }

    /**
     * Returns the configured provided usrlib directory as a qualified path, if one is configured.
     *
     * <p>The path is qualified against the file system returned by {@code FileSystem.get} for the
     * given YARN configuration. It must be on a remote file system and must be a directory named
     * {@code usrlib}.
     *
     * @param configuration Flink configuration holding the provided usrlib directory
     * @param yarnConfiguration YARN configuration used to obtain the file system
     * @return the qualified path, or empty if the option is not set
     * @throws IOException if the file system or file status cannot be obtained
     * @throws IllegalArgumentException if the path is not remote or is not a {@code usrlib} directory
     */
    public static Optional<org.apache.hadoop.fs.Path> getQualifiedRemoteProvidedUsrLib(Configuration configuration, YarnConfiguration yarnConfiguration) throws IOException, IllegalArgumentException {
        final String usrlib = configuration.get(YarnConfigOptions.PROVIDED_USRLIB_DIR);
        if (usrlib == null) {
            return Optional.empty();
        }

        final org.apache.hadoop.fs.Path unqualified = new org.apache.hadoop.fs.Path(usrlib);
        final org.apache.hadoop.fs.Path qualifiedUsrLibPath = org.apache.hadoop.fs.FileSystem.get(yarnConfiguration).makeQualified(unqualified);

        final boolean remote = Utils.isRemotePath(qualifiedUsrLibPath.toString());
        Preconditions.checkArgument(remote, "The \"%s\" must point to a remote dir which is accessible from all worker nodes.", YarnConfigOptions.PROVIDED_USRLIB_DIR.key());

        final boolean namedUsrLib = Utils.isUsrLibDirectory(org.apache.hadoop.fs.FileSystem.get(yarnConfiguration), qualifiedUsrLibPath);
        Preconditions.checkArgument(namedUsrLib, "The \"%s\" should be named with \"%s\".", YarnConfigOptions.PROVIDED_USRLIB_DIR.key(), "usrlib");

        return Optional.of(qualifiedUsrLibPath);
    }

    /**
     * Creates a YARN configuration from the Flink configuration and adds the Hadoop configuration
     * derived from the same Flink configuration as an additional resource.
     *
     * @param flinkConfig Flink configuration
     * @return the combined YARN configuration
     */
    public static YarnConfiguration getYarnAndHadoopConfiguration(Configuration flinkConfig) {
        final YarnConfiguration combined = Utils.getYarnConfiguration(flinkConfig);
        final org.apache.hadoop.conf.Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
        combined.addResource(hadoopConfig);
        return combined;
    }

    /**
     * Creates a new YARN configuration and copies the matching Flink entries into it.
     *
     * <p>Every Flink key that starts with one of {@code FLINK_CONFIG_PREFIXES} is copied with the
     * leading {@code "flink."} stripped from the key.
     *
     * @param flinkConfig Flink configuration to read from
     * @return the new YARN configuration
     */
    public static YarnConfiguration getYarnConfiguration(Configuration flinkConfig) {
        final YarnConfiguration yarnConfig = new YarnConfiguration();
        for (String flinkKey : flinkConfig.keySet()) {
            for (String prefix : FLINK_CONFIG_PREFIXES) {
                if (flinkKey.startsWith(prefix)) {
                    final String yarnKey = flinkKey.substring("flink.".length());
                    final String value = flinkConfig.getString(flinkKey, null);
                    yarnConfig.set(yarnKey, value);
                    LOG.debug("Adding Flink config entry for {} as {}={} to Yarn config", flinkKey, yarnKey, value);
                }
            }
        }
        return yarnConfig;
    }

    /**
     * Applies the configured view and modify ACLs to the given container launch context.
     *
     * <p>Both ACL strings are validated first. Null or empty ACLs are skipped, and the context is
     * left untouched when neither ACL is set.
     *
     * @param amContainer launch context to receive the ACLs
     * @param flinkConfig Flink configuration holding the ACL options
     * @throws IllegalArgumentException if an ACL contains a wildcard but is not exactly {@code *}
     */
    public static void setAclsFor(ContainerLaunchContext amContainer, Configuration flinkConfig) {
        final String viewAcls = flinkConfig.get(YarnConfigOptions.APPLICATION_VIEW_ACLS);
        final String modifyAcls = flinkConfig.get(YarnConfigOptions.APPLICATION_MODIFY_ACLS);
        Utils.validateAclString(viewAcls);
        Utils.validateAclString(modifyAcls);

        final Map<ApplicationAccessType, String> acls = new HashMap<>();
        if (viewAcls != null && !viewAcls.isEmpty()) {
            acls.put(ApplicationAccessType.VIEW_APP, viewAcls);
        }
        if (modifyAcls != null && !modifyAcls.isEmpty()) {
            acls.put(ApplicationAccessType.MODIFY_APP, modifyAcls);
        }

        if (acls.isEmpty()) {
            return;
        }
        amContainer.setApplicationACLs(acls);
    }

    /**
     * Rejects ACL strings that contain the wildcard character without being exactly the wildcard.
     *
     * @param acl ACL string to validate; null is accepted
     * @throws IllegalArgumentException if {@code acl} contains {@code *} but is not equal to {@code *}
     */
    private static void validateAclString(String acl) {
        final boolean acceptable = acl == null || !acl.contains(WILDCARD_ACL) || acl.equals(WILDCARD_ACL);
        if (acceptable) {
            return;
        }
        throw new IllegalArgumentException(String.format("Invalid wildcard ACL %s. The ACL wildcard does not support regex. The only valid wildcard ACL is '*'.", acl));
    }

    /**
     * Converts a local file into a Hadoop path built from the file's URI.
     *
     * @param localFile local file to convert
     * @return the Hadoop path for the file's URI
     */
    public static org.apache.hadoop.fs.Path getPathFromLocalFile(File localFile) {
        final URI fileUri = localFile.toURI();
        return new org.apache.hadoop.fs.Path(fileUri);
    }

    /**
     * Converts a local file path string into a Hadoop path built from the file's URI.
     *
     * @param localPathStr path of a local file
     * @return the Hadoop path for the file's URI
     */
    public static org.apache.hadoop.fs.Path getPathFromLocalFilePathStr(String localPathStr) {
        final File localFile = new File(localPathStr);
        return Utils.getPathFromLocalFile(localFile);
    }

    /**
     * Appends a single space followed by {@code value} to the builder.
     *
     * <p>Nothing is appended when {@code value} is null or empty.
     *
     * @param sb builder to append to
     * @param value text to append; may be null
     */
    public static void concatWithSpace(StringBuilder sb, String value) {
        if (value != null && !value.isEmpty()) {
            sb.append(' ').append(value);
        }
    }
}

