package _ss_com.streamsets.datacollector.cluster;

import _ss_com.com.google.common.base.Joiner;
import _ss_com.com.google.common.collect.ImmutableList;
import _ss_com.com.google.common.io.Files;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.creation.PipelineBeanCreator;
import _ss_com.streamsets.datacollector.creation.PipelineConfigBean;
import _ss_com.streamsets.datacollector.http.WebServerTask;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.security.SecurityConfiguration;
import _ss_com.streamsets.datacollector.stagelibrary.StageLibraryTask;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.SystemProcessFactory;
import _ss_com.streamsets.datacollector.validation.Issue;
import _ss_com.streamsets.datacollector.websockets.StatusWebSocket;
import _ss_com.streamsets.pipeline.lib.util.ThreadUtil;
import _ss_com.streamsets.pipeline.util.SystemProcess;
import _ss_org.apache.commons.compress.archivers.ArchiveStreamFactory;
import _ss_org.apache.commons.io.FileUtils;
import com.amazonaws.regions.ServiceAbbreviations;
import com.amazonaws.util.StringUtils;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.impl.Utils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import javax.annotation.Nullable;

/* loaded from: input_file:_ss_com/streamsets/datacollector/cluster/ShellClusterProvider.class */
public class ShellClusterProvider extends BaseClusterProvider {
    private static final String KERBEROS_AUTH = "KERBEROS_AUTH";
    private static final String KERBEROS_KEYTAB = "KERBEROS_KEYTAB";
    private static final String KERBEROS_PRINCIPAL = "KERBEROS_PRINCIPAL";
    private static final String STAGING_DIR = "STAGING_DIR";
    private static final String MESOS_UBER_JAR_PATH = "MESOS_UBER_JAR_PATH";
    private static final String MESOS_UBER_JAR = "MESOS_UBER_JAR";
    private static final String ETC_TAR_ARCHIVE = "ETC_TAR_ARCHIVE";
    private static final String LIBS_TAR_ARCHIVE = "LIBS_TAR_ARCHIVE";
    private static final String RESOURCES_TAR_ARCHIVE = "RESOURCES_TAR_ARCHIVE";
    private static final String MESOS_HOSTING_JAR_DIR = "MESOS_HOSTING_JAR_DIR";
    private static final String MAPR_UNAME_PWD_SECURITY_ENABLED_KEY = "maprlogin.password.enabled";
    private final File clusterManagerScript;
    private StageLibraryTask stageLibraryTask;
    private final YARNStatusParser yarnStatusParser;
    private final MesosStatusParser mesosStatusParser;

    public ShellClusterProvider(RuntimeInfo runtimeInfo, @Nullable SecurityConfiguration securityConfiguration, Configuration configuration, StageLibraryTask stageLibraryTask) {
        super(runtimeInfo, securityConfiguration, configuration, stageLibraryTask);
        this.clusterManagerScript = new File(runtimeInfo.getLibexecDir(), "_cluster-manager");
        Utils.checkState(this.clusterManagerScript.isFile(), errorString("_cluster-manager does not exist: {}", this.clusterManagerScript));
        Utils.checkState(this.clusterManagerScript.canExecute(), errorString("_cluster-manager is not executable: {}", this.clusterManagerScript));
        this.yarnStatusParser = new YARNStatusParser();
        this.mesosStatusParser = new MesosStatusParser();
    }

    protected SystemProcessFactory getSystemProcessFactory() {
        return new SystemProcessFactory();
    }

    protected File getClusterManagerScript() {
        return this.clusterManagerScript;
    }

    private void addKerberosConfiguration(Map<String, String> map) {
        if (getSecurityConfiguration() != null) {
            map.put(KERBEROS_AUTH, String.valueOf(getSecurityConfiguration().isKerberosEnabled()));
            if (getSecurityConfiguration().isKerberosEnabled()) {
                map.put(KERBEROS_PRINCIPAL, getSecurityConfiguration().getKerberosPrincipal());
                map.put(KERBEROS_KEYTAB, getSecurityConfiguration().getKerberosKeytab());
            }
        }
    }

    private void addProxyUserConfiguration(Map<String, String> map, String str) {
        if (str != null) {
            getLog().info("Will submit MR job as user {}", str);
            map.put(ClusterModeConstants.HADOOP_PROXY_USER, str);
        }
    }

    private void addMesosArgs(PipelineConfiguration pipelineConfiguration, Map<String, String> map, ImmutableList.Builder<String> builder) {
        String str = (String) Utils.checkNotNull(PipelineBeanCreator.get().getMesosDispatcherURL(pipelineConfiguration), "mesosDispatcherURL");
        map.put(BaseClusterProvider.CLUSTER_TYPE, BaseClusterProvider.CLUSTER_TYPE_MESOS);
        builder.add((ImmutableList.Builder<String>) "--master");
        builder.add((ImmutableList.Builder<String>) str);
    }

    @Override // _ss_com.streamsets.datacollector.cluster.ClusterProvider
    public void killPipeline(File file, ApplicationState applicationState, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) throws TimeoutException, IOException {
        String appId = applicationState.getAppId();
        HashMap hashMap = new HashMap();
        hashMap.put(BaseClusterProvider.CLUSTER_TYPE, BaseClusterProvider.CLUSTER_TYPE_YARN);
        addKerberosConfiguration(hashMap);
        ImmutableList.Builder<String> builder = ImmutableList.builder();
        builder.add((ImmutableList.Builder<String>) this.clusterManagerScript.getAbsolutePath());
        builder.add((ImmutableList.Builder<String>) "kill");
        builder.add((ImmutableList.Builder<String>) appId);
        if (PipelineBeanCreator.get().getExecutionMode(pipelineConfiguration, new ArrayList()) == ExecutionMode.CLUSTER_MESOS_STREAMING) {
            addMesosArgs(pipelineConfiguration, hashMap, builder);
        }
        SystemProcess create = getSystemProcessFactory().create(BaseClusterProvider.class.getSimpleName(), file, builder.build());
        try {
            create.start(hashMap);
            if (create.waitFor(30L, TimeUnit.SECONDS)) {
            } else {
                throw new TimeoutException(errorString("Kill command for {} timed out.", appId));
            }
        } finally {
            create.cleanup();
        }
    }

    @Override // _ss_com.streamsets.datacollector.cluster.ClusterProvider
    public ClusterPipelineStatus getStatus(File file, ApplicationState applicationState, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) throws TimeoutException, IOException {
        applicationState.getAppId();
        HashMap hashMap = new HashMap();
        hashMap.put(BaseClusterProvider.CLUSTER_TYPE, BaseClusterProvider.CLUSTER_TYPE_YARN);
        addKerberosConfiguration(hashMap);
        ImmutableList.Builder<String> builder = ImmutableList.builder();
        builder.add((ImmutableList.Builder<String>) this.clusterManagerScript.getAbsolutePath());
        builder.add((ImmutableList.Builder<String>) StatusWebSocket.TYPE);
        builder.add((ImmutableList.Builder<String>) applicationState.getAppId());
        ExecutionMode executionMode = PipelineBeanCreator.get().getExecutionMode(pipelineConfiguration, new ArrayList());
        if (executionMode == ExecutionMode.CLUSTER_MESOS_STREAMING) {
            addMesosArgs(pipelineConfiguration, hashMap, builder);
        }
        SystemProcess create = getSystemProcessFactory().create(BaseClusterProvider.class.getSimpleName(), file, builder.build());
        try {
            create.start(hashMap);
            if (!create.waitFor(30L, TimeUnit.SECONDS)) {
                throw new TimeoutException(errorString("YARN status command for {} timed out.", applicationState.getAppId()));
            }
            if (create.exitValue() != 0) {
                throw new IllegalStateException(errorString("Status command for {} failed with exit code {}.", applicationState.getAppId(), Integer.valueOf(create.exitValue())));
            }
            ClusterPipelineStatus valueOf = ClusterPipelineStatus.valueOf(executionMode == ExecutionMode.CLUSTER_MESOS_STREAMING ? this.mesosStatusParser.parseStatus(create.getAllOutput()) : this.yarnStatusParser.parseStatus(create.getAllOutput()));
            create.cleanup();
            return valueOf;
        } catch (Throwable th) {
            create.cleanup();
            throw th;
        }
    }

    @Override // _ss_com.streamsets.datacollector.cluster.ClusterProvider
    public void cleanUp(ApplicationState applicationState, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean) throws IOException {
    }

    @Override // _ss_com.streamsets.datacollector.cluster.BaseClusterProvider
    protected ApplicationState startPipelineExecute(File file, Map<String, String> map, PipelineConfiguration pipelineConfiguration, PipelineConfigBean pipelineConfigBean, long j, File file2, String str, File file3, File file4, Set<String> set, File file5, File file6, File file7, File file8, File file9, String str2, String str3, String str4, List<Issue> list) throws IOException {
        List<String> generateMesosArgs;
        long convert;
        ExecutionMode executionMode = PipelineBeanCreator.get().getExecutionMode(pipelineConfiguration, new ArrayList());
        HashMap hashMap = new HashMap(pipelineConfigBean.clusterLauncherEnv);
        addKerberosConfiguration(hashMap);
        list.clear();
        PipelineConfigBean create = PipelineBeanCreator.get().create(pipelineConfiguration, list, null);
        Utils.checkArgument(create != null, Utils.formatL("Invalid pipeline configuration: {}", new Object[]{list}));
        String valueOf = create.workerCount == 0 ? map.get(ClusterModeConstants.NUM_EXECUTORS_KEY) : String.valueOf(create.workerCount);
        String str5 = map.get("EXTRA_KAFKA_CONFIG_PREFIX_security.protocol");
        boolean z = false;
        if (str5 != null && str5.toUpperCase().contains(ClusterModeConstants.SECURE_KAFKA_IDENTIFIER)) {
            z = true;
        }
        File file10 = null;
        String str6 = create.clusterSlaveJavaOpts + ((String) Optional.ofNullable(System.getProperty(MAPR_UNAME_PWD_SECURITY_ENABLED_KEY)).map(str7 -> {
            return !create.clusterSlaveJavaOpts.contains(MAPR_UNAME_PWD_SECURITY_ENABLED_KEY) ? " -Dmaprlogin.password.enabled=" + str7 : "";
        }).orElse(""));
        getLog().info("Slave Java Opts : {}", str6);
        if (executionMode == ExecutionMode.CLUSTER_BATCH) {
            addProxyUserConfiguration(hashMap, map.get(ClusterModeConstants.HADOOP_PROXY_USER));
            getLog().info("Submitting MapReduce Job");
            hashMap.put(BaseClusterProvider.CLUSTER_TYPE, BaseClusterProvider.CLUSTER_TYPE_MAPREDUCE);
            generateMesosArgs = generateMRArgs(getClusterManagerScript().getAbsolutePath(), String.valueOf(create.clusterSlaveMemory), str6, file5.getAbsolutePath(), file7.getAbsolutePath(), file6.getAbsolutePath(), file9.getAbsolutePath(), file4.getAbsolutePath(), file8.getAbsolutePath(), file3.getAbsolutePath(), set);
        } else if (executionMode == ExecutionMode.CLUSTER_YARN_STREAMING) {
            getLog().info("Submitting Spark Job on Yarn");
            hashMap.put(BaseClusterProvider.CLUSTER_TYPE, BaseClusterProvider.CLUSTER_TYPE_YARN);
            generateMesosArgs = generateSparkArgs(getClusterManagerScript().getAbsolutePath(), String.valueOf(create.clusterSlaveMemory), str6, create.sparkConfigs, valueOf, file5.getAbsolutePath(), file7.getAbsolutePath(), file6.getAbsolutePath(), file9.getAbsolutePath(), file4.getAbsolutePath(), set, pipelineConfiguration.getTitle(), str4, z);
        } else {
            if (executionMode != ExecutionMode.CLUSTER_MESOS_STREAMING) {
                throw new IllegalStateException(Utils.format("Incorrect execution mode: {}", new Object[]{executionMode}));
            }
            getLog().info("Submitting Spark Job on Mesos");
            hashMap.put(BaseClusterProvider.CLUSTER_TYPE, BaseClusterProvider.CLUSTER_TYPE_MESOS);
            hashMap.put(STAGING_DIR, file2.getAbsolutePath());
            hashMap.put(MESOS_UBER_JAR_PATH, file3.getAbsolutePath());
            hashMap.put(MESOS_UBER_JAR, file3.getName());
            hashMap.put(ETC_TAR_ARCHIVE, "etc.tar.gz");
            hashMap.put(LIBS_TAR_ARCHIVE, "libs.tar.gz");
            hashMap.put(RESOURCES_TAR_ARCHIVE, "resources.tar.gz");
            file10 = new File(getRuntimeInfo().getDataDir(), (String) Utils.checkNotNull(str2, "mesos jar dir cannot be null"));
            if (!file10.mkdirs()) {
                throw new RuntimeException("Couldn't create hosting dir: " + file10.toString());
            }
            hashMap.put(MESOS_HOSTING_JAR_DIR, file10.getAbsolutePath());
            generateMesosArgs = generateMesosArgs(getClusterManagerScript().getAbsolutePath(), create.mesosDispatcherURL, (String) Utils.checkNotNull(str3, "mesos jar url cannot be null"));
        }
        SystemProcess create2 = getSystemProcessFactory().create(BaseClusterProvider.class.getSimpleName(), file, generateMesosArgs);
        getLog().info("Starting: " + create2);
        try {
            create2.start(hashMap);
            long currentTimeMillis = System.currentTimeMillis();
            HashSet hashSet = new HashSet();
            do {
                convert = TimeUnit.SECONDS.convert(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
                getLog().debug("Waiting for application id, elapsed seconds: " + convert);
                if (hashSet.size() > 1) {
                    throw new IllegalStateException(errorString("Found more than one application id: {}", hashSet));
                }
                if (!hashSet.isEmpty()) {
                    String str8 = (String) hashSet.iterator().next();
                    ApplicationState applicationState = new ApplicationState();
                    applicationState.setAppId(str8);
                    applicationState.setSdcToken(str);
                    if (str2 != null) {
                        applicationState.setDirId(str2);
                    }
                    return applicationState;
                }
                if (!ThreadUtil.sleep(1000L)) {
                    if (file10 != null) {
                        FileUtils.deleteQuietly(file10);
                    }
                    throw new IllegalStateException("Interrupted while waiting for pipeline to start");
                }
                ArrayList<String> arrayList = new ArrayList();
                arrayList.addAll(create2.getOutput());
                arrayList.addAll(create2.getError());
                for (String str9 : arrayList) {
                    Matcher matcher = executionMode == ExecutionMode.CLUSTER_MESOS_STREAMING ? MESOS_DRIVER_ID_REGEX.matcher(str9) : YARN_APPLICATION_ID_REGEX.matcher(str9);
                    if (matcher.find()) {
                        getLog().info("Found application id " + matcher.group(1));
                        hashSet.add(matcher.group(1));
                    }
                    Matcher matcher2 = NO_VALID_CREDENTIALS.matcher(str9);
                    if (matcher2.find()) {
                        getLog().info("Kerberos Error found on line: " + str9);
                        throw new IOException("Kerberos Error: " + matcher2.group(1));
                    }
                }
            } while (convert <= j);
            Object[] objArr = new Object[2];
            objArr[0] = Long.valueOf(convert);
            objArr[1] = create2.isAlive() ? "is" : "is not";
            String format = Utils.format("Timed out after waiting {} seconds for for cluster application to start. Submit command {} alive.", objArr);
            if (file10 != null) {
                FileUtils.deleteQuietly(file10);
            }
            Iterator it = hashSet.iterator();
            if (it.hasNext()) {
                getSystemProcessFactory().create(BaseClusterProvider.CLUSTER_TYPE_YARN, Files.createTempDir(), Arrays.asList(ServiceAbbreviations.CloudWatchLogs, "-applicationId", (String) it.next()));
            }
            throw new IllegalStateException(format);
        } finally {
            create2.cleanup();
        }
    }

    private List<String> generateMRArgs(String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8, String str9, String str10, Set<String> set) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        arrayList.add("start");
        arrayList.add(ArchiveStreamFactory.JAR);
        arrayList.add(str10);
        arrayList.add("_ss_com.streamsets.pipeline.BootstrapClusterBatch");
        arrayList.add("-archives");
        arrayList.add(Joiner.on(StringUtils.COMMA_SEPARATOR).join(str4, str5, str6));
        arrayList.add("-D");
        arrayList.add("mapreduce.job.log4j-properties-file=" + str7);
        arrayList.add("-libjars");
        StringBuilder sb = new StringBuilder(str8);
        Iterator<String> it = set.iterator();
        while (it.hasNext()) {
            sb.append(StringUtils.COMMA_SEPARATOR).append(it.next());
        }
        arrayList.add(sb.toString());
        arrayList.add(str9);
        arrayList.add(Joiner.on(" ").join(String.format("-Xmx%sm", str2), str3, "-javaagent:./" + new File(str8).getName()));
        return arrayList;
    }

    private List<String> generateMesosArgs(String str, String str2, String str3) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        arrayList.add("start");
        arrayList.add("--deploy-mode");
        arrayList.add("cluster");
        arrayList.add("--total-executor-cores");
        arrayList.add(PipelineConfigBean.DEFAULT_STATS_AGGREGATOR_STAGE_VERSION);
        arrayList.add("--master");
        arrayList.add(str2);
        arrayList.add("--class");
        arrayList.add("_ss_com.streamsets.pipeline.mesos.BootstrapMesosDriver");
        arrayList.add(str3);
        return arrayList;
    }

    private List<String> generateSparkArgs(String str, String str2, String str3, Map<String, String> map, String str4, String str5, String str6, String str7, String str8, String str9, Set<String> set, String str10, String str11, boolean z) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        arrayList.add("start");
        arrayList.add("--master");
        arrayList.add(BaseClusterProvider.CLUSTER_TYPE_YARN);
        arrayList.add("--deploy-mode");
        arrayList.add("cluster");
        arrayList.add("--executor-memory");
        arrayList.add(str2 + "m");
        arrayList.add("--executor-cores");
        arrayList.add(PipelineConfigBean.DEFAULT_STATS_AGGREGATOR_STAGE_VERSION);
        checkNumExecutors(str4);
        arrayList.add("--num-executors");
        arrayList.add(str4);
        arrayList.add("--archives");
        arrayList.add(Joiner.on(StringUtils.COMMA_SEPARATOR).join(str5, str6, str7));
        arrayList.add("--files");
        arrayList.add(str8);
        arrayList.add("--jars");
        StringBuilder sb = new StringBuilder(str9);
        Iterator<String> it = set.iterator();
        while (it.hasNext()) {
            sb.append(StringUtils.COMMA_SEPARATOR).append(it.next());
        }
        arrayList.add(sb.toString());
        if (getSecurityConfiguration() != null && getSecurityConfiguration().isKerberosEnabled()) {
            arrayList.add("--keytab");
            arrayList.add(getSecurityConfiguration().getKerberosKeytab());
            arrayList.add("--principal");
            arrayList.add(getSecurityConfiguration().getKerberosPrincipal());
        }
        if (z) {
            String property = System.getProperty(WebServerTask.JAVA_SECURITY_AUTH_LOGIN_CONFIG);
            arrayList.add("--conf");
            arrayList.add(Joiner.on("=").join("spark.driver.extraJavaOptions", "-Djava.security.auth.login.config", property));
            str3 = Utils.format("{} {}={}", new Object[]{str3, "-Djava.security.auth.login.config", property});
        }
        arrayList.add("--conf");
        arrayList.add("spark.executor.extraJavaOptions=" + Joiner.on(" ").join("-javaagent:./" + new File(str9).getName(), str3, new Object[0]));
        map.forEach((str12, str13) -> {
            arrayList.add("--conf");
            arrayList.add(str12 + "=" + str13);
        });
        arrayList.add("--name");
        arrayList.add("StreamSets Data Collector: " + str10);
        arrayList.add("--class");
        arrayList.add("_ss_com.streamsets.pipeline.BootstrapClusterStreaming");
        arrayList.add(str11);
        return arrayList;
    }

    private void checkNumExecutors(String str) {
        Utils.checkNotNull(str, "Number of executors not found");
        try {
            Utils.checkArgument(Integer.parseInt(str) > 0, "Number of executors cannot be less than 1");
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Number of executors is not a valid integer");
        }
    }
}
