/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime;

import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.ScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitSequence;
import org.apache.gobblin.commit.CommitSequenceStore;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.initializer.ConverterInitializer;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventName;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobExecutionEventSubmitter;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.api.EventMetadataGenerator;
import org.apache.gobblin.runtime.listeners.CloseableJobListener;
import org.apache.gobblin.runtime.listeners.JobExecutionEventSubmitterListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.listeners.JobListeners;
import org.apache.gobblin.runtime.locks.JobLock;
import org.apache.gobblin.runtime.locks.JobLockEventListener;
import org.apache.gobblin.runtime.locks.JobLockException;
import org.apache.gobblin.runtime.locks.LegacyJobLockFactoryManager;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ClusterNameTags;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.writer.initializer.WriterInitializer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class AbstractJobLauncher
implements JobLauncher {
    static final Logger LOG = LoggerFactory.getLogger(AbstractJobLauncher.class);
    public static final String TASK_STATE_STORE_TABLE_SUFFIX = ".tst";
    public static final String JOB_STATE_FILE_NAME = "job.state";
    public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
    public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
    protected final Properties jobProps;
    protected final JobContext jobContext;
    protected Optional<JobLock> jobLockOptional = Optional.absent();
    protected final Object cancellationRequest = new Object();
    protected volatile boolean cancellationRequested = false;
    protected final Object cancellationExecution = new Object();
    protected volatile boolean cancellationExecuted = false;
    protected final ExecutorService cancellationExecutor;
    protected final Optional<MetricContext> runtimeMetricContext;
    protected final EventSubmitter eventSubmitter;
    protected final EventBus eventBus = new EventBus(AbstractJobLauncher.class.getSimpleName());
    private final List<JobListener> mandatoryJobListeners = Lists.newArrayList();
    private final EventMetadataGenerator eventMetadataGenerator;

    public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> metadataTags) throws Exception {
        this(jobProps, metadataTags, null);
    }

    public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> metadataTags, @Nullable SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
        Preconditions.checkArgument((boolean)jobProps.containsKey("job.name"), (Object)"A job must have a job name specified by job.name");
        ArrayList clusterNameTags = Lists.newArrayList();
        clusterNameTags.addAll(Tag.fromMap((Map)ClusterNameTags.getClusterNameTags()));
        GobblinMetrics.addCustomTagsToProperties((Properties)jobProps, (List)clusterNameTags);
        this.jobProps = new Properties();
        this.jobProps.putAll((Map<?, ?>)jobProps);
        if (!this.tryLockJob(this.jobProps)) {
            throw new JobException(String.format("Previous instance of job %s is still running, skipping this scheduled run", this.jobProps.getProperty("job.name")));
        }
        try {
            if (instanceBroker == null) {
                instanceBroker = AbstractJobLauncher.createDefaultInstanceBroker(jobProps);
            }
            this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker);
            this.eventBus.register((Object)this.jobContext);
            this.cancellationExecutor = Executors.newSingleThreadExecutor(ExecutorsUtils.newThreadFactory((Optional)Optional.of((Object)LOG), (Optional)Optional.of((Object)"CancellationExecutor")));
            this.runtimeMetricContext = this.jobContext.getJobMetricsOptional().transform((Function)new Function<JobMetrics, MetricContext>(){

                public MetricContext apply(JobMetrics input) {
                    return input.getMetricContext();
                }
            });
            this.eventSubmitter = this.buildEventSubmitter(metadataTags);
            GobblinMetrics.addCustomTagToState((State)this.jobContext.getJobState(), metadataTags);
            JobExecutionEventSubmitter jobExecutionEventSubmitter = new JobExecutionEventSubmitter(this.eventSubmitter);
            this.mandatoryJobListeners.add(new JobExecutionEventSubmitterListener(jobExecutionEventSubmitter));
            String eventMetadatadataGeneratorClassName = jobProps.getProperty("event.metadata.generator.class", "noop");
            try {
                ClassAliasResolver aliasResolver = new ClassAliasResolver(EventMetadataGenerator.class);
                this.eventMetadataGenerator = (EventMetadataGenerator)aliasResolver.resolveClass(eventMetadatadataGeneratorClassName).newInstance();
            }
            catch (ReflectiveOperationException e) {
                throw new RuntimeException("Could not construct EventMetadataGenerator " + eventMetadatadataGeneratorClassName, e);
            }
        }
        catch (Exception e) {
            this.unlockJob();
            throw e;
        }
    }

    private static SharedResourcesBroker<GobblinScopeTypes> createDefaultInstanceBroker(Properties jobProps) {
        LOG.warn("Creating a job specific {}. Objects will only be shared at the job level.", (Object)SharedResourcesBroker.class.getSimpleName());
        return SharedResourcesBrokerFactory.createDefaultTopLevelBroker((Config)ConfigFactory.parseProperties((Properties)jobProps), (ScopeInstance)GobblinScopeTypes.GLOBAL.defaultScopeInstance());
    }

    JobContext getJobContext() {
        return this.jobContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancelJob(JobListener jobListener) throws JobException {
        Object object = this.cancellationRequest;
        synchronized (object) {
            if (this.cancellationRequested) {
                return;
            }
            this.cancellationRequested = true;
            this.cancellationRequest.notify();
        }
        object = this.cancellationExecution;
        synchronized (object) {
            try {
                while (!this.cancellationExecuted) {
                    this.cancellationExecution.wait();
                }
                try {
                    LOG.info("Current job state is: " + this.jobContext.getJobState().getState());
                    if (this.jobContext.getJobState().getState() != JobState.RunningState.COMMITTED && (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS)) {
                        this.jobContext.finalizeJobStateBeforeCommit();
                        this.jobContext.commit(true);
                    }
                    this.jobContext.close();
                }
                catch (IOException ioe) {
                    LOG.error("Could not close job context.", (Throwable)ioe);
                }
                this.notifyListeners(this.jobContext, jobListener, "JobCancelTimer", new JobListenerAction(){

                    @Override
                    public void apply(JobListener jobListener, JobContext jobContext) throws Exception {
                        jobListener.onJobCancellation(jobContext);
                    }
                });
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void launchJob(JobListener jobListener) throws JobException {
        String jobId = this.jobContext.getJobId();
        final JobState jobState = this.jobContext.getJobState();
        try {
            MDC.put((String)"job.name", (String)this.jobContext.getJobName());
            MDC.put((String)"job.key", (String)this.jobContext.getJobKey());
            TimingEvent launchJobTimer = this.eventSubmitter.getTimingEvent("FullJobExecutionTimer");
            try (Closer closer = Closer.create();){
                closer.register((Closeable)this.jobContext);
                this.notifyListeners(this.jobContext, jobListener, "JobPrepareTimer", new JobListenerAction(){

                    @Override
                    public void apply(JobListener jobListener, JobContext jobContext) throws Exception {
                        jobListener.onJobPrepare(jobContext);
                    }
                });
                if (this.jobContext.getSemantics() == DeliverySemantics.EXACTLY_ONCE) {
                    this.executeUnfinishedCommitSequences(jobState.getJobName());
                }
                TimingEvent workUnitsCreationTimer = this.eventSubmitter.getTimingEvent("WorkUnitsCreationTimer");
                Source<?, ?> source = this.jobContext.getSource();
                WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource ? ((WorkUnitStreamSource)source).getWorkunitStream((SourceState)jobState) : new BasicWorkUnitStream.Builder(source.getWorkunits((SourceState)jobState)).build();
                workUnitsCreationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.WORK_UNITS_CREATION));
                if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
                    this.eventSubmitter.submit("WorkUnitsMissing");
                    jobState.setState(JobState.RunningState.FAILED);
                    throw new JobException("Failed to get work units for job " + jobId);
                }
                if (!workUnitStream.getWorkUnits().hasNext()) {
                    this.eventSubmitter.submit("WorkUnitsEmpty");
                    LOG.warn("No work units have been created for job " + jobId);
                    jobState.setState(JobState.RunningState.COMMITTED);
                    this.notifyListeners(this.jobContext, jobListener, "JobCompleteTimer", new JobListenerAction(){

                        @Override
                        public void apply(JobListener jobListener, JobContext jobContext) throws Exception {
                            jobListener.onJobCompletion(jobContext);
                        }
                    });
                    return;
                }
                ((WriterInitializer)closer.register((Closeable)WriterInitializerFactory.newInstace((State)jobState, (WorkUnitStream)workUnitStream))).initialize();
                ((ConverterInitializer)closer.register((Closeable)ConverterInitializerFactory.newInstance((State)jobState, (WorkUnitStream)workUnitStream))).initialize();
                TimingEvent stagingDataCleanTimer = this.eventSubmitter.getTimingEvent("JobMrStagingDataCleanTimer");
                this.cleanLeftoverStagingData(workUnitStream, jobState);
                stagingDataCleanTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.MR_STAGING_DATA_CLEAN));
                long startTime = System.currentTimeMillis();
                jobState.setStartTime(startTime);
                jobState.setState(JobState.RunningState.RUNNING);
                try {
                    LOG.info("Starting job " + jobId);
                    this.notifyListeners(this.jobContext, jobListener, "JobStartTimer", new JobListenerAction(){

                        @Override
                        public void apply(JobListener jobListener, JobContext jobContext) throws Exception {
                            jobListener.onJobStart(jobContext);
                        }
                    });
                    TimingEvent workUnitsPreparationTimer = this.eventSubmitter.getTimingEvent("WorkUnitsPreparationTimer");
                    workUnitStream = this.prepareWorkUnits(workUnitStream, jobState);
                    workUnitStream = workUnitStream.filter((Predicate)new SkippedWorkUnitsFilter(jobState));
                    workUnitStream = workUnitStream.transform((Function)new MultiWorkUnitForEach(){

                        @Override
                        public void forWorkUnit(WorkUnit workUnit) {
                            jobState.incrementTaskCount();
                            jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, (State)jobState)));
                        }
                    });
                    if (jobState.getPropAsBoolean("workunit.enableTrackingLogs")) {
                        workUnitStream = workUnitStream.transform((Function)new Function<WorkUnit, WorkUnit>(){

                            @Nullable
                            public WorkUnit apply(@Nullable WorkUnit input) {
                                LOG.info("Work unit tracking log: {}", (Object)input);
                                return input;
                            }
                        });
                    }
                    workUnitsPreparationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.WORK_UNITS_PREPARATION));
                    this.jobContext.storeJobExecutionInfo();
                    TimingEvent jobRunTimer = this.eventSubmitter.getTimingEvent("JobRunTimer");
                    this.runWorkUnitStream(workUnitStream);
                    jobRunTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_RUN));
                    this.eventSubmitter.submit(CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, "JOB_" + jobState.getState()));
                    if (jobState.getState() == JobState.RunningState.CANCELLED) {
                        LOG.info(String.format("Job %s has been cancelled, aborting now", jobId));
                        return;
                    }
                    TimingEvent jobCommitTimer = this.eventSubmitter.getTimingEvent("JobCommitTimer");
                    this.jobContext.finalizeJobStateBeforeCommit();
                    this.jobContext.commit();
                    this.postProcessJobState(jobState);
                    jobCommitTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_COMMIT));
                }
                finally {
                    long endTime = System.currentTimeMillis();
                    jobState.setEndTime(endTime);
                    jobState.setDuration(endTime - jobState.getStartTime());
                }
            }
            catch (Throwable t) {
                jobState.setState(JobState.RunningState.FAILED);
                String errMsg = "Failed to launch and run job " + jobId;
                LOG.error(errMsg + ": " + t, t);
            }
            finally {
                try {
                    TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent("JobCleanupTimer");
                    this.cleanupStagingData(jobState);
                    jobCleanupTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_CLEANUP));
                    this.jobContext.storeJobExecutionInfo();
                }
                finally {
                    launchJobTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.FULL_JOB_EXECUTION));
                }
            }
            for (JobState.DatasetState datasetState : this.jobContext.getDatasetStatesByUrns().values()) {
                if (datasetState.getState() != JobState.RunningState.FAILED) continue;
                jobState.setState(JobState.RunningState.FAILED);
                LOG.warn("At least one dataset state is FAILED. Setting job state to FAILED.");
                break;
            }
            this.notifyListeners(this.jobContext, jobListener, "JobCompleteTimer", new JobListenerAction(){

                @Override
                public void apply(JobListener jobListener, JobContext jobContext) throws Exception {
                    jobListener.onJobCompletion(jobContext);
                }
            });
            if (jobState.getState() != JobState.RunningState.FAILED) return;
            this.notifyListeners(this.jobContext, jobListener, "JobFailedTimer", new JobListenerAction(){

                @Override
                public void apply(JobListener jobListener, JobContext jobContext) throws Exception {
                    jobListener.onJobFailure(jobContext);
                }
            });
            throw new JobException(String.format("Job %s failed", jobId));
        }
        finally {
            if (this.jobContext.getJobMetricsOptional().isPresent()) {
                JobMetrics.remove(jobState);
            }
            MDC.remove((String)"job.name");
            MDC.remove((String)"job.key");
        }
    }

    private void executeUnfinishedCommitSequences(String jobName) throws IOException {
        Preconditions.checkState((boolean)this.jobContext.getCommitSequenceStore().isPresent());
        CommitSequenceStore commitSequenceStore = (CommitSequenceStore)this.jobContext.getCommitSequenceStore().get();
        for (String datasetUrn : commitSequenceStore.get(jobName)) {
            Optional commitSequence = commitSequenceStore.get(jobName, datasetUrn);
            if (commitSequence.isPresent()) {
                ((CommitSequence)commitSequence.get()).execute();
            }
            commitSequenceStore.delete(jobName, datasetUrn);
        }
    }

    @Deprecated
    protected void postProcessTaskStates(List<TaskState> taskStates) {
    }

    protected void postProcessJobState(JobState jobState) {
        this.postProcessTaskStates(jobState.getTaskStates());
    }

    @Override
    public void close() throws IOException {
        try {
            this.cancellationExecutor.shutdownNow();
            try {
                this.jobContext.getSource().shutdown((SourceState)this.jobContext.getJobState());
            }
            finally {
                if (GobblinMetrics.isEnabled((Properties)this.jobProps)) {
                    GobblinMetricsRegistry.getInstance().remove(this.jobContext.getJobId());
                }
            }
        }
        finally {
            this.unlockJob();
        }
    }

    protected abstract void runWorkUnits(List<WorkUnit> var1) throws Exception;

    protected void runWorkUnitStream(WorkUnitStream workUnitStream) throws Exception {
        this.runWorkUnits(this.materializeWorkUnitList(workUnitStream));
    }

    private List<WorkUnit> materializeWorkUnitList(WorkUnitStream workUnitStream) {
        if (!workUnitStream.isFiniteStream()) {
            throw new UnsupportedOperationException("Cannot materialize an infinite work unit stream.");
        }
        return Lists.newArrayList((Iterator)workUnitStream.getWorkUnits());
    }

    protected JobLock getJobLock(Properties properties, JobLockEventListener jobLockEventListener) throws JobLockException {
        return LegacyJobLockFactoryManager.getJobLock(properties, jobLockEventListener);
    }

    protected abstract void executeCancellation();

    protected void startCancellationExecutor() {
        this.cancellationExecutor.execute(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = AbstractJobLauncher.this.cancellationRequest;
                synchronized (object) {
                    try {
                        while (!AbstractJobLauncher.this.cancellationRequested) {
                            AbstractJobLauncher.this.cancellationRequest.wait();
                        }
                        LOG.info("Cancellation has been requested for job " + AbstractJobLauncher.this.jobContext.getJobId());
                        AbstractJobLauncher.this.executeCancellation();
                        LOG.info("Cancellation has been executed for job " + AbstractJobLauncher.this.jobContext.getJobId());
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                object = AbstractJobLauncher.this.cancellationExecution;
                synchronized (object) {
                    AbstractJobLauncher.this.cancellationExecuted = true;
                    AbstractJobLauncher.this.jobContext.getJobState().setState(JobState.RunningState.CANCELLED);
                    AbstractJobLauncher.this.cancellationExecution.notifyAll();
                }
            }
        });
    }

    private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, JobState jobState) {
        return workUnits.transform((Function)new WorkUnitPreparator(this.jobContext.getJobId()));
    }

    private boolean tryLockJob(Properties properties) {
        try {
            if (Boolean.valueOf(properties.getProperty("job.lock.enabled", Boolean.TRUE.toString())).booleanValue()) {
                this.jobLockOptional = Optional.of((Object)this.getJobLock(properties, new JobLockEventListener(){

                    @Override
                    public void onLost() {
                        AbstractJobLauncher.this.executeCancellation();
                    }
                }));
            }
            return !this.jobLockOptional.isPresent() || ((JobLock)this.jobLockOptional.get()).tryLock();
        }
        catch (JobLockException ioe) {
            LOG.error(String.format("Failed to acquire job lock for job %s: %s", this.jobContext.getJobId(), ioe), (Throwable)ioe);
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private void unlockJob() {
        if (this.jobLockOptional.isPresent()) {
            ((JobLock)this.jobLockOptional.get()).unlock();
            try {
                ((JobLock)this.jobLockOptional.get()).close();
            }
            catch (IOException e) {
                LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e), (Throwable)e);
            }
            finally {
                this.jobLockOptional = Optional.absent();
            }
            catch (JobLockException ioe) {
                try {
                    LOG.error(String.format("Failed to unlock for job %s: %s", this.jobContext.getJobId(), ioe), (Throwable)ioe);
                }
                catch (Throwable throwable) {
                    try {
                        ((JobLock)this.jobLockOptional.get()).close();
                    }
                    catch (IOException e) {
                        LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e), (Throwable)e);
                    }
                    finally {
                        this.jobLockOptional = Optional.absent();
                    }
                    throw throwable;
                }
                try {
                    ((JobLock)this.jobLockOptional.get()).close();
                }
                catch (IOException e) {
                    LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e), (Throwable)e);
                }
                finally {
                    this.jobLockOptional = Optional.absent();
                }
            }
        }
    }

    private CloseableJobListener getParallelCombinedJobListener(JobState jobState, JobListener jobListener) {
        ArrayList jobListeners = Lists.newArrayList(this.mandatoryJobListeners);
        jobListeners.add(jobListener);
        Set jobListenerClassNames = jobState.getPropAsSet("job.listeners", "");
        for (String jobListenerClassName : jobListenerClassNames) {
            try {
                Class<?> jobListenerClass = Class.forName(jobListenerClassName);
                jobListeners.add(jobListenerClass.newInstance());
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                LOG.warn(String.format("JobListener could not be created due to %s", jobListenerClassName), (Throwable)e);
            }
        }
        return JobListeners.parallelJobListener(jobListeners);
    }

    private static List<Tag<?>> addClusterNameTags(List<? extends Tag<?>> tags) {
        return ImmutableList.builder().addAll(tags).addAll((Iterable)Tag.fromMap((Map)ClusterNameTags.getClusterNameTags())).build();
    }

    private EventSubmitter buildEventSubmitter(List<? extends Tag<?>> tags) {
        return new EventSubmitter.Builder(this.runtimeMetricContext, "gobblin.runtime").addMetadata(Tag.toMap((List)Tag.tagValuesToString(tags))).build();
    }

    private void cleanLeftoverStagingData(WorkUnitStream workUnits, JobState jobState) throws JobException {
        block15: {
            if (jobState.getPropAsBoolean("cleanup.staging.data.by.initializer", false)) {
                return;
            }
            try {
                if (!this.canCleanStagingData(jobState)) {
                    LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
                    return;
                }
            }
            catch (IOException e) {
                throw new JobException("Failed to check unfinished commit sequences", e);
            }
            try {
                if (this.jobContext.shouldCleanupStagingDataPerTask()) {
                    if (workUnits.isSafeToMaterialize()) {
                        HashMap parallelRunners = Maps.newHashMap();
                        try (Closer closer = Closer.create();){
                            for (WorkUnit workUnit : JobLauncherUtils.flattenWorkUnits((Collection)workUnits.getMaterializedWorkUnitCollection())) {
                                JobLauncherUtils.cleanTaskStagingData((State)new WorkUnitState(workUnit, (State)jobState), (Logger)LOG, (Closer)closer, (Map)parallelRunners);
                            }
                            break block15;
                        }
                    }
                    throw new RuntimeException("Work unit streams do not support cleaning staging data per task.");
                }
                if (jobState.getPropAsBoolean("cleanup.old.job.data", false)) {
                    JobLauncherUtils.cleanUpOldJobData((State)jobState, (Logger)LOG, (boolean)this.jobContext.getStagingDirProvided(), (boolean)this.jobContext.getOutputDirProvided());
                }
                JobLauncherUtils.cleanJobStagingData((State)jobState, (Logger)LOG);
            }
            catch (Throwable t) {
                LOG.error("Failed to clean leftover staging data", t);
            }
        }
    }

    private static String getJobIdPrefix(String jobId) {
        return jobId.substring(0, jobId.lastIndexOf("_") + 1);
    }

    private void cleanupStagingData(JobState jobState) throws JobException {
        if (jobState.getPropAsBoolean("cleanup.staging.data.by.initializer", false)) {
            return;
        }
        try {
            if (!this.canCleanStagingData(jobState)) {
                LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
                return;
            }
        }
        catch (IOException e) {
            throw new JobException("Failed to check unfinished commit sequences", e);
        }
        if (this.jobContext.shouldCleanupStagingDataPerTask()) {
            AbstractJobLauncher.cleanupStagingDataPerTask(jobState);
        } else {
            AbstractJobLauncher.cleanupStagingDataForEntireJob(jobState);
        }
    }

    @Override
    public boolean isEarlyStopped() {
        return this.jobContext.getSource().isEarlyStopped();
    }

    private boolean canCleanStagingData(JobState jobState) throws IOException {
        return this.jobContext.getSemantics() != DeliverySemantics.EXACTLY_ONCE || !((CommitSequenceStore)this.jobContext.getCommitSequenceStore().get()).exists(jobState.getJobName());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void cleanupStagingDataPerTask(JobState jobState) {
        Closer closer = Closer.create();
        HashMap parallelRunners = Maps.newHashMap();
        try {
            for (TaskState taskState : jobState.getTaskStates()) {
                try {
                    JobLauncherUtils.cleanTaskStagingData((State)taskState, (Logger)LOG, (Closer)closer, (Map)parallelRunners);
                }
                catch (IOException e) {
                    LOG.error(String.format("Failed to clean staging data for task %s: %s", taskState.getTaskId(), e), (Throwable)e);
                }
            }
        }
        finally {
            try {
                closer.close();
            }
            catch (IOException e) {
                LOG.error("Failed to clean staging data", (Throwable)e);
            }
        }
    }

    private static void cleanupStagingDataForEntireJob(JobState jobState) {
        try {
            JobLauncherUtils.cleanJobStagingData((State)jobState, (Logger)LOG);
        }
        catch (IOException e) {
            LOG.error("Failed to clean staging data for job " + jobState.getJobId(), (Throwable)e);
        }
    }

    private void notifyListeners(JobContext jobContext, JobListener jobListener, String timerEventName, JobListenerAction action) throws JobException {
        TimingEvent timer = this.eventSubmitter.getTimingEvent(timerEventName);
        try (CloseableJobListener parallelJobListener = this.getParallelCombinedJobListener(this.jobContext.getJobState(), jobListener);){
            action.apply(parallelJobListener, jobContext);
        }
        catch (Exception e) {
            throw new JobException("Failed to execute all JobListeners", e);
        }
        finally {
            timer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.getEnumFromEventId((String)timerEventName)));
        }
    }

    private static interface JobListenerAction {
        public void apply(JobListener var1, JobContext var2) throws Exception;
    }

    private static class WorkUnitPreparator
    extends MultiWorkUnitForEach {
        private int taskIdSequence = 0;
        private final String jobId;

        @Override
        protected void forWorkUnit(WorkUnit workUnit) {
            workUnit.setProp("job.id", (Object)this.jobId);
            String taskId = JobLauncherUtils.newTaskId((String)this.jobId, (int)this.taskIdSequence++);
            workUnit.setId(taskId);
            workUnit.setProp("task.id", (Object)taskId);
            workUnit.setProp("task.key", (Object)Long.toString(Id.Task.parse((String)taskId).getSequence()));
        }

        @ConstructorProperties(value={"jobId"})
        public WorkUnitPreparator(String jobId) {
            this.jobId = jobId;
        }
    }

    private static abstract class MultiWorkUnitForEach
    implements Function<WorkUnit, WorkUnit> {
        private MultiWorkUnitForEach() {
        }

        @Nullable
        public WorkUnit apply(WorkUnit input) {
            if (input instanceof MultiWorkUnit) {
                for (WorkUnit wu : ((MultiWorkUnit)input).getWorkUnits()) {
                    this.forWorkUnit(wu);
                }
            } else {
                this.forWorkUnit(input);
            }
            return input;
        }

        protected abstract void forWorkUnit(WorkUnit var1);
    }

    private static class SkippedWorkUnitsFilter
    implements Predicate<WorkUnit> {
        private final JobState jobState;

        public boolean apply(WorkUnit workUnit) {
            if (workUnit instanceof MultiWorkUnit) {
                Preconditions.checkArgument((!workUnit.contains("workunit.skip") ? 1 : 0) != 0, (Object)"Error: MultiWorkUnit cannot be skipped");
                for (WorkUnit wu : ((MultiWorkUnit)workUnit).getWorkUnits()) {
                    Preconditions.checkArgument((!wu.contains("workunit.skip") ? 1 : 0) != 0, (Object)"Error: MultiWorkUnit cannot contain skipped WorkUnit");
                }
            }
            if (workUnit.getPropAsBoolean("workunit.skip", false)) {
                WorkUnitState workUnitState = new WorkUnitState(workUnit, (State)this.jobState);
                workUnitState.setWorkingState(WorkUnitState.WorkingState.SKIPPED);
                this.jobState.addSkippedTaskState(new TaskState(workUnitState));
                return false;
            }
            return true;
        }

        @ConstructorProperties(value={"jobState"})
        public SkippedWorkUnitsFilter(JobState jobState) {
            this.jobState = jobState;
        }
    }
}

