/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.github.rholder.retry.RetryException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.Authenticator;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.ScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitSequence;
import org.apache.gobblin.commit.CommitSequenceStore;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventName;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.DatasetTaskSummary;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobExecutionEventSubmitter;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.MultiEventMetadataGenerator;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.gobblin.runtime.listeners.CloseableJobListener;
import org.apache.gobblin.runtime.listeners.JobExecutionEventSubmitterListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.listeners.JobListeners;
import org.apache.gobblin.runtime.locks.JobLock;
import org.apache.gobblin.runtime.locks.JobLockEventListener;
import org.apache.gobblin.runtime.locks.JobLockException;
import org.apache.gobblin.runtime.locks.LegacyJobLockFactoryManager;
import org.apache.gobblin.runtime.metrics.GobblinJobMetricReporter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.apache.gobblin.util.ClusterNameTags;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJobLauncher
implements JobLauncher {
    static final Logger LOG = LoggerFactory.getLogger(AbstractJobLauncher.class);
    public static final String TASK_STATE_STORE_TABLE_SUFFIX = ".tst";
    public static final String JOB_STATE_FILE_NAME = "job.state";
    public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
    public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
    public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
    public static final String NUM_WORKUNITS = "numWorkUnits";
    public static final String GOBBLIN_JOB_MULTI_TEMPLATE_KEY = "gobblin.template.uris";
    protected final Properties jobProps;
    protected final JobContext jobContext;
    protected Optional<JobLock> jobLockOptional = Optional.absent();
    protected final Object cancellationRequest = new Object();
    protected volatile boolean cancellationRequested = false;
    protected final Object cancellationExecution = new Object();
    protected volatile boolean cancellationExecuted = false;
    protected final ExecutorService cancellationExecutor;
    protected final Optional<MetricContext> runtimeMetricContext;
    protected final EventSubmitter eventSubmitter;
    protected final EventBus eventBus = new EventBus(AbstractJobLauncher.class.getSimpleName());
    private final List<JobListener> mandatoryJobListeners = Lists.newArrayList();
    private final MultiEventMetadataGenerator multiEventMetadataGenerator;
    private final AutomaticTroubleshooter troubleshooter;
    protected final GobblinJobMetricReporter gobblinJobMetricsReporter;

    public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> metadataTags) throws Exception {
        this(jobProps, metadataTags, null);
    }

    public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> metadataTags, @Nullable SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
        Preconditions.checkArgument((boolean)jobProps.containsKey("job.name"), (Object)"A job must have a job name specified by job.name");
        ArrayList clusterNameTags = Lists.newArrayList();
        clusterNameTags.addAll(Tag.fromMap((Map)ClusterNameTags.getClusterNameTags()));
        GobblinMetrics.addCustomTagsToProperties((Properties)jobProps, (List)clusterNameTags);
        this.troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig((Properties)jobProps));
        this.troubleshooter.start();
        this.jobProps = new Properties();
        this.jobProps.putAll((Map<?, ?>)jobProps);
        AbstractJobLauncher.resolveGobblinJobTemplateIfNecessary(this.jobProps);
        if (!this.tryLockJob(this.jobProps)) {
            throw new JobException(String.format("Previous instance of job %s is still running, skipping this scheduled run", this.jobProps.getProperty("job.name")));
        }
        try {
            AbstractJobLauncher.setDefaultAuthenticator(this.jobProps);
            if (instanceBroker == null) {
                instanceBroker = AbstractJobLauncher.createDefaultInstanceBroker(jobProps);
            }
            this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker, this.troubleshooter.getIssueRepository());
            this.eventBus.register((Object)this.jobContext);
            this.cancellationExecutor = Executors.newSingleThreadExecutor(ExecutorsUtils.newDaemonThreadFactory((Optional)Optional.of((Object)LOG), (Optional)Optional.of((Object)"CancellationExecutor")));
            this.runtimeMetricContext = this.jobContext.getJobMetricsOptional().transform((Function)new Function<JobMetrics, MetricContext>(){

                public MetricContext apply(JobMetrics input) {
                    return input.getMetricContext();
                }
            });
            this.eventSubmitter = this.buildEventSubmitter(metadataTags);
            GobblinMetrics.addCustomTagToState((State)this.jobContext.getJobState(), metadataTags);
            JobExecutionEventSubmitter jobExecutionEventSubmitter = new JobExecutionEventSubmitter(this.eventSubmitter);
            this.mandatoryJobListeners.add(new JobExecutionEventSubmitterListener(jobExecutionEventSubmitter));
            this.multiEventMetadataGenerator = new MultiEventMetadataGenerator(PropertiesUtils.getPropAsList((Properties)jobProps, (String)"event.metadata.generator.class", (String)"noop"));
            String jobMetricsReporterClassName = this.jobProps.getProperty("gobblin.job.metrics.reporter.class", "org.apache.gobblin.runtime.metrics.DefaultGobblinJobMetricReporter");
            this.gobblinJobMetricsReporter = (GobblinJobMetricReporter)GobblinConstructorUtils.invokeLongestConstructor(Class.forName(jobMetricsReporterClassName), (Object[])new Object[]{this.runtimeMetricContext});
        }
        catch (Exception e) {
            this.unlockJob();
            throw e;
        }
    }

    public static void setDefaultAuthenticator(Properties properties) {
        String authenticatorClass = properties.getProperty("job.default.authenticator.class");
        if (!Strings.isNullOrEmpty((String)authenticatorClass)) {
            Authenticator authenticator = (Authenticator)GobblinConstructorUtils.invokeConstructor(Authenticator.class, (String)authenticatorClass, (Object[])new Object[]{properties});
            Authenticator.setDefault(authenticator);
        }
    }

    @Subscribe
    public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent) throws InvocationTargetException {
        LOG.info("start to handle workunit change event");
        try {
            this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
            this.addTasksToCurrentJob(workUnitChangeEvent.getNewWorkUnits());
        }
        catch (Exception e) {
            throw new InvocationTargetException(e);
        }
    }

    protected void removeTasksFromCurrentJob(List<String> taskIdsToRemove) throws IOException, ExecutionException, RetryException {
    }

    protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) throws IOException, ExecutionException, RetryException {
    }

    public static void resolveGobblinJobTemplateIfNecessary(Properties jobProps) throws IOException, URISyntaxException, SpecNotFoundException, JobTemplate.TemplateException {
        Config config = ConfigUtils.propertiesToConfig((Properties)jobProps);
        JobSpecResolver resolver = JobSpecResolver.builder(config).build();
        JobSpec jobSpec = null;
        if (jobProps.containsKey(GOBBLIN_JOB_TEMPLATE_KEY)) {
            URI templateUri = new URI(jobProps.getProperty(GOBBLIN_JOB_TEMPLATE_KEY));
            jobSpec = JobSpec.builder().withConfig(config).withTemplate(templateUri).build();
        } else if (jobProps.containsKey(GOBBLIN_JOB_MULTI_TEMPLATE_KEY)) {
            ArrayList<URI> templatesURIs = new ArrayList<URI>();
            for (String uri : jobProps.getProperty(GOBBLIN_JOB_MULTI_TEMPLATE_KEY).split(",")) {
                templatesURIs.add(new URI(uri));
            }
            jobSpec = JobSpec.builder().withConfig(config).withResourceTemplates(templatesURIs).build();
        }
        if (jobSpec != null) {
            jobProps.putAll((Map<?, ?>)ConfigUtils.configToProperties((Config)resolver.resolveJobSpec(jobSpec).getConfig()));
        }
    }

    private static SharedResourcesBroker<GobblinScopeTypes> createDefaultInstanceBroker(Properties jobProps) {
        LOG.warn("Creating a job specific {}. Objects will only be shared at the job level.", (Object)SharedResourcesBroker.class.getSimpleName());
        return SharedResourcesBrokerFactory.createDefaultTopLevelBroker((Config)ConfigFactory.parseProperties((Properties)jobProps), (ScopeInstance)GobblinScopeTypes.GLOBAL.defaultScopeInstance());
    }

    JobContext getJobContext() {
        return this.jobContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancelJob(JobListener jobListener) throws JobException {
        Object object = this.cancellationRequest;
        synchronized (object) {
            if (this.cancellationRequested) {
                return;
            }
            this.cancellationRequested = true;
            this.cancellationRequest.notify();
        }
        object = this.cancellationExecution;
        synchronized (object) {
            try {
                while (!this.cancellationExecuted) {
                    this.cancellationExecution.wait();
                }
                try {
                    LOG.info("Current job state is: " + this.jobContext.getJobState().getState());
                    if (this.jobContext.getJobState().getState() != JobState.RunningState.COMMITTED && (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS)) {
                        this.jobContext.finalizeJobStateBeforeCommit();
                        this.jobContext.commit(true);
                    }
                    this.jobContext.close();
                }
                catch (IOException ioe) {
                    LOG.error("Could not close job context.", (Throwable)ioe);
                }
                this.notifyListeners(this.jobContext, jobListener, "JobCancelTimer", new JobListenerAction(){

                    @Override
                    public void apply(JobListener jobListener, JobContext jobContext) throws Exception {
                        jobListener.onJobCancellation(jobContext);
                    }
                });
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public void launchJob(JobListener jobListener) throws JobException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @VisibleForTesting
    public static long sumWorkUnitsSizes(WorkUnitStream workUnitStream) {
        List workUnits = JobLauncherUtils.flattenWorkUnits((Collection)workUnitStream.getMaterializedWorkUnitCollection());
        long totalSizeInBytes = workUnits.stream().mapToLong(wu -> wu.getPropAsLong("gobblin.service.work.unit.size", 0L)).sum();
        return totalSizeInBytes;
    }

    private void executeUnfinishedCommitSequences(String jobName) throws IOException {
        Preconditions.checkState((boolean)this.jobContext.getCommitSequenceStore().isPresent());
        CommitSequenceStore commitSequenceStore = (CommitSequenceStore)this.jobContext.getCommitSequenceStore().get();
        for (String datasetUrn : commitSequenceStore.get(jobName)) {
            Optional commitSequence = commitSequenceStore.get(jobName, datasetUrn);
            if (commitSequence.isPresent()) {
                ((CommitSequence)commitSequence.get()).execute();
            }
            commitSequenceStore.delete(jobName, datasetUrn);
        }
    }

    @Deprecated
    protected void postProcessTaskStates(List<TaskState> taskStates) {
    }

    protected void postProcessJobState(JobState jobState) {
        this.postProcessTaskStates(jobState.getTaskStates());
        if (!GobblinMetrics.isEnabled((Properties)this.jobProps)) {
            return;
        }
        ArrayList<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<DatasetTaskSummary>();
        Map<String, JobState.DatasetState> datasetStates = this.jobContext.getDatasetStatesByUrns();
        boolean processFailedTasks = PropertiesUtils.getPropAsBoolean((Properties)this.jobProps, (String)"writer.jobTaskSummary.countMetricsFromFailedTasks", (String)"false");
        for (JobState.DatasetState datasetState : datasetStates.values()) {
            if (datasetState.getState() == JobState.RunningState.COMMITTED || datasetState.getState() == JobState.RunningState.FAILED && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS) {
                long totalBytesWritten = 0L;
                long totalRecordsWritten = 0L;
                for (TaskState taskState : datasetState.getTaskStates()) {
                    if (taskState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED && !processFailedTasks) continue;
                    totalBytesWritten += taskState.getPropAsLong("writer.bytes.written", 0L);
                    totalRecordsWritten += taskState.getPropAsLong("writer.records.written", 0L);
                }
                LOG.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten));
                datasetTaskSummaries.add(new DatasetTaskSummary(datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten, datasetState.getState() == JobState.RunningState.COMMITTED));
                continue;
            }
            if (datasetState.getState() != JobState.RunningState.FAILED || this.jobContext.getJobCommitPolicy() != JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) continue;
            LOG.info("Due to task failure, will report that no records or bytes were written for " + datasetState.getDatasetUrn());
            datasetTaskSummaries.add(new DatasetTaskSummary(datasetState.getDatasetUrn(), 0L, 0L, false));
        }
        TimingEvent jobSummaryTimer = this.eventSubmitter.getTimingEvent("JobSummaryTimer");
        jobSummaryTimer.addMetadata("datasetTaskSummaries", GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries));
        jobSummaryTimer.stop();
    }

    @Override
    public void close() throws IOException {
        this.troubleshooter.stop();
        try {
            this.cancellationExecutor.shutdownNow();
            try {
                this.jobContext.getSource().shutdown((SourceState)this.jobContext.getJobState());
            }
            finally {
                if (GobblinMetrics.isEnabled((Properties)this.jobProps)) {
                    GobblinMetricsRegistry.getInstance().remove(this.jobContext.getJobId());
                }
            }
        }
        finally {
            this.unlockJob();
        }
    }

    protected abstract void runWorkUnits(List<WorkUnit> var1) throws Exception;

    protected void runWorkUnitStream(WorkUnitStream workUnitStream) throws Exception {
        this.runWorkUnits(this.materializeWorkUnitList(workUnitStream));
    }

    private List<WorkUnit> materializeWorkUnitList(WorkUnitStream workUnitStream) {
        if (!workUnitStream.isFiniteStream()) {
            throw new UnsupportedOperationException("Cannot materialize an infinite work unit stream.");
        }
        return Lists.newArrayList((Iterator)workUnitStream.getWorkUnits());
    }

    protected JobLock getJobLock(Properties properties, JobLockEventListener jobLockEventListener) throws JobLockException {
        return LegacyJobLockFactoryManager.getJobLock(properties, jobLockEventListener);
    }

    protected abstract void executeCancellation();

    protected void startCancellationExecutor() {
        this.cancellationExecutor.execute(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = AbstractJobLauncher.this.cancellationRequest;
                synchronized (object) {
                    try {
                        while (!AbstractJobLauncher.this.cancellationRequested) {
                            AbstractJobLauncher.this.cancellationRequest.wait();
                        }
                        LOG.info("Cancellation has been requested for job " + AbstractJobLauncher.this.jobContext.getJobId());
                        AbstractJobLauncher.this.executeCancellation();
                        LOG.info("Cancellation has been executed for job " + AbstractJobLauncher.this.jobContext.getJobId());
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                object = AbstractJobLauncher.this.cancellationExecution;
                synchronized (object) {
                    AbstractJobLauncher.this.cancellationExecuted = true;
                    AbstractJobLauncher.this.jobContext.getJobState().setState(JobState.RunningState.CANCELLED);
                    AbstractJobLauncher.this.cancellationExecution.notifyAll();
                }
            }
        });
    }

    private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, JobState jobState) {
        return workUnits.transform((Function)new WorkUnitPreparator(this.jobContext.getJobId()));
    }

    private boolean tryLockJob(Properties properties) {
        try {
            if (Boolean.valueOf(properties.getProperty("job.lock.enabled", Boolean.TRUE.toString())).booleanValue()) {
                this.jobLockOptional = Optional.of((Object)this.getJobLock(properties, new JobLockEventListener(){

                    @Override
                    public void onLost() {
                        AbstractJobLauncher.this.executeCancellation();
                    }
                }));
            }
            return !this.jobLockOptional.isPresent() || ((JobLock)this.jobLockOptional.get()).tryLock();
        }
        catch (JobLockException ioe) {
            LOG.error(String.format("Failed to acquire job lock for job %s: %s", this.jobContext.getJobId(), ioe), (Throwable)ioe);
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private void unlockJob() {
        if (this.jobLockOptional.isPresent()) {
            ((JobLock)this.jobLockOptional.get()).unlock();
            try {
                ((JobLock)this.jobLockOptional.get()).close();
            }
            catch (IOException e) {
                LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e), (Throwable)e);
            }
            finally {
                this.jobLockOptional = Optional.absent();
            }
            catch (JobLockException ioe) {
                try {
                    LOG.error(String.format("Failed to unlock for job %s: %s", this.jobContext.getJobId(), ioe), (Throwable)ioe);
                }
                catch (Throwable throwable) {
                    try {
                        ((JobLock)this.jobLockOptional.get()).close();
                    }
                    catch (IOException e) {
                        LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e), (Throwable)e);
                    }
                    finally {
                        this.jobLockOptional = Optional.absent();
                    }
                    throw throwable;
                }
                try {
                    ((JobLock)this.jobLockOptional.get()).close();
                }
                catch (IOException e) {
                    LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e), (Throwable)e);
                }
                finally {
                    this.jobLockOptional = Optional.absent();
                }
            }
        }
    }

    private CloseableJobListener getParallelCombinedJobListener(JobState jobState, JobListener jobListener) {
        ArrayList jobListeners = Lists.newArrayList(this.mandatoryJobListeners);
        jobListeners.add(jobListener);
        Set jobListenerClassNames = jobState.getPropAsSet("job.listeners", "");
        for (String jobListenerClassName : jobListenerClassNames) {
            try {
                Class<?> jobListenerClass = Class.forName(jobListenerClassName);
                jobListeners.add(jobListenerClass.newInstance());
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                LOG.warn(String.format("JobListener could not be created due to %s", jobListenerClassName), (Throwable)e);
            }
        }
        return JobListeners.parallelJobListener(jobListeners);
    }

    private static List<Tag<?>> addClusterNameTags(List<? extends Tag<?>> tags) {
        return ImmutableList.builder().addAll(tags).addAll((Iterable)Tag.fromMap((Map)ClusterNameTags.getClusterNameTags())).build();
    }

    private EventSubmitter buildEventSubmitter(List<? extends Tag<?>> tags) {
        return new EventSubmitter.Builder(this.runtimeMetricContext, "gobblin.runtime").addMetadata(Tag.toMap((List)Tag.tagValuesToString(tags))).build();
    }

    private void cleanLeftoverStagingData(WorkUnitStream workUnits, JobState jobState) throws JobException {
        block15: {
            if (jobState.getPropAsBoolean("cleanup.staging.data.by.initializer", false)) {
                return;
            }
            try {
                if (!this.canCleanStagingData(jobState)) {
                    LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
                    return;
                }
            }
            catch (IOException e) {
                throw new JobException("Failed to check unfinished commit sequences", e);
            }
            try {
                if (this.jobContext.shouldCleanupStagingDataPerTask()) {
                    if (workUnits.isSafeToMaterialize()) {
                        HashMap parallelRunners = Maps.newHashMap();
                        try (Closer closer = Closer.create();){
                            for (WorkUnit workUnit : JobLauncherUtils.flattenWorkUnits((Collection)workUnits.getMaterializedWorkUnitCollection())) {
                                JobLauncherUtils.cleanTaskStagingData((State)new WorkUnitState(workUnit, (State)jobState), (Logger)LOG, (Closer)closer, (Map)parallelRunners);
                            }
                            break block15;
                        }
                    }
                    throw new RuntimeException("Work unit streams do not support cleaning staging data per task.");
                }
                if (jobState.getPropAsBoolean("cleanup.old.job.data", false)) {
                    JobLauncherUtils.cleanUpOldJobData((State)jobState, (Logger)LOG, (boolean)this.jobContext.getStagingDirProvided(), (boolean)this.jobContext.getOutputDirProvided());
                }
                JobLauncherUtils.cleanJobStagingData((State)jobState, (Logger)LOG);
            }
            catch (Throwable t) {
                LOG.error("Failed to clean leftover staging data", t);
            }
        }
    }

    private static String getJobIdPrefix(String jobId) {
        return jobId.substring(0, jobId.lastIndexOf("_") + 1);
    }

    private void cleanupStagingData(JobState jobState) throws JobException {
        if (jobState.getPropAsBoolean("cleanup.staging.data.by.initializer", false)) {
            return;
        }
        try {
            if (!this.canCleanStagingData(jobState)) {
                LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
                return;
            }
        }
        catch (IOException e) {
            throw new JobException("Failed to check unfinished commit sequences", e);
        }
        if (this.jobContext.shouldCleanupStagingDataPerTask()) {
            AbstractJobLauncher.cleanupStagingDataPerTask(jobState);
        } else {
            AbstractJobLauncher.cleanupStagingDataForEntireJob(jobState);
        }
    }

    @Override
    public boolean isEarlyStopped() {
        return this.jobContext.getSource().isEarlyStopped();
    }

    protected IssueRepository getIssueRepository() {
        return this.troubleshooter.getIssueRepository();
    }

    private boolean canCleanStagingData(JobState jobState) throws IOException {
        return this.jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !((CommitSequenceStore)this.jobContext.getCommitSequenceStore().get()).exists(jobState.getJobName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void cleanupStagingDataPerTask(JobState jobState) {
        Closer closer = Closer.create();
        HashMap parallelRunners = Maps.newHashMap();
        try {
            for (TaskState taskState : jobState.getTaskStates()) {
                try {
                    JobLauncherUtils.cleanTaskStagingData((State)taskState, (Logger)LOG, (Closer)closer, (Map)parallelRunners);
                }
                catch (IOException e) {
                    LOG.error(String.format("Failed to clean staging data for task %s: %s", taskState.getTaskId(), e), (Throwable)e);
                }
            }
        }
        finally {
            try {
                closer.close();
            }
            catch (IOException e) {
                LOG.error("Failed to clean staging data", (Throwable)e);
            }
        }
    }

    private static void cleanupStagingDataForEntireJob(JobState jobState) {
        try {
            JobLauncherUtils.cleanJobStagingData((State)jobState, (Logger)LOG);
        }
        catch (IOException e) {
            LOG.error("Failed to clean staging data for job " + jobState.getJobId(), (Throwable)e);
        }
    }

    private void notifyListeners(JobContext jobContext, JobListener jobListener, String timerEventName, JobListenerAction action) throws JobException {
        TimingEvent timer = this.eventSubmitter.getTimingEvent(timerEventName);
        try (CloseableJobListener parallelJobListener = this.getParallelCombinedJobListener(this.jobContext.getJobState(), jobListener);){
            action.apply(parallelJobListener, jobContext);
        }
        catch (Exception e) {
            throw new JobException("Failed to execute all JobListeners", e);
        }
        finally {
            LOG.info("Submitting {}", (Object)timerEventName);
            timer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext, EventName.getEnumFromEventId((String)timerEventName)));
        }
    }

    private static interface JobListenerAction {
        public void apply(JobListener var1, JobContext var2) throws Exception;
    }

    private static class WorkUnitPreparator
    extends MultiWorkUnitForEach {
        private int taskIdSequence = 0;
        private final String jobId;

        @Override
        protected void forWorkUnit(WorkUnit workUnit) {
            workUnit.setProp("job.id", (Object)this.jobId);
            String taskId = JobLauncherUtils.newTaskId((String)this.jobId, (int)this.taskIdSequence++);
            workUnit.setId(taskId);
            workUnit.setProp("task.id", (Object)taskId);
            workUnit.setProp("task.key", (Object)Long.toString(Id.Task.parse((String)taskId).getSequence()));
        }

        public WorkUnitPreparator(String jobId) {
            this.jobId = jobId;
        }
    }

    private static abstract class MultiWorkUnitForEach
    implements Function<WorkUnit, WorkUnit> {
        private MultiWorkUnitForEach() {
        }

        @Nullable
        public WorkUnit apply(WorkUnit input) {
            if (input instanceof MultiWorkUnit) {
                for (WorkUnit wu : ((MultiWorkUnit)input).getWorkUnits()) {
                    this.forWorkUnit(wu);
                }
            } else {
                this.forWorkUnit(input);
            }
            return input;
        }

        protected abstract void forWorkUnit(WorkUnit var1);
    }

    private static class SkippedWorkUnitsFilter
    implements Predicate<WorkUnit> {
        private final JobState jobState;

        public boolean apply(WorkUnit workUnit) {
            if (workUnit instanceof MultiWorkUnit) {
                Preconditions.checkArgument((!workUnit.contains("workunit.skip") ? 1 : 0) != 0, (Object)"Error: MultiWorkUnit cannot be skipped");
                for (WorkUnit wu : ((MultiWorkUnit)workUnit).getWorkUnits()) {
                    Preconditions.checkArgument((!wu.contains("workunit.skip") ? 1 : 0) != 0, (Object)"Error: MultiWorkUnit cannot contain skipped WorkUnit");
                }
            }
            if (workUnit.getPropAsBoolean("workunit.skip", false)) {
                WorkUnitState workUnitState = new WorkUnitState(workUnit, (State)this.jobState);
                workUnitState.setWorkingState(WorkUnitState.WorkingState.SKIPPED);
                this.jobState.addSkippedTaskState(new TaskState(workUnitState));
                return false;
            }
            return true;
        }

        public SkippedWorkUnitsFilter(JobState jobState) {
            this.jobState = jobState;
        }
    }
}

