/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.mapreduce;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.task.BaseAbstractTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MRTask
extends BaseAbstractTask {
    private static final Logger log = LoggerFactory.getLogger(MRTask.class);
    private static final String JOB_CONFIGURATION_PREFIX = "MRTask.jobConfiguration.";
    private final TaskContext taskContext;
    private final EventSubmitter eventSubmitter;
    protected Job mrJob;

    public static void serializeJobToState(State state, Job job) {
        for (Map.Entry entry : job.getConfiguration()) {
            state.setProp(JOB_CONFIGURATION_PREFIX + (String)entry.getKey(), entry.getValue());
        }
    }

    public MRTask(TaskContext taskContext) {
        super(taskContext);
        this.taskContext = taskContext;
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.MRTask").addMetadata(this.additionalEventMetadata()).build();
    }

    public void onMRTaskComplete(boolean isSuccess, Throwable t) throws RuntimeException {
        if (isSuccess) {
            this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
        } else if (t == null) {
            this.workingState = WorkUnitState.WorkingState.FAILED;
        } else {
            log.error("Failed to run MR job with exception {}", (Object)ExceptionUtils.getStackTrace((Throwable)t));
            this.workingState = WorkUnitState.WorkingState.FAILED;
        }
    }

    @Override
    public void commit() {
        log.debug("State is set to {} inside onMRTaskComplete.", (Object)this.workingState);
    }

    @Override
    public void run() {
        try {
            Job job = this.createJob();
            if (job == null) {
                log.info("No MR job created. Skipping.");
                this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
                this.eventSubmitter.submit("MRJobSkipped");
                this.onSkippedMRJob();
                return;
            }
            job.submit();
            log.info("MR tracking URL {} for job {}", (Object)job.getTrackingURL(), (Object)job.getJobName());
            this.eventSubmitter.submit("MRJobStarted", new String[]{"jobTrackingUrl", job.getTrackingURL()});
            job.waitForCompletion(false);
            this.mrJob = job;
            if (job.isSuccessful()) {
                this.eventSubmitter.submit("MRJobSuccessful", new String[]{"jobTrackingUrl", job.getTrackingURL()});
                this.onMRTaskComplete(true, null);
            } else {
                this.eventSubmitter.submit("MRJobFailed", new String[]{"jobTrackingUrl", job.getTrackingURL()});
                this.onMRTaskComplete(false, new IOException(String.format("MR Job:%s is not successful", job.getTrackingURL())));
            }
        }
        catch (Throwable t) {
            log.error("Failed to run MR job.", t);
            this.eventSubmitter.submit("MRJobFailed", new String[]{"failureContext", t.getMessage()});
            this.onMRTaskComplete(false, t);
        }
    }

    protected Map<String, String> additionalEventMetadata() {
        return Maps.newHashMap();
    }

    protected Job createJob() throws IOException {
        Job job = Job.getInstance((Configuration)new Configuration());
        for (Map.Entry<Object, Object> entry : this.taskContext.getTaskState().getProperties().entrySet()) {
            if (!(entry.getKey() instanceof String) || !((String)entry.getKey()).startsWith(JOB_CONFIGURATION_PREFIX)) continue;
            String actualKey = ((String)entry.getKey()).substring(JOB_CONFIGURATION_PREFIX.length());
            job.getConfiguration().set(actualKey, (String)entry.getValue());
        }
        return job;
    }

    protected void onSkippedMRJob() {
    }

    public static class Events {
        public static final String MR_JOB_STARTED_EVENT = "MRJobStarted";
        public static final String MR_JOB_SUCCESSFUL = "MRJobSuccessful";
        public static final String MR_JOB_FAILED = "MRJobFailed";
        public static final String MR_JOB_SKIPPED = "MRJobSkipped";
        public static final String JOB_URL = "jobTrackingUrl";
        public static final String FAILURE_CONTEXT = "failureContext";
    }
}

