package org.apache.gobblin.yarn;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.mail.EmailException;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.rest.JobExecutionInfoServer;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.EmailUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.Criteria;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/yarn/GobblinYarnAppLauncher.class */
/**
 * Client-side launcher for a Gobblin application running on YARN.
 *
 * <p>{@link #launch()} creates the Helix cluster, connects a spectator {@link HelixManager}, and then either
 * reconnects to an already running YARN application with the configured name or submits a new one. A
 * single-threaded scheduler polls YARN for the {@link ApplicationReport} and publishes the outcome on an
 * internal {@link EventBus}; the {@code @Subscribe} handlers in this class stop the launcher once the
 * application reaches a terminal state or once too many consecutive polls have failed.</p>
 *
 * <p>{@link #stop()} is synchronized and is a no-op after a previous successful stop.</p>
 */
public class GobblinYarnAppLauncher {
    private static final Logger LOGGER = LoggerFactory.getLogger(GobblinYarnAppLauncher.class);
    private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
    private static final String GOBBLIN_YARN_APPLICATION_TYPE = "GOBBLIN_YARN";
    private static final Set<String> APPLICATION_TYPES = ImmutableSet.of(GOBBLIN_YARN_APPLICATION_TYPE);
    /** YARN states in which an existing application is considered alive enough to reconnect to. */
    private static final EnumSet<YarnApplicationState> RECONNECTABLE_APPLICATION_STATES = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING);

    // Configuration keys that are read from more than one place in this class.
    private static final String ZK_CONNECTION_STRING_KEY = "gobblin.cluster.zk.connection.string";
    private static final String HELIX_CLUSTER_NAME_KEY = "gobblin.cluster.helix.cluster.name";
    private static final String FS_URI_KEY = "fs.uri";
    private static final String JOB_CONF_PATH_KEY = "gobblin.cluster.job.conf.path";
    private static final String JOB_EXECINFO_SERVER_ENABLED_KEY = "job.execinfo.server.enabled";
    private static final String ADMIN_SERVER_ENABLED_KEY = "admin.server.enabled";

    /** Timeout, in minutes, applied to stopping the service manager and to shutting down the status monitor. */
    private static final long SHUTDOWN_TIMEOUT_MINUTES = 5L;

    private final String applicationName;
    private final String appQueueName;
    private final Config config;
    private final HelixManager helixManager;
    private final Configuration yarnConfiguration;
    private final YarnClient yarnClient;
    private final FileSystem fs;
    private final ScheduledExecutorService applicationStatusMonitor;
    private final long appReportIntervalMinutes;
    private final Optional<String> appMasterJvmArgs;
    private final Path sinkLogRootDir;
    private final int maxGetApplicationReportFailures;
    private final boolean emailNotificationOnShutdown;
    private final EventBus eventBus = new EventBus(GobblinYarnAppLauncher.class.getSimpleName());
    /** Owns the file systems opened by this launcher; closed at the end of {@link #stop()}. */
    private final Closer closer = Closer.create();
    private volatile Optional<ApplicationId> applicationId = Optional.absent();
    private volatile Optional<ServiceManager> serviceManager = Optional.absent();
    /** Consecutive failures to fetch the application report; reset whenever a report arrives. */
    private final AtomicInteger getApplicationReportFailureCount = new AtomicInteger();
    private volatile boolean applicationCompleted = false;
    private volatile boolean stopped = false;

    /**
     * Creates a launcher from the given configuration.
     *
     * <p>Note that the supplied {@code yarnConfiguration} is mutated: {@code fs.automatic.close} is set to
     * {@code false} on it, which is why this class closes its file systems explicitly in {@link #stop()}.</p>
     *
     * @param config            application configuration; must contain the application name, queue, ZooKeeper
     *                          connection string, Helix cluster name, report interval, log sink root dir,
     *                          max report failures and email-notification flag
     * @param yarnConfiguration Hadoop/YARN configuration used for the YARN client and the file system
     * @throws IOException if the file system cannot be obtained
     */
    public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException {
        this.config = config;
        this.applicationName = config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY);
        this.appQueueName = config.getString(GobblinYarnConfigurationKeys.APP_QUEUE_KEY);

        String zkConnectionString = config.getString(ZK_CONNECTION_STRING_KEY);
        LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString);
        this.helixManager = HelixManagerFactory.getZKHelixManager(config.getString(HELIX_CLUSTER_NAME_KEY), GobblinClusterUtils.getHostname(), InstanceType.SPECTATOR, zkConnectionString);

        this.yarnConfiguration = yarnConfiguration;
        this.yarnConfiguration.set("fs.automatic.close", "false");
        this.yarnClient = YarnClient.createYarnClient();
        this.yarnClient.init(this.yarnConfiguration);

        this.fs = config.hasPath(FS_URI_KEY)
                ? FileSystem.get(URI.create(config.getString(FS_URI_KEY)), this.yarnConfiguration)
                : FileSystem.get(this.yarnConfiguration);
        this.closer.register(this.fs);

        this.applicationStatusMonitor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("GobblinYarnAppStatusMonitor")));
        this.appReportIntervalMinutes = config.getLong(GobblinYarnConfigurationKeys.APP_REPORT_INTERVAL_MINUTES_KEY);
        this.appMasterJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JVM_ARGS_KEY)
                ? Optional.of(config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JVM_ARGS_KEY))
                : Optional.<String>absent();
        this.sinkLogRootDir = new Path(config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY));
        this.maxGetApplicationReportFailures = config.getInt(GobblinYarnConfigurationKeys.MAX_GET_APP_REPORT_FAILURES_KEY);
        this.emailNotificationOnShutdown = config.getBoolean(GobblinYarnConfigurationKeys.EMAIL_NOTIFICATION_ON_SHUTDOWN_KEY);
    }

    /**
     * Launches (or reconnects to) the Gobblin YARN application and starts the supporting services.
     *
     * <p>Steps, in order: register this object on the event bus, create the Helix cluster, connect the Helix
     * manager, start the YARN client, resolve the application ID, schedule the periodic application-report
     * poll, and finally start the optional services (security manager, log copier, job execution info
     * server, admin UI) asynchronously through a {@link ServiceManager}.</p>
     *
     * @throws IOException   on file system or YARN client I/O failures
     * @throws YarnException if YARN rejects a request
     */
    public void launch() throws IOException, YarnException {
        this.eventBus.register(this);

        String helixClusterName = this.config.getString(HELIX_CLUSTER_NAME_KEY);
        HelixUtils.createGobblinHelixCluster(this.config.getString(ZK_CONNECTION_STRING_KEY), helixClusterName);
        LOGGER.info("Created Helix cluster " + helixClusterName);

        connectHelixManager();
        startYarnClient();
        this.applicationId = getApplicationId();

        this.applicationStatusMonitor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    GobblinYarnAppLauncher.this.eventBus.post(new ApplicationReportArrivalEvent(GobblinYarnAppLauncher.this.yarnClient.getApplicationReport(GobblinYarnAppLauncher.this.applicationId.get())));
                } catch (YarnException | IOException e) {
                    LOGGER.error("Failed to get application report for Gobblin Yarn application " + GobblinYarnAppLauncher.this.applicationId.get(), e);
                    GobblinYarnAppLauncher.this.eventBus.post(new GetApplicationReportFailureEvent(e));
                }
            }
        }, 0L, this.appReportIntervalMinutes, TimeUnit.MINUTES);

        List<Service> services = Lists.newArrayList();
        if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
            LOGGER.info("Adding YarnAppSecurityManager since login is keytab based");
            services.add(buildYarnAppSecurityManager());
        }

        // The driver-side log copier runs unless it has been explicitly disabled.
        boolean driverLogCopyDisabled = this.config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY)
                && this.config.getBoolean(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY);
        if (!driverLogCopyDisabled) {
            String appId = this.applicationId.get().toString();
            Path sinkLogDir = new Path(this.sinkLogRootDir, this.applicationName + "/" + appId);
            Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, this.applicationName, appId);
            services.add(buildLogCopier(this.config, sinkLogDir, appWorkDir));
        }

        if (this.config.getBoolean(JOB_EXECINFO_SERVER_ENABLED_KEY)) {
            LOGGER.info("Starting the job execution info server since it is enabled");
            Properties properties = ConfigUtils.configToProperties(this.config);
            JobExecutionInfoServer executionInfoServer = new JobExecutionInfoServer(properties);
            services.add(executionInfoServer);
            if (this.config.getBoolean(ADMIN_SERVER_ENABLED_KEY)) {
                LOGGER.info("Starting the admin UI server since it is enabled");
                services.add(ServiceBasedAppLauncher.createAdminServer(properties, executionInfoServer.getAdvertisedServerUri()));
            }
        } else if (this.config.getBoolean(ADMIN_SERVER_ENABLED_KEY)) {
            LOGGER.warn("NOT starting the admin UI because the job execution info server is NOT enabled");
        }

        this.serviceManager = Optional.of(new ServiceManager(services));
        this.serviceManager.get().startAsync();
    }

    /**
     * Stops this launcher.
     *
     * <p>Sends a shutdown request to the application master if an application was submitted and has not yet
     * completed, stops the services and the status monitor, stops the YARN client and disconnects from Helix.
     * Regardless of whether those steps succeed, the application working directory is deleted and all file
     * systems registered with the internal {@link Closer} are closed. The launcher is only marked as stopped
     * when every step succeeded; subsequent calls after a successful stop return immediately.</p>
     *
     * @throws IOException      if cleaning up the working directory or closing a file system fails
     * @throws TimeoutException if the service manager does not stop within the timeout
     */
    public synchronized void stop() throws IOException, TimeoutException {
        if (this.stopped) {
            return;
        }
        LOGGER.info("Stopping the " + GobblinYarnAppLauncher.class.getSimpleName());
        try {
            // Only ask the application master to shut down if the application was submitted and is still running.
            if (this.applicationId.isPresent() && !this.applicationCompleted) {
                sendShutdownRequest();
            }
            if (this.serviceManager.isPresent()) {
                this.serviceManager.get().stopAsync().awaitStopped(SHUTDOWN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            }
            ExecutorsUtils.shutdownExecutorService(this.applicationStatusMonitor, Optional.of(LOGGER), SHUTDOWN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            stopYarnClient();
            disconnectHelixManager();
        } finally {
            try {
                if (this.applicationId.isPresent()) {
                    cleanUpAppWorkDirectory(this.applicationId.get());
                }
            } finally {
                // The cleanup above still needs the file system, so the closer is closed last.
                this.closer.close();
            }
        }
        this.stopped = true;
    }

    /**
     * Handles a freshly fetched {@link ApplicationReport}.
     *
     * <p>Resets the consecutive-failure counter. If the application is FINISHED, FAILED or KILLED, marks it as
     * completed, logs the final status (and diagnostics on failure), stops the launcher and, if enabled, sends
     * the shutdown email.</p>
     *
     * @param applicationReportArrivalEvent event carrying the application report
     */
    @Subscribe
    public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent applicationReportArrivalEvent) {
        ApplicationReport applicationReport = applicationReportArrivalEvent.getApplicationReport();
        YarnApplicationState appState = applicationReport.getYarnApplicationState();
        LOGGER.info("Gobblin Yarn application state: " + appState);
        this.getApplicationReportFailureCount.set(0);

        boolean terminated = appState == YarnApplicationState.FINISHED
                || appState == YarnApplicationState.FAILED
                || appState == YarnApplicationState.KILLED;
        if (!terminated) {
            return;
        }

        this.applicationCompleted = true;
        LOGGER.info("Gobblin Yarn application finished with final status: " + applicationReport.getFinalApplicationStatus());
        if (applicationReport.getFinalApplicationStatus() == FinalApplicationStatus.FAILED) {
            LOGGER.error("Gobblin Yarn application failed for the following reason: " + applicationReport.getDiagnostics());
        }
        stopAndNotify(Optional.of(applicationReport), "Failed to close the ");
    }

    /**
     * Handles a failed attempt to fetch the {@link ApplicationReport}.
     *
     * <p>Increments the consecutive-failure counter and, once it exceeds the configured maximum, stops the
     * launcher and, if enabled, sends the shutdown email without a report.</p>
     *
     * @param getApplicationReportFailureEvent event describing the failure
     */
    @Subscribe
    public void handleGetApplicationReportFailureEvent(GetApplicationReportFailureEvent getApplicationReportFailureEvent) {
        int failureCount = this.getApplicationReportFailureCount.incrementAndGet();
        if (failureCount <= this.maxGetApplicationReportFailures) {
            return;
        }
        LOGGER.warn(String.format("Number of consecutive failures to get the ApplicationReport %d exceeds the threshold %d", failureCount, this.maxGetApplicationReportFailures));
        stopAndNotify(Optional.<ApplicationReport>absent(), "Failed to close the ");
    }

    /**
     * Stops the launcher, logging (rather than propagating) I/O and timeout failures, and always sends the
     * shutdown email afterwards when email notification is enabled.
     *
     * @param applicationReport        report to include in the email, if one is available
     * @param stopFailureMessagePrefix log message prefix used when {@link #stop()} throws an {@link IOException};
     *                                 the simple class name is appended to it
     */
    private void stopAndNotify(Optional<ApplicationReport> applicationReport, String stopFailureMessagePrefix) {
        try {
            stop();
        } catch (IOException ioe) {
            LOGGER.error(stopFailureMessagePrefix + GobblinYarnAppLauncher.class.getSimpleName(), ioe);
        } catch (TimeoutException te) {
            LOGGER.error("Timeout in stopping the service manager", te);
        } finally {
            if (this.emailNotificationOnShutdown) {
                sendEmailOnShutdown(applicationReport);
            }
        }
    }

    /** Connects the Helix manager, logging and rethrowing (unchecked) any failure. */
    @VisibleForTesting
    void connectHelixManager() {
        try {
            this.helixManager.connect();
        } catch (Exception e) {
            LOGGER.error("HelixManager failed to connect", e);
            throw Throwables.propagate(e);
        }
    }

    /** Disconnects the Helix manager if it is currently connected. */
    @VisibleForTesting
    void disconnectHelixManager() {
        if (this.helixManager.isConnected()) {
            this.helixManager.disconnect();
        }
    }

    /** Starts the YARN client. */
    @VisibleForTesting
    void startYarnClient() {
        this.yarnClient.start();
    }

    /** Stops the YARN client. */
    @VisibleForTesting
    void stopYarnClient() {
        this.yarnClient.stop();
    }

    /**
     * Resolves the application ID to monitor: an existing reconnectable application if there is one,
     * otherwise a newly submitted application.
     */
    private Optional<ApplicationId> getApplicationId() throws YarnException, IOException {
        Optional<ApplicationId> reconnectable = getReconnectableApplicationId();
        if (reconnectable.isPresent()) {
            LOGGER.info("Found reconnectable application with application ID: " + reconnectable.get());
            return reconnectable;
        }
        LOGGER.info("No reconnectable application found so submitting a new application");
        return Optional.of(setupAndSubmitApplication());
    }

    /**
     * Looks for an application of type {@code GOBBLIN_YARN} in a reconnectable state whose name equals the
     * configured application name.
     *
     * @return the ID of the first matching application, or absent if there is none
     */
    @VisibleForTesting
    Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException {
        List<ApplicationReport> candidates = this.yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
        if (candidates == null || candidates.isEmpty()) {
            return Optional.absent();
        }
        for (ApplicationReport candidate : candidates) {
            if (this.applicationName.equals(candidate.getName())) {
                return Optional.of(candidate.getApplicationId());
            }
        }
        return Optional.absent();
    }

    /**
     * Builds the application submission context (resources, local resources, environment, launch command
     * and, on secure clusters, security tokens), uploads the container resources and submits the application.
     *
     * @return the ID of the submitted application
     */
    @VisibleForTesting
    ApplicationId setupAndSubmitApplication() throws IOException, YarnException {
        YarnClientApplication gobblinYarnApp = this.yarnClient.createApplication();
        ApplicationSubmissionContext appSubmissionContext = gobblinYarnApp.getApplicationSubmissionContext();
        appSubmissionContext.setApplicationType(GOBBLIN_YARN_APPLICATION_TYPE);
        ApplicationId newApplicationId = appSubmissionContext.getApplicationId();

        Resource amResource = prepareContainerResource(gobblinYarnApp.getNewApplicationResponse());
        Map<String, LocalResource> amLocalResources = addAppMasterLocalResources(newApplicationId);

        ContainerLaunchContext amContainerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
        amContainerLaunchContext.setLocalResources(amLocalResources);
        amContainerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
        amContainerLaunchContext.setCommands(Lists.newArrayList(buildApplicationMasterCommand(amResource.getMemory())));
        if (UserGroupInformation.isSecurityEnabled()) {
            setupSecurityTokens(amContainerLaunchContext);
        }

        appSubmissionContext.setApplicationName(this.applicationName);
        appSubmissionContext.setResource(amResource);
        appSubmissionContext.setQueue(this.appQueueName);
        appSubmissionContext.setPriority(Priority.newInstance(0));
        appSubmissionContext.setAMContainerSpec(amContainerLaunchContext);

        // Container resources are only uploaded here; they are not attached to the AM launch context.
        addContainerLocalResources(newApplicationId);

        LOGGER.info("Submitting application " + newApplicationId);
        this.yarnClient.submitApplication(appSubmissionContext);
        LOGGER.info("Application successfully submitted and accepted");

        ApplicationReport submittedReport = this.yarnClient.getApplicationReport(newApplicationId);
        LOGGER.info("Application Name: " + submittedReport.getName());
        LOGGER.info("Application Tracking URL: " + submittedReport.getTrackingUrl());
        LOGGER.info("Application User: " + submittedReport.getUser() + " Queue: " + submittedReport.getQueue());
        return newApplicationId;
    }

    /**
     * Computes the application master resource request, capping the configured memory and vcores at the
     * maximum capability reported by the cluster.
     */
    private Resource prepareContainerResource(GetNewApplicationResponse getNewApplicationResponse) {
        int memoryMbs = this.config.getInt(GobblinYarnConfigurationKeys.APP_MASTER_MEMORY_MBS_KEY);
        int maximumMemoryCapacity = getNewApplicationResponse.getMaximumResourceCapability().getMemory();
        if (memoryMbs > maximumMemoryCapacity) {
            LOGGER.info(String.format("Specified AM memory [%d] is above the maximum memory capacity [%d] of the cluster, using the maximum memory capacity instead.", memoryMbs, maximumMemoryCapacity));
            memoryMbs = maximumMemoryCapacity;
        }

        int vCores = this.config.getInt(GobblinYarnConfigurationKeys.APP_MASTER_CORES_KEY);
        int maximumVirtualCoreCapacity = getNewApplicationResponse.getMaximumResourceCapability().getVirtualCores();
        if (vCores > maximumVirtualCoreCapacity) {
            // Report the vcore values here (not the memory values).
            LOGGER.info(String.format("Specified AM vcores [%d] is above the maximum vcore capacity [%d] of the cluster, using the maximum vcore capacity instead.", vCores, maximumVirtualCoreCapacity));
            vCores = maximumVirtualCoreCapacity;
        }
        return Resource.newInstance(memoryMbs, vCores);
    }

    /**
     * Uploads everything the application master needs (lib jars, AM jars, local files, the job configuration
     * package) to the application working directory and registers it, together with any configured remote
     * files, as YARN local resources.
     *
     * @return the local resources to attach to the application master container
     */
    private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId applicationId) throws IOException {
        Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, this.applicationName, applicationId.toString());
        Path appMasterWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
        Map<String, LocalResource> appMasterResources = Maps.newHashMap();

        if (this.config.hasPath(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)) {
            addLibJars(new Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)), Optional.of(appMasterResources), new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME));
        }
        if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY)) {
            addAppJars(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY), Optional.of(appMasterResources), new Path(appMasterWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME));
        }
        if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY)) {
            addAppLocalFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_LOCAL_KEY), Optional.of(appMasterResources), new Path(appMasterWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME));
        }
        if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY)) {
            addAppRemoteFiles(this.config.getString(GobblinYarnConfigurationKeys.APP_MASTER_FILES_REMOTE_KEY), appMasterResources);
        }
        if (this.config.hasPath(JOB_CONF_PATH_KEY)) {
            addJobConfPackage(this.config.getString(JOB_CONF_PATH_KEY), new Path(appMasterWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), appMasterResources);
        }
        return appMasterResources;
    }

    /**
     * Uploads the jars and local files configured for worker containers to the container working directory.
     * Nothing is registered as a local resource here; the files are only copied.
     */
    private void addContainerLocalResources(ApplicationId applicationId) throws IOException {
        Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, this.applicationName, applicationId.toString());
        Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);

        if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
            addAppJars(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY), Optional.<Map<String, LocalResource>>absent(), new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME));
        }
        if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
            addAppLocalFiles(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY), Optional.<Map<String, LocalResource>>absent(), new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME));
        }
    }

    /**
     * Copies every entry of the local directory {@code path} into {@code path2} on the target file system
     * and, if a resource map is given, registers each copied file as a local resource.
     *
     * @param path     local directory containing the lib jars
     * @param optional resource map to register the copied files in, if any
     * @param path2    destination directory on the target file system
     */
    private void addLibJars(Path path, Optional<Map<String, LocalResource>> optional, Path path2) throws IOException {
        FileStatus[] libJarFiles = FileSystem.getLocal(this.yarnConfiguration).listStatus(path);
        if (libJarFiles == null || libJarFiles.length == 0) {
            return;
        }
        for (FileStatus libJarFile : libJarFiles) {
            Path destFilePath = new Path(path2, libJarFile.getPath().getName());
            this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
            if (optional.isPresent()) {
                YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, optional.get());
            }
        }
    }

    /**
     * Copies the comma-separated local jar paths in {@code str} into {@code path} and optionally registers
     * them as local resources.
     */
    private void addAppJars(String str, Optional<Map<String, LocalResource>> optional, Path path) throws IOException {
        copyLocalFilesToFs(str, optional, path);
    }

    /**
     * Copies the comma-separated local file paths in {@code str} into {@code path} and optionally registers
     * them as local resources.
     */
    private void addAppLocalFiles(String str, Optional<Map<String, LocalResource>> optional, Path path) throws IOException {
        copyLocalFilesToFs(str, optional, path);
    }

    /**
     * Copies each local file named in a comma-separated list into {@code destDir} on the target file system,
     * keeping the file name, and registers the copy as a FILE local resource when a resource map is given.
     */
    private void copyLocalFilesToFs(String commaSeparatedPaths, Optional<Map<String, LocalResource>> resourceMap, Path destDir) throws IOException {
        for (String localPath : SPLITTER.split(commaSeparatedPaths)) {
            Path srcFilePath = new Path(localPath);
            Path destFilePath = new Path(destDir, srcFilePath.getName());
            this.fs.copyFromLocalFile(srcFilePath, destFilePath);
            if (resourceMap.isPresent()) {
                YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
            }
        }
    }

    /** Registers the comma-separated paths in {@code str} as FILE local resources without copying them. */
    private void addAppRemoteFiles(String str, Map<String, LocalResource> map) throws IOException {
        for (String remotePath : SPLITTER.split(str)) {
            YarnHelixUtils.addFileAsLocalResource(this.fs, new Path(remotePath), LocalResourceType.FILE, map);
        }
    }

    /**
     * Tars the local job configuration directory {@code str} into {@code <name>.tar.gz} under {@code path}
     * on the target file system and registers the archive as an ARCHIVE local resource.
     */
    private void addJobConfPackage(String str, Path path, Map<String, LocalResource> map) throws IOException {
        Path srcFilePath = new Path(str);
        Path destFilePath = new Path(path, srcFilePath.getName() + ".tar.gz");
        StreamUtils.tar(FileSystem.getLocal(this.yarnConfiguration), this.fs, srcFilePath, destFilePath);
        YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.ARCHIVE, map);
    }

    /**
     * Builds the shell command that starts the application master JVM.
     *
     * @param i maximum heap size in MB, passed as {@code -Xmx}
     * @return the launch command, with stdout/stderr redirected to files under {@code <LOG_DIR>}
     */
    private String buildApplicationMasterCommand(int i) {
        String appMasterClassName = GobblinApplicationMaster.class.getSimpleName();
        // "<LOG_DIR>" is emitted literally; presumably YARN substitutes the container log directory at launch
        // (it matches ApplicationConstants.LOG_DIR_EXPANSION_VAR) -- confirm against the Hadoop version in use.
        return new StringBuilder()
                .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
                .append(" -Xmx").append(i).append("M")
                .append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
                .append(" ").append(GobblinApplicationMaster.class.getName())
                .append(" --app_name ").append(this.applicationName)
                .append(" 1><LOG_DIR>").append(File.separator).append(appMasterClassName).append(".stdout")
                .append(" 2><LOG_DIR>").append(File.separator).append(appMasterClassName).append(".stderr")
                .toString();
    }

    /**
     * Obtains file system delegation tokens, using the ResourceManager principal as renewer, and attaches the
     * current user's serialized credentials to the container launch context.
     *
     * @throws IOException if the ResourceManager principal is not configured or the tokens cannot be written
     */
    private void setupSecurityTokens(ContainerLaunchContext containerLaunchContext) throws IOException {
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        String tokenRenewer = this.yarnConfiguration.get("yarn.resourcemanager.principal");
        if (tokenRenewer == null || tokenRenewer.length() == 0) {
            throw new IOException("Failed to get master Kerberos principal for the RM to use as renewer");
        }

        Token<?>[] tokens = this.fs.addDelegationTokens(tokenRenewer, credentials);
        if (tokens != null) {
            for (Token<?> token : tokens) {
                LOGGER.info("Got delegation token for " + this.fs.getUri() + "; " + token);
            }
        }

        try (DataOutputBuffer dataOutputBuffer = new DataOutputBuffer()) {
            credentials.writeTokenStorageToStream(dataOutputBuffer);
            containerLaunchContext.setTokens(ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength()));
        }
    }

    /**
     * Builds the {@link LogCopier} that copies {@code stdout}/{@code stderr} log files from the application's
     * log directory on the target file system to {@code path} on the local file system.
     *
     * @param config configuration holding the optional max-file-size and scheduler settings
     * @param path   local destination directory
     * @param path2  application working directory on the target file system
     */
    private LogCopier buildLogCopier(Config config, Path path, Path path2) throws IOException {
        // Registered with the closer so that it is closed when the launcher stops.
        FileSystem rawLocalFs = this.closer.register(new RawLocalFileSystem());
        rawLocalFs.initialize(URI.create("file:///"), new Configuration());

        LogCopier.Builder builder = LogCopier.newBuilder()
                .useSrcFileSystem(this.fs)
                .useDestFileSystem(rawLocalFs)
                .readFrom(getHdfsLogDir(path2))
                .writeTo(path)
                .acceptsLogFileExtensions(ImmutableSet.of("stdout", "stderr"));
        if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE)) {
            builder.useMaxBytesPerLogFile(config.getBytes(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE).longValue());
        }
        if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER)) {
            builder.useScheduler(config.getString(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER));
        }
        return builder.build();
    }

    /** Returns the application log directory under {@code path}, creating it if it does not exist. */
    private Path getHdfsLogDir(Path path) throws IOException {
        Path logRootDir = new Path(path, GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME);
        if (!this.fs.exists(logRootDir)) {
            this.fs.mkdirs(logRootDir);
        }
        return logRootDir;
    }

    /** Builds the security manager, pointing it at the token file under the user's home directory. */
    private YarnAppSecurityManager buildYarnAppSecurityManager() throws IOException {
        Path tokenFilePath = new Path(this.fs.getHomeDirectory(), this.applicationName + "/" + GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
        return new YarnAppSecurityManager(this.config, this.helixManager, this.fs, tokenFilePath);
    }

    /**
     * Sends an application-master shutdown message to the Helix controller and logs an error if no
     * recipient received it.
     */
    @VisibleForTesting
    void sendShutdownRequest() {
        Criteria criteria = new Criteria();
        criteria.setInstanceName("%");
        criteria.setResource("%");
        criteria.setPartition("%");
        criteria.setPartitionState("%");
        criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
        criteria.setSessionSpecific(true);

        String shutdownSubType = HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString();
        Message shutdownRequest = new Message("SHUTDOWN", shutdownSubType.toLowerCase() + UUID.randomUUID().toString());
        shutdownRequest.setMsgSubType(shutdownSubType);
        shutdownRequest.setMsgState(Message.MessageState.NEW);
        shutdownRequest.setTgtSessionId("*");

        int messagesSent = this.helixManager.getMessagingService().send(criteria, shutdownRequest);
        if (messagesSent == 0) {
            LOGGER.error(String.format("Failed to send the %s message to the controller", shutdownRequest.getMsgSubType()));
        }
    }

    /** Recursively deletes the working directory of the given application if it exists. */
    @VisibleForTesting
    void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException {
        Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, this.applicationName, applicationId.toString());
        if (this.fs.exists(appWorkDir)) {
            LOGGER.info("Deleting application working directory " + appWorkDir);
            this.fs.delete(appWorkDir, true);
        }
    }

    /**
     * Sends the shutdown notification email. The body lists the details of the application report when one
     * is available and says "Not available" otherwise. A failure to send is logged, not propagated.
     *
     * @param optional the final application report, if one could be obtained
     */
    public void sendEmailOnShutdown(Optional<ApplicationReport> optional) {
        String subject = String.format("Gobblin Yarn application %s completed", this.applicationName);
        StringBuilder messageBuilder = new StringBuilder("Gobblin Yarn ApplicationReport:");
        if (optional.isPresent()) {
            ApplicationReport report = optional.get();
            messageBuilder.append("\n");
            messageBuilder.append("\tApplication ID: ").append(report.getApplicationId()).append("\n");
            messageBuilder.append("\tApplication attempt ID: ").append(report.getCurrentApplicationAttemptId()).append("\n");
            messageBuilder.append("\tFinal application status: ").append(report.getFinalApplicationStatus()).append("\n");
            messageBuilder.append("\tStart time: ").append(report.getStartTime()).append("\n");
            messageBuilder.append("\tFinish time: ").append(report.getFinishTime()).append("\n");
            if (!Strings.isNullOrEmpty(report.getDiagnostics())) {
                messageBuilder.append("\tDiagnostics: ").append(report.getDiagnostics()).append("\n");
            }
            ApplicationResourceUsageReport resourceUsageReport = report.getApplicationResourceUsageReport();
            if (resourceUsageReport != null) {
                messageBuilder.append("\tUsed containers: ").append(resourceUsageReport.getNumUsedContainers()).append("\n");
                Resource usedResource = resourceUsageReport.getUsedResources();
                if (usedResource != null) {
                    messageBuilder.append("\tUsed memory (MBs): ").append(usedResource.getMemory()).append("\n");
                    messageBuilder.append("\tUsed vcores: ").append(usedResource.getVirtualCores()).append("\n");
                }
            }
        } else {
            messageBuilder.append(' ').append("Not available");
        }

        try {
            EmailUtils.sendEmail(ConfigUtils.configToState(this.config), subject, messageBuilder.toString());
        } catch (EmailException e) {
            LOGGER.error("Failed to send email notification on shutdown", e);
        }
    }

    /**
     * Entry point: builds a launcher from the default configuration, installs a JVM shutdown hook that stops
     * the launcher (and sends the shutdown email if enabled), and launches the application.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if construction or launch fails
     */
    public static void main(String[] strArr) throws Exception {
        // Must be final: the shutdown hook below captures it (there is no enclosing instance in a static method).
        final GobblinYarnAppLauncher gobblinYarnAppLauncher = new GobblinYarnAppLauncher(ConfigFactory.load(), new YarnConfiguration());
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                gobblinYarnAppLauncher.stopAndNotify(Optional.<ApplicationReport>absent(), "Failed to shutdown the ");
            }
        });
        gobblinYarnAppLauncher.launch();
    }
}
