/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.mapreduce;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.SharedResourcesBrokerImpl;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.ScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.fsm.FiniteStateMachine;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MultiReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.CountEventBuilder;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.JobStateEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
import org.apache.gobblin.runtime.mapreduce.CustomizedProgresser;
import org.apache.gobblin.runtime.mapreduce.GobblinOutputCommitter;
import org.apache.gobblin.runtime.mapreduce.GobblinOutputFormat;
import org.apache.gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat;
import org.apache.gobblin.runtime.mapreduce.MRTaskStateTracker;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.runtime.util.MetricGroup;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MRJobLauncher
extends AbstractJobLauncher {
    // Two loggers bound to the same class exist in this file: `log` is used by the constructor,
    // `LOG` by the remaining methods.
    private static final Logger log = LoggerFactory.getLogger(MRJobLauncher.class);
    // Name of the marker file created under the MR job directory to request a graceful interruption.
    private static final String INTERRUPT_JOB_FILE_NAME = "_INTERRUPT_JOB";
    // Hadoop configuration key under which the interrupt marker path is published to the job.
    private static final String GOBBLIN_JOB_INTERRUPT_PATH_KEY = "gobblin.jobInterruptPath";
    private static final Logger LOG = LoggerFactory.getLogger(MRJobLauncher.class);
    // Prefix prepended to the Gobblin job name to form the Hadoop job name.
    private static final String JOB_NAME_PREFIX = "Gobblin-";
    // Sub-directory names created under the per-job MR working directory.
    private static final String JARS_DIR_NAME = "_jars";
    private static final String FILES_DIR_NAME = "_files";
    static final String INPUT_DIR_NAME = "input";
    private static final String OUTPUT_DIR_NAME = "output";
    // Configuration key read by serializeJobState to decide whether previous work unit states are written.
    private static final String SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY = "MRJobLauncher.serializePreviousWorkunitStates";
    // NOTE(review): the three constants below are not referenced by name in the visible code; the
    // corresponding literals (true, 5, 3000L) appear inline instead.
    private static final boolean DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES = true;
    // Property key checked by isCustomizedProgressReportEnabled.
    private static final String ENABLED_CUSTOMIZED_PROGRESS = "MRJobLauncher.enabledCustomizedProgress";
    private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
    private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
    // Keys set on the Gobblin job state inside the mapper to record the MR task type and the
    // task / attempt tokens parsed from the task attempt id.
    public static final String MR_TYPE_KEY = "metrics.mr.type";
    public static final String MAPPER_TASK_NUM_KEY = "metrics.reporting.mapper.task.num";
    public static final String MAPPER_TASK_ATTEMPT_NUM_KEY = "metrics.reporting.mapper.task.attempt.num";
    public static final String REDUCER_TASK_NUM_KEY = "metrics.reporting.reducer.task.num";
    public static final String REDUCER_TASK_ATTEMPT_NUM_KEY = "metrics.reporting.reducer.task.attempt.num";
    // Splits comma-separated property values, trimming whitespace and dropping empty entries.
    private static final Splitter SPLITTER = Splitter.on((char)',').omitEmptyStrings().trimResults();
    // Hadoop configuration the job properties are copied into.
    private final Configuration conf;
    // File system built from the "fs.uri" property; closed in close().
    private final FileSystem fs;
    // The Hadoop MR job that runs the work units.
    private final Job job;
    // Per-job working directory: <mr.job.root.dir>/<job name>/<job id>.
    private final Path mrJobDir;
    // Directory jars are uploaded to: "mr.jars.dir" when configured, otherwise unsharedJarsDir.
    private final Path jarsDir;
    // Job-private jar directory (<mrJobDir>/_jars); always used for jars whose name contains "SNAPSHOT".
    private final Path unsharedJarsDir;
    private final Path jobInputPath;
    private final Path jobOutputPath;
    // Thread count handed to the ParallelRunner that serializes work units.
    private final int parallelRunnerThreads;
    private final TaskStateCollectorService taskStateCollectorService;
    // Set to true once job.submit() has returned; read by close() and when reporting the MR job state.
    private volatile boolean hadoopJobSubmitted = false;
    private final StateStore<TaskState> taskStateStore;
    // Maximum number of attempts made to upload a single jar file.
    private final int jarFileMaximumRetry;
    // Location of the interrupt marker file (<mrJobDir>/_INTERRUPT_JOB).
    private final Path interruptPath;
    // Job state machine; its builder is given interruptGracefully and killJob as callbacks.
    private final GobblinJobFiniteStateMachine fsm = GobblinJobFiniteStateMachine.builder().jobState(this.jobContext.getJobState()).interruptGracefully(this::interruptGracefully).killJob(this::killJob).build();

    /**
     * Creates a launcher from job properties only, delegating with a {@code null} instance broker.
     *
     * @param jobProps job configuration properties
     * @throws Exception if the delegated constructor fails
     */
    public MRJobLauncher(Properties jobProps) throws Exception {
        this(jobProps, (SharedResourcesBroker<GobblinScopeTypes>)null);
    }

    /**
     * Creates a launcher using a newly constructed Hadoop {@link Configuration}.
     *
     * @param jobProps job configuration properties
     * @param instanceBroker instance-level broker, passed through to the delegated constructor
     * @throws Exception if the delegated constructor fails
     */
    public MRJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
        this(jobProps, new Configuration(), instanceBroker);
    }

    /**
     * Creates a launcher with the given Hadoop configuration and an empty list of metadata tags.
     *
     * @param jobProps job configuration properties
     * @param conf Hadoop configuration to use for the job
     * @param instanceBroker instance-level broker, passed through to the delegated constructor
     * @throws Exception if the delegated constructor fails
     */
    public MRJobLauncher(Properties jobProps, Configuration conf, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
        this(jobProps, conf, instanceBroker, (List<? extends Tag<?>>)ImmutableList.of());
    }

    /**
     * Creates a launcher with metadata tags, using a newly constructed Hadoop {@link Configuration}.
     *
     * @param jobProps job configuration properties
     * @param instanceBroker instance-level broker, passed through to the delegated constructor
     * @param metadataTags metadata tags passed through to the delegated constructor
     * @throws Exception if the delegated constructor fails
     */
    public MRJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends Tag<?>> metadataTags) throws Exception {
        this(jobProps, new Configuration(), instanceBroker, metadataTags);
    }

    /**
     * Primary constructor: copies the job properties into the Hadoop configuration, builds the file
     * system, (re)creates the per-job MR working directory, creates the Hadoop {@link Job} and the
     * task state collector, and finally starts the cancellation executor.
     *
     * <p>NOTE(review): {@code instanceBroker} is not referenced anywhere in this constructor body;
     * confirm whether it was meant to be forwarded to the superclass.
     *
     * @param jobProps job configuration properties
     * @param conf Hadoop configuration; it is mutated by this constructor
     * @param instanceBroker instance-level broker (currently unused here)
     * @param metadataTags metadata tags forwarded to the superclass constructor
     * @throws Exception if the superclass constructor, a file system operation, or job creation fails
     */
    public MRJobLauncher(Properties jobProps, Configuration conf, SharedResourcesBroker<GobblinScopeTypes> instanceBroker, List<? extends Tag<?>> metadataTags) throws Exception {
        super(jobProps, metadataTags);
        this.conf = conf;
        // Make every job property visible through the Hadoop configuration.
        JobConfigurationUtils.putPropertiesIntoConfiguration((Properties)this.jobProps, (Configuration)this.conf);
        // Allow up to 100% of map tasks to fail without Hadoop failing the whole job.
        this.conf.set("mapreduce.map.failures.maxpercent", "100");
        this.conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
        this.fs = MRJobLauncher.buildFileSystem(jobProps, this.conf);
        // Working directory layout: <mr.job.root.dir>/<job name>/<job id>.
        this.mrJobDir = new Path(new Path(this.jobProps.getProperty("mr.job.root.dir"), this.jobContext.getJobName()), this.jobContext.getJobId());
        this.interruptPath = new Path(this.mrJobDir, INTERRUPT_JOB_FILE_NAME);
        // A leftover directory with the same job id is removed before it is recreated below.
        if (this.fs.exists(this.mrJobDir)) {
            LOG.warn("Job working directory already exists for job " + this.jobContext.getJobName());
            this.fs.delete(this.mrJobDir, true);
        }
        this.unsharedJarsDir = new Path(this.mrJobDir, JARS_DIR_NAME);
        // Use the configured jar directory when "mr.jars.dir" is set, otherwise the job-private one.
        this.jarsDir = this.jobProps.containsKey("mr.jars.dir") ? new Path(this.jobProps.getProperty("mr.jars.dir")) : this.unsharedJarsDir;
        this.fs.mkdirs(this.mrJobDir);
        this.jobInputPath = new Path(this.mrJobDir, INPUT_DIR_NAME);
        this.jobOutputPath = new Path(this.mrJobDir, OUTPUT_DIR_NAME);
        Path outputTaskStateDir = new Path(this.jobOutputPath, this.jobContext.getJobId());
        this.job = Job.getInstance((Configuration)this.conf, (String)(JOB_NAME_PREFIX + this.jobContext.getJobName()));
        // Defaults to 10 threads when "parallel.runner.threads" is not set.
        this.parallelRunnerThreads = Integer.parseInt(jobProps.getProperty("parallel.runner.threads", Integer.toString(10)));
        this.taskStateStore = new FsStateStore(this.fs, this.jobOutputPath.toString(), TaskState.class);
        this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.taskStateStore, outputTaskStateDir);
        // Defaults to 5 upload attempts per jar when "job.jars.uploading.retry.maximum" is not set.
        this.jarFileMaximumRetry = jobProps.containsKey("job.jars.uploading.retry.maximum") ? Integer.parseInt(jobProps.getProperty("job.jars.uploading.retry.maximum")) : 5;
        log.info("Configured fs:{}", (Object)this.fs);
        log.debug("Configuration: {}", (Object)conf);
        this.startCancellationExecutor();
    }

    /**
     * Releases everything owned by this launcher.
     *
     * <p>Kills the Hadoop job if it was submitted and has not completed, deletes the MR working
     * directory, closes the superclass and finally closes the file system. Each step runs in its own
     * {@code finally} so that a failure in one step cannot prevent the following ones; in particular
     * the file system is closed even when {@code super.close()} throws.
     *
     * @throws IOException if querying/killing the job or closing a resource fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.hadoopJobSubmitted && !this.job.isComplete()) {
                LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
                this.job.killJob();
            }
        }
        finally {
            try {
                this.cleanUpWorkingDirectory();
            }
            finally {
                try {
                    super.close();
                }
                finally {
                    // Must run even if super.close() fails, otherwise the FileSystem instance leaks.
                    this.fs.close();
                }
            }
        }
    }

    /**
     * Runs the given work units as a Hadoop MR job.
     *
     * <p>Emits a "WorkUnitsCreated" count event, prepares and submits the Hadoop job, waits for it
     * to complete when the state machine reached RUNNING, and (unless the job was cancelled) copies
     * the Hadoop counters into the job metrics. Whatever the outcome, including an exception from
     * any step, the "MRJobState" event is submitted, the task state collector is stopped and the
     * working directory is cleaned up.
     *
     * @param workUnits work units to run
     * @throws Exception if preparing, submitting or waiting for the job fails; a failure while
     *     converting counters to metrics is wrapped in a {@link RuntimeException}
     */
    @Override
    protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
        String jobName = this.jobContext.getJobName();
        JobState jobState = this.jobContext.getJobState();
        try {
            CountEventBuilder countEventBuilder = new CountEventBuilder("WorkUnitsCreated", (long)workUnits.size());
            this.eventSubmitter.submit((GobblinEventBuilder)countEventBuilder);
            LOG.info("Emitting WorkUnitsCreated Count: " + countEventBuilder.getCount());
            this.prepareHadoopJob(workUnits);
            this.taskStateCollectorService.startAsync().awaitRunning();
            LOG.info("Launching Hadoop MR job " + this.job.getJobName());
            try (FiniteStateMachine.Transition t = this.fsm.startTransition(this.fsm.getEndStateForType(GobblinJobFiniteStateMachine.StateType.RUNNING));){
                try {
                    this.job.submit();
                }
                catch (Throwable exc) {
                    // Submission failed: end the transition in FAILED instead of RUNNING.
                    t.changeEndState((Object)this.fsm.getEndStateForType(GobblinJobFiniteStateMachine.StateType.FAILED));
                    throw exc;
                }
                this.hadoopJobSubmitted = true;
                // Only fill in the tracking URL when the job state does not carry one already.
                if (!jobState.contains("job.tracking.url")) {
                    jobState.setProp("job.tracking.url", this.job.getTrackingURL());
                }
            }
            catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
                LOG.error("Cannot start MR job.", (Throwable)unallowed);
            }
            if (((GobblinJobFiniteStateMachine.JobFSMState)this.fsm.getCurrentState()).getStateType().equals((Object)GobblinJobFiniteStateMachine.StateType.RUNNING)) {
                TimingEvent mrJobRunTimer = this.eventSubmitter.getTimingEvent("JobMrRunTimer");
                LOG.info(String.format("Waiting for Hadoop MR job %s to complete", this.job.getJobID()));
                this.job.waitForCompletion(true);
                this.fsm.transitionIfAllowed(this.fsm.getEndStateForType(GobblinJobFiniteStateMachine.StateType.SUCCESS));
                mrJobRunTimer.stop((Map)ImmutableMap.of((Object)"hadoopMRJobId", (Object)this.job.getJobID().toString()));
            }
            // A cancelled job skips the counter conversion; the finally block still reports and cleans up.
            if (((GobblinJobFiniteStateMachine.JobFSMState)this.fsm.getCurrentState()).getStateType().equals((Object)GobblinJobFiniteStateMachine.StateType.CANCELLED)) {
                return;
            }
            try {
                this.countersToMetrics(JobMetrics.get(jobName, this.jobProps.getProperty("job.id")));
            }
            catch (Throwable cause) {
                throw new RuntimeException("The MR job cannot be submitted due to:", cause);
            }
        }
        finally {
            this.submitMrJobStateEventAndCleanUp();
        }
    }

    /**
     * Submits the "MRJobState" event describing the outcome of the Hadoop job, stops the task state
     * collector service and deletes the MR working directory.
     *
     * @throws IOException if the Hadoop job state cannot be retrieved
     * @throws InterruptedException if interrupted while retrieving the Hadoop job state
     */
    private void submitMrJobStateEventAndCleanUp() throws IOException, InterruptedException {
        JobStateEventBuilder eventBuilder = new JobStateEventBuilder("MRJobState");
        if (!this.hadoopJobSubmitted) {
            eventBuilder.jobTrackingURL = "";
            eventBuilder.status = JobStateEventBuilder.Status.FAILED;
        } else {
            eventBuilder.jobTrackingURL = this.job.getTrackingURL();
            eventBuilder.status = JobStateEventBuilder.Status.SUCCEEDED;
            if (this.job.getJobState() != JobStatus.State.SUCCEEDED) {
                eventBuilder.status = JobStateEventBuilder.Status.FAILED;
            }
        }
        this.eventSubmitter.submit((GobblinEventBuilder)eventBuilder);
        this.taskStateCollectorService.stopAsync().awaitTerminated();
        this.cleanUpWorkingDirectory();
    }

    /**
     * Cancels the job by driving the state machine to CANCELLED.
     *
     * <p>If the transition starts from RUNNING, the Hadoop job is killed; when the kill fails the
     * transition ends in FAILED instead and the failure is logged together with its cause. A failed
     * transition callback switches the transition to the error state and closes it without
     * callbacks. Interruption and disallowed transitions are logged.
     */
    @Override
    protected void executeCancellation() {
        try (FiniteStateMachine.Transition transition = this.fsm.startTransition(this.fsm.getEndStateForType(GobblinJobFiniteStateMachine.StateType.CANCELLED));){
            if (((GobblinJobFiniteStateMachine.JobFSMState)transition.getStartState()).getStateType().equals((Object)GobblinJobFiniteStateMachine.StateType.RUNNING)) {
                try {
                    this.killJob();
                }
                catch (IOException ioe) {
                    // Keep the cause in the log so a failed kill can be diagnosed.
                    LOG.error("Failed to kill the Hadoop MR job for job " + this.jobContext.getJobId(), (Throwable)ioe);
                    transition.changeEndState((Object)this.fsm.getEndStateForType(GobblinJobFiniteStateMachine.StateType.FAILED));
                }
            }
        }
        catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
            exc.getTransition().switchEndStateToErrorState();
            exc.getTransition().closeWithoutCallbacks();
        }
        catch (InterruptedException | FiniteStateMachine.UnallowedTransitionException exc) {
            LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
        }
    }

    /**
     * Requests a graceful shutdown of the Hadoop job by creating the interrupt marker file, then
     * polls once per second for up to 30 seconds for the job to complete. If the job is still not
     * complete afterwards (or the wait was interrupted), the job is killed.
     *
     * @throws IOException if the marker file cannot be created or the job cannot be queried/killed
     */
    private void interruptGracefully() throws IOException {
        LOG.info("Attempting graceful interruption of job " + this.jobContext.getJobId());
        this.fs.createNewFile(this.interruptPath);
        final long deadline = System.currentTimeMillis() + 30000L;
        while (!this.job.isComplete() && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException ie) {
                // Stop waiting; the completion check below decides whether to kill.
                break;
            }
        }
        if (this.job.isComplete()) {
            return;
        }
        LOG.info("Interrupted job did not shut itself down after timeout. Killing job.");
        this.job.killJob();
    }

    /**
     * Kills the Hadoop job and then stops the task state collector service, waiting for it to
     * terminate.
     *
     * @throws IOException if the Hadoop job cannot be killed
     */
    private void killJob() throws IOException {
        String jobId = this.jobContext.getJobId();
        LOG.info("Killing the Hadoop MR job for job " + jobId);
        this.job.killJob();
        this.taskStateCollectorService.stopAsync();
        this.taskStateCollectorService.awaitTerminated();
    }

    /**
     * Registers all configured job dependencies with the distributed cache, timing the operation
     * with the "JobMrDistributedCacheSetupTimer" event.
     *
     * <p>Handled properties, in order: "framework.jars", "job.jars", "job.local.files",
     * "job.hdfs.files" and "job.hdfs.jars". Properties that are not set are skipped.
     *
     * @param conf Hadoop configuration the dependencies are registered on
     * @throws IOException if uploading or listing a dependency fails
     */
    private void addDependencies(Configuration conf) throws IOException {
        TimingEvent setupTimer = this.eventSubmitter.getTimingEvent("JobMrDistributedCacheSetupTimer");

        if (this.jobProps.containsKey("framework.jars")) {
            String frameworkJars = this.jobProps.getProperty("framework.jars");
            this.addJars(this.jarsDir, frameworkJars, conf);
        }

        if (this.jobProps.containsKey("job.jars")) {
            String jobJars = this.jobProps.getProperty("job.jars");
            this.addJars(this.jarsDir, jobJars, conf);
        }

        if (this.jobProps.containsKey("job.local.files")) {
            Path localFilesDir = new Path(this.mrJobDir, FILES_DIR_NAME);
            String localFiles = this.jobProps.getProperty("job.local.files");
            this.addLocalFiles(localFilesDir, localFiles, conf);
        }

        if (this.jobProps.containsKey("job.hdfs.files")) {
            String hdfsFiles = this.jobProps.getProperty("job.hdfs.files");
            this.addHDFSFiles(hdfsFiles, conf);
        }

        if (this.jobProps.containsKey("job.hdfs.jars")) {
            String hdfsJars = this.jobProps.getProperty("job.hdfs.jars");
            this.addHdfsJars(hdfsJars, conf);
        }

        setupTimer.stop();
    }

    /**
     * Configures the Hadoop job before submission, timing the work with the "JobMrSetupTimer" event.
     *
     * <p>Registers dependencies, sets the mapper / input / output classes for a map-only job, writes
     * the work units as job input, serializes the job state, optionally caps the number of mappers
     * ("mr.job.max.mappers") and publishes the interrupt marker path in the job configuration.
     *
     * @param workUnits work units to serialize as job input
     * @throws IOException if a file system operation fails
     */
    private void prepareHadoopJob(List<WorkUnit> workUnits) throws IOException {
        TimingEvent setupTimer = this.eventSubmitter.getTimingEvent("JobMrSetupTimer");
        Configuration jobConf = this.job.getConfiguration();

        this.addDependencies(jobConf);

        // Map-only job: the mapper runs the Gobblin tasks and emits nothing.
        this.job.setJarByClass(MRJobLauncher.class);
        this.job.setMapperClass(TaskRunner.class);
        this.job.setNumReduceTasks(0);
        this.job.setInputFormatClass(GobblinWorkUnitsInputFormat.class);
        this.job.setOutputFormatClass(GobblinOutputFormat.class);
        this.job.setMapOutputKeyClass(NullWritable.class);
        this.job.setMapOutputValueClass(NullWritable.class);

        boolean speculative = MRJobLauncher.isSpeculativeExecutionEnabled(this.jobProps);
        this.job.setSpeculativeExecution(speculative);
        jobConf.set("mapreduce.job.user.classpath.first", "true");

        this.prepareJobInput(workUnits);
        FileInputFormat.addInputPath((Job)this.job, (Path)this.jobInputPath);
        FileOutputFormat.setOutputPath((Job)this.job, (Path)this.jobOutputPath);
        MRJobLauncher.serializeJobState(this.fs, this.mrJobDir, this.conf, this.jobContext.getJobState(), this.job);

        if (this.jobProps.containsKey("mr.job.max.mappers")) {
            int maxMappers = Integer.parseInt(this.jobProps.getProperty("mr.job.max.mappers"));
            GobblinWorkUnitsInputFormat.setMaxMappers(this.job, maxMappers);
        }

        jobConf.set(GOBBLIN_JOB_INTERRUPT_PATH_KEY, this.interruptPath.toString());
        setupTimer.stop();
    }

    /**
     * Tells whether speculative execution of map tasks is enabled.
     *
     * @param props properties to read "mapreduce.map.speculative" from
     * @return {@code true} only if the property is set to "true" (case-insensitive); {@code false}
     *     when it is absent or has any other value
     */
    static boolean isSpeculativeExecutionEnabled(Properties props) {
        String speculative = props.getProperty("mapreduce.map.speculative", "false");
        return Boolean.parseBoolean(speculative);
    }

    /**
     * Tells whether customized progress reporting is enabled.
     *
     * @param properties properties to inspect
     * @return {@code true} only if the {@code ENABLED_CUSTOMIZED_PROGRESS} key is present and its
     *     value parses as boolean {@code true}
     */
    static boolean isCustomizedProgressReportEnabled(Properties properties) {
        if (!properties.containsKey(ENABLED_CUSTOMIZED_PROGRESS)) {
            return false;
        }
        String enabled = properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS);
        return Boolean.parseBoolean(enabled);
    }

    /**
     * Writes the job state to {@code <mrJobDir>/job.state} and makes it available to the Hadoop job.
     *
     * <p>The file path is stored under "job.state.file.path", the file is added to the distributed
     * cache, and its name is stored under "job.state.distributed.cache.name".
     *
     * @param fs file system to write the state file to
     * @param mrJobDir MR job working directory
     * @param conf configuration consulted for whether previous work unit states are serialized
     * @param jobState job state to serialize
     * @param job Hadoop job whose configuration receives the state file settings
     * @throws IOException if the state file cannot be written
     */
    @VisibleForTesting
    static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration conf, JobState jobState, Job job) throws IOException {
        Path stateFile = new Path(mrJobDir, "job.state");
        try (DataOutputStream stateOut = new DataOutputStream((OutputStream)fs.create(stateFile));){
            boolean withPreviousWorkUnitStates = conf.getBoolean(SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY, true);
            jobState.write(stateOut, false, withPreviousWorkUnitStates);
        }
        Configuration jobConf = job.getConfiguration();
        jobConf.set("job.state.file.path", stateFile.toString());
        DistributedCache.addCacheFile((URI)stateFile.toUri(), (Configuration)job.getConfiguration());
        job.getConfiguration().set("job.state.distributed.cache.name", stateFile.getName());
    }

    /**
     * Uploads local jar files and adds them to the job classpath through the distributed cache.
     *
     * <p>Each entry of the comma-separated list is expanded as a glob on the local file system. A
     * jar is (re)uploaded while the destination is missing or its length differs from the local
     * file; when a destination of different length already exists the method waits 3 seconds and
     * retries. After {@code jarFileMaximumRetry} failed attempts the jar is skipped and not added to
     * the classpath. An entry that matches nothing on the local file system is skipped with a
     * warning.
     *
     * @param jarFileDir destination directory for jars whose name does not contain "SNAPSHOT"
     * @param jarFileList comma-separated list of local jar paths or globs
     * @param conf Hadoop configuration the classpath entries are registered on
     * @throws IOException if a file system query outside the retried copy fails
     */
    private void addJars(Path jarFileDir, String jarFileList, Configuration conf) throws IOException {
        LocalFileSystem lfs = FileSystem.getLocal((Configuration)conf);
        for (String jarFile : SPLITTER.split((CharSequence)jarFileList)) {
            Path srcJarFile = new Path(jarFile);
            FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
            // globStatus returns null for a non-glob path that does not exist.
            if (fileStatusList == null) {
                LOG.warn("No local jar file found for " + srcJarFile + ". Skipping it.");
                continue;
            }
            for (FileStatus status : fileStatusList) {
                int retryCount = 0;
                boolean shouldFileBeAddedIntoDC = true;
                Path destJarFile = this.calculateDestJarFile(status, jarFileDir);
                // Loop until the destination exists with the same length as the local jar.
                while (!this.fs.exists(destJarFile) || this.fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
                    try {
                        if (this.fs.exists(destJarFile) && this.fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
                            // A file of different length is already there: wait, then count this as a failed attempt.
                            Thread.sleep(3000L);
                            throw new IOException("Waiting for file to complete on uploading ... ");
                        }
                        this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
                    }
                    catch (IOException | InterruptedException e) {
                        LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
                        if (++retryCount < this.jarFileMaximumRetry) continue;
                        LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", (Throwable)e);
                        shouldFileBeAddedIntoDC = false;
                        break;
                    }
                }
                if (!shouldFileBeAddedIntoDC) continue;
                LOG.info(String.format("Adding %s to classpath", destJarFile));
                DistributedCache.addFileToClassPath((Path)destJarFile, (Configuration)conf, (FileSystem)this.fs);
            }
        }
    }

    /**
     * Computes the destination path of a jar to upload.
     *
     * @param status status of the local jar file
     * @param jarFileDir directory used for jars whose file name does not contain "SNAPSHOT"
     * @return the jar's file name resolved against the qualified {@code unsharedJarsDir} when the
     *     name contains "SNAPSHOT", otherwise against the qualified {@code jarFileDir}
     */
    private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
        String jarName = status.getPath().getName();
        Path targetDir = jarFileDir;
        if (jarName.contains("SNAPSHOT")) {
            targetDir = this.unsharedJarsDir;
        }
        return new Path(this.fs.makeQualified(targetDir), jarName);
    }

    /**
     * Copies local files into the given job directory and registers each copy in the distributed
     * cache under a URI of the form {@code <path>#<file name>}.
     *
     * @param jobFileDir destination directory for the copied files
     * @param jobFileList comma-separated list of local file paths
     * @param conf Hadoop configuration the cache files are registered on
     * @throws IOException if a file cannot be copied
     */
    private void addLocalFiles(Path jobFileDir, String jobFileList, Configuration conf) throws IOException {
        DistributedCache.createSymlink((Configuration)conf);
        for (String localFile : SPLITTER.split((CharSequence)jobFileList)) {
            Path source = new Path(localFile);
            Path target = new Path(this.fs.makeQualified(jobFileDir), source.getName());
            this.fs.copyFromLocalFile(source, target);

            String cacheEntry = target.toUri().getPath() + "#" + target.getName();
            URI cacheUri = URI.create(cacheEntry);
            LOG.info(String.format("Adding %s to DistributedCache", cacheUri));
            DistributedCache.addCacheFile((URI)cacheUri, (Configuration)conf);
        }
    }

    /**
     * Registers files in the distributed cache under URIs of the form {@code <path>#<file name>}.
     * The list is first passed through the {@link PasswordManager} built from the job properties.
     *
     * @param jobFileList comma-separated list of file paths
     * @param conf Hadoop configuration the cache files are registered on
     */
    private void addHDFSFiles(String jobFileList, Configuration conf) {
        DistributedCache.createSymlink((Configuration)conf);
        String resolvedFileList = PasswordManager.getInstance((Properties)this.jobProps).readPassword(jobFileList);
        for (String hdfsFile : SPLITTER.split((CharSequence)resolvedFileList)) {
            Path source = new Path(hdfsFile);
            String cacheEntry = source.toUri().getPath() + "#" + source.getName();
            URI cacheUri = URI.create(cacheEntry);
            LOG.info(String.format("Adding %s to DistributedCache", cacheUri));
            DistributedCache.addCacheFile((URI)cacheUri, (Configuration)conf);
        }
    }

    /**
     * Adds the non-directory entries listed under each configured path to the job classpath
     * through the distributed cache.
     *
     * @param hdfsJarFileList comma-separated list of paths to list
     * @param conf Hadoop configuration the classpath entries are registered on
     * @throws IOException if a path cannot be listed
     */
    private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws IOException {
        for (String jarLocation : SPLITTER.split((CharSequence)hdfsJarFileList)) {
            FileStatus[] entries = this.fs.listStatus(new Path(jarLocation));
            for (FileStatus entry : entries) {
                if (!entry.isDirectory()) {
                    Path jarPath = new Path(jarLocation, entry.getPath().getName());
                    LOG.info(String.format("Adding %s to classpath", jarPath));
                    DistributedCache.addFileToClassPath((Path)jarPath, (Configuration)conf, (FileSystem)this.fs);
                }
            }
        }
    }

    /**
     * Serializes every work unit into its own file under the job input path using a
     * {@link ParallelRunner}, which is closed when the method finishes.
     *
     * <p>Multi work units are named with a generated multi-task id and the ".mwu" suffix; other
     * work units are named after their "task.id" property with the ".wu" suffix.
     *
     * @param workUnits work units to serialize
     * @throws IOException if serialization or closing the runner fails
     */
    private void prepareJobInput(List<WorkUnit> workUnits) throws IOException {
        try (Closer closer = Closer.create();){
            ParallelRunner runner = (ParallelRunner)closer.register((Closeable)new ParallelRunner(this.parallelRunnerThreads, this.fs));
            int nextMultiTaskSequence = 0;
            for (WorkUnit workUnit : workUnits) {
                String fileName;
                if (workUnit instanceof MultiWorkUnit) {
                    fileName = JobLauncherUtils.newMultiTaskId((String)this.jobContext.getJobId(), (int)nextMultiTaskSequence) + ".mwu";
                    nextMultiTaskSequence++;
                } else {
                    fileName = workUnit.getProp("task.id") + ".wu";
                }
                Path target = new Path(this.jobInputPath, fileName);
                LOG.debug("Writing work unit file " + fileName);
                runner.serializeToFile((State)workUnit, target);
            }
        }
    }

    /**
     * Deletes the MR working directory recursively if it exists.
     *
     * <p>This is best-effort: an {@link IOException} is logged, together with its cause, and not
     * propagated.
     */
    private void cleanUpWorkingDirectory() {
        try {
            if (this.fs.exists(this.mrJobDir)) {
                this.fs.delete(this.mrJobDir, true);
                LOG.info("Deleted working directory " + this.mrJobDir);
            }
        }
        catch (IOException ioe) {
            // Include the exception so the reason for the failed cleanup is not lost.
            LOG.error("Failed to delete working directory " + this.mrJobDir, (Throwable)ioe);
        }
    }

    /**
     * Adds the values of the Hadoop counters in the JOB and TASK metric groups to the
     * equally-named counters of the given metrics object. Does nothing when the job reports no
     * counters.
     *
     * @param metrics metrics object whose counters are incremented
     * @throws IOException if the Hadoop counters cannot be retrieved
     */
    @VisibleForTesting
    void countersToMetrics(GobblinMetrics metrics) throws IOException {
        Counters hadoopCounters = this.job.getCounters();
        if (hadoopCounters == null) {
            return;
        }
        // JOB group first, then TASK group.
        String[] groupNames = new String[]{MetricGroup.JOB.name(), MetricGroup.TASK.name()};
        for (String groupName : groupNames) {
            CounterGroup group = (CounterGroup)hadoopCounters.getGroup(groupName);
            for (Counter counter : group) {
                metrics.getCounter(counter.getName(), new String[0]).inc(counter.getValue());
            }
        }
    }

    /**
     * Creates a new {@link FileSystem} instance for the URI given by the "fs.uri" property,
     * defaulting to {@code file:///} when the property is not set.
     *
     * @param jobProps job properties to read "fs.uri" from
     * @param configuration Hadoop configuration used to create the file system
     * @return a new file system instance
     * @throws IOException if the file system cannot be created
     */
    private static FileSystem buildFileSystem(Properties jobProps, Configuration configuration) throws IOException {
        String fsUriValue = jobProps.getProperty("fs.uri", "file:///");
        URI fsUri = URI.create(fsUriValue);
        return FileSystem.newInstance((URI)fsUri, (Configuration)configuration);
    }

    public static class TaskRunner
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
        private FileSystem fs;
        private StateStore<TaskState> taskStateStore;
        private TaskExecutor taskExecutor;
        private TaskStateTracker taskStateTracker;
        private ServiceManager serviceManager;
        private Optional<JobMetrics> jobMetrics = Optional.absent();
        private boolean isSpeculativeEnabled;
        private boolean customizedProgressEnabled;
        private final JobState jobState = new JobState();
        private CustomizedProgresser customizedProgresser;
        private static final String CUSTOMIZED_PROGRESSER_FACTORY_CLASS = "customizedProgresser.factoryClass";
        private static final String DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS = "org.apache.gobblin.runtime.mapreduce.CustomizedProgresserBase$BaseFactory";
        private final List<WorkUnit> workUnits = Lists.newArrayList();

        protected void setup(Mapper.Context context) {
            block27: {
                State gobblinJobState = HadoopUtils.getStateFromConf((Configuration)context.getConfiguration());
                TaskAttemptID taskAttemptID = context.getTaskAttemptID();
                try (Closer closer = Closer.create();){
                    this.customizedProgressEnabled = MRJobLauncher.isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
                    this.isSpeculativeEnabled = MRJobLauncher.isSpeculativeExecutionEnabled(gobblinJobState.getProperties());
                    String factoryClassName = gobblinJobState.getProperties().getProperty(CUSTOMIZED_PROGRESSER_FACTORY_CLASS, DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS);
                    this.customizedProgresser = Class.forName(factoryClassName).asSubclass(CustomizedProgresser.Factory.class).newInstance().createCustomizedProgresser(context);
                    this.fs = FileSystem.get((Configuration)context.getConfiguration());
                    this.taskStateStore = new FsStateStore(this.fs, FileOutputFormat.getOutputPath((JobContext)context).toUri().getPath(), TaskState.class);
                    String jobStateFileName = context.getConfiguration().get("job.state.distributed.cache.name");
                    boolean foundStateFile = false;
                    for (Path dcPath : DistributedCache.getLocalCacheFiles((Configuration)context.getConfiguration())) {
                        if (!dcPath.getName().equals(jobStateFileName)) continue;
                        SerializationUtils.deserializeStateFromInputStream((InputStream)((InputStream)closer.register((Closeable)new FileInputStream(dcPath.toUri().getPath()))), (State)this.jobState);
                        foundStateFile = true;
                        break;
                    }
                    if (!foundStateFile) {
                        throw new IOException("Job state file not found.");
                    }
                }
                catch (IOException | ReflectiveOperationException e) {
                    throw new RuntimeException("Failed to setup the mapper task", e);
                }
                Configuration configuration = context.getConfiguration();
                Config jobStateAsConfig = ConfigUtils.propertiesToConfig((Properties)this.jobState.getProperties());
                DynamicConfigGenerator dynamicConfigGenerator = DynamicConfigGeneratorFactory.createDynamicConfigGenerator(jobStateAsConfig);
                Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(jobStateAsConfig);
                for (Map.Entry entry : dynamicConfig.entrySet()) {
                    this.jobState.setProp((String)entry.getKey(), ((ConfigValue)entry.getValue()).unwrapped().toString());
                    configuration.set((String)entry.getKey(), ((ConfigValue)entry.getValue()).unwrapped().toString());
                    gobblinJobState.setProp((String)entry.getKey(), (Object)((ConfigValue)entry.getValue()).unwrapped().toString());
                }
                String[] tokens = taskAttemptID.toString().split("_");
                TaskType taskType = taskAttemptID.getTaskType();
                gobblinJobState.setProp(MRJobLauncher.MR_TYPE_KEY, (Object)taskType.name());
                if (tokens.length == 6) {
                    if (taskType.equals((Object)TaskType.MAP)) {
                        gobblinJobState.setProp(MRJobLauncher.MAPPER_TASK_NUM_KEY, (Object)tokens[tokens.length - 2]);
                        gobblinJobState.setProp(MRJobLauncher.MAPPER_TASK_ATTEMPT_NUM_KEY, (Object)tokens[tokens.length - 1]);
                    } else if (taskType.equals((Object)TaskType.REDUCE)) {
                        gobblinJobState.setProp(MRJobLauncher.REDUCER_TASK_NUM_KEY, (Object)tokens[tokens.length - 2]);
                        gobblinJobState.setProp(MRJobLauncher.REDUCER_TASK_ATTEMPT_NUM_KEY, (Object)tokens[tokens.length - 1]);
                    }
                }
                this.taskExecutor = new TaskExecutor(configuration);
                this.taskStateTracker = new MRTaskStateTracker(context);
                this.serviceManager = new ServiceManager((Iterable)Lists.newArrayList((Object[])new Service[]{this.taskExecutor, this.taskStateTracker}));
                try {
                    this.serviceManager.startAsync().awaitHealthy(5L, TimeUnit.SECONDS);
                }
                catch (TimeoutException te) {
                    LOG.error("Timed out while waiting for the service manager to start up", (Throwable)te);
                    throw new RuntimeException(te);
                }
                if (Boolean.parseBoolean(configuration.get("metrics.enabled", ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
                    this.jobMetrics = Optional.of((Object)((Object)JobMetrics.get(this.jobState)));
                    try {
                        ((JobMetrics)((Object)this.jobMetrics.get())).startMetricReportingWithFileSuffix(gobblinJobState, taskAttemptID.toString());
                    }
                    catch (MultiReporterException ex) {
                        boolean isMetricReportingFailureFatal = configuration.getBoolean("gobblin.task.isMetricReportingFailureFatal", false);
                        boolean isEventReportingFailureFatal = configuration.getBoolean("gobblin.task.isEventReportingFailureFatal", false);
                        if (!MetricReportUtils.shouldThrowException((Logger)LOG, (MultiReporterException)ex, (boolean)isMetricReportingFailureFatal, (boolean)isEventReportingFailureFatal)) break block27;
                        throw new RuntimeException(ex);
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(Mapper.Context context) throws IOException, InterruptedException {
            GobblinMultiTaskAttempt gobblinMultiTaskAttempt;
            block9: {
                this.setup(context);
                Path interruptPath = new Path(context.getConfiguration().get(MRJobLauncher.GOBBLIN_JOB_INTERRUPT_PATH_KEY));
                if (this.fs.exists(interruptPath)) {
                    LOG.info(String.format("Found interrupt path %s indicating the driver has interrupted the job, aborting mapper.", interruptPath));
                    return;
                }
                gobblinMultiTaskAttempt = null;
                try {
                    while (context.nextKeyValue()) {
                        this.map((LongWritable)context.getCurrentKey(), (Text)context.getCurrentValue(), context);
                    }
                    if (this.customizedProgressEnabled) {
                        this.setProgressInMapper(this.customizedProgresser.getCustomizedProgress(), context);
                    }
                    GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = this.isSpeculativeEnabled ? GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED : GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE;
                    SharedResourcesBrokerImpl globalBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker((Config)ConfigFactory.parseProperties((Properties)this.jobState.getProperties()), (ScopeInstance)GobblinScopeTypes.GLOBAL.defaultScopeInstance());
                    SharedResourcesBroker jobBroker = globalBroker.newSubscopedBuilder((ScopeInstance)new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build();
                    gobblinMultiTaskAttempt = GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(), this.jobState, this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore, multiTaskAttemptCommitPolicy, (SharedResourcesBroker<GobblinScopeTypes>)jobBroker, gmta -> {
                        try {
                            return this.fs.exists(interruptPath);
                        }
                        catch (IOException ioe) {
                            return false;
                        }
                    });
                    if (!this.isSpeculativeEnabled) break block9;
                    LOG.info("will not commit in task attempt");
                    GobblinOutputCommitter gobblinOutputCommitter = (GobblinOutputCommitter)context.getOutputCommitter();
                    gobblinOutputCommitter.getAttemptIdToMultiTaskAttempt().put(context.getTaskAttemptID().toString(), gobblinMultiTaskAttempt);
                }
                catch (Throwable throwable) {
                    CommitStep cleanUpCommitStep = new CommitStep(this){
                        final /* synthetic */ TaskRunner this$0;
                        {
                            this.this$0 = this$0;
                        }

                        public boolean isCompleted() throws IOException {
                            return !this.this$0.serviceManager.isHealthy();
                        }

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        public void execute() throws IOException {
                            block19: {
                                LOG.info("Starting the clean-up steps.");
                                try {
                                    this.this$0.serviceManager.stopAsync().awaitStopped(5L, TimeUnit.SECONDS);
                                }
                                catch (TimeoutException throwable) {
                                    if (!this.this$0.jobMetrics.isPresent()) break block19;
                                    try {
                                        ((JobMetrics)((Object)this.this$0.jobMetrics.get())).stopMetricsReporting();
                                    }
                                    catch (Throwable throwable2) {
                                        LOG.error("Failed to stop job metrics reporting.", throwable2);
                                    }
                                    finally {
                                        GobblinMetrics.remove((String)((JobMetrics)((Object)this.this$0.jobMetrics.get())).getName());
                                    }
                                }
                                finally {
                                    if (this.this$0.jobMetrics.isPresent()) {
                                        try {
                                            ((JobMetrics)((Object)this.this$0.jobMetrics.get())).stopMetricsReporting();
                                        }
                                        catch (Throwable throwable) {
                                            LOG.error("Failed to stop job metrics reporting.", throwable);
                                        }
                                        finally {
                                            GobblinMetrics.remove((String)((JobMetrics)((Object)this.this$0.jobMetrics.get())).getName());
                                        }
                                    }
                                }
                            }
                        }
                    };
                    if (!this.isSpeculativeEnabled || gobblinMultiTaskAttempt == null) {
                        cleanUpCommitStep.execute();
                    } else {
                        LOG.info("Adding additional commit step");
                        gobblinMultiTaskAttempt.addCleanupCommitStep(cleanUpCommitStep);
                    }
                    throw throwable;
                }
            }
            CommitStep cleanUpCommitStep = new /* invalid duplicate definition of identical inner class */;
            if (!this.isSpeculativeEnabled || gobblinMultiTaskAttempt == null) {
                cleanUpCommitStep.execute();
            } else {
                LOG.info("Adding additional commit step");
                gobblinMultiTaskAttempt.addCleanupCommitStep(cleanUpCommitStep);
            }
        }

        /**
         * Loads the work unit file named by {@code value} and adds its work units to
         * {@code this.workUnits}.
         *
         * <p>A path ending in {@code .mwu} is deserialized as a {@link MultiWorkUnit} and its
         * contents are flattened via {@code JobLauncherUtils.flattenWorkUnits} before being added;
         * any other path is deserialized as a single {@link WorkUnit}.
         *
         * @param key the input key; not used
         * @param value text holding the path of the serialized work unit file
         * @param context the mapper context; not used
         * @throws IOException if the work unit file cannot be deserialized
         * @throws InterruptedException declared for the mapper contract
         */
        public void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            String workUnitFile = value.toString();
            // Declared as the common supertype so that both kinds of work unit can be held.
            WorkUnit workUnit = workUnitFile.endsWith(".mwu") ? MultiWorkUnit.createEmpty() : WorkUnit.createEmpty();
            SerializationUtils.deserializeState((FileSystem)this.fs, (Path)new Path(workUnitFile), (State)workUnit);
            if (workUnit instanceof MultiWorkUnit) {
                this.workUnits.addAll(JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit)workUnit).getWorkUnits()));
            } else {
                this.workUnits.add(workUnit);
            }
        }

        /**
         * Sets the progress value on the task reporter behind the given mapper context.
         *
         * <p>The reporter is not reachable through the public context API, so it is looked up
         * reflectively: first the {@code mapContext} field of the {@link WrappedMapper.Context},
         * then the {@code reporter} field searched from {@link MapContextImpl}.
         *
         * @param progress the progress value to report
         * @param context the mapper context; must be a {@link WrappedMapper.Context}
         * @throws RuntimeException wrapping the reflection failure if either field cannot be read
         */
        void setProgressInMapper(float progress, Mapper.Context context) {
            try {
                WrappedMapper.Context wrapper = (WrappedMapper.Context)context;
                Class<?> wrapperType = wrapper.getClass();
                Object innerMapContext = RestrictedFieldAccessingUtils.getRestrictedFieldByReflection(wrapper, "mapContext", wrapperType);
                Object reporterField = RestrictedFieldAccessingUtils.getRestrictedFieldByReflectionRecursively(innerMapContext, "reporter", MapContextImpl.class);
                Task.TaskReporter taskReporter = (Task.TaskReporter)reporterField;
                taskReporter.setProgress(progress);
            }
            catch (IllegalAccessException | NoSuchFieldException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

