/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.mapreduce;

import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.mapreduce.GobblinOutputFormat;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GobblinOutputCommitter
extends OutputCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(GobblinOutputFormat.class);
    private Map<String, GobblinMultiTaskAttempt> attemptIdToMultiTaskAttempt = new ConcurrentHashMap<String, GobblinMultiTaskAttempt>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
        LOG.info("Aborting Job: " + jobContext.getJobID() + " with state: " + state);
        Configuration conf = jobContext.getConfiguration();
        URI fsUri = URI.create(conf.get("fs.uri", "file:///"));
        FileSystem fs = FileSystem.get((URI)fsUri, (Configuration)conf);
        Path mrJobDir = new Path(conf.get("mr.job.root.dir"), conf.get("job.name"));
        Path jobInputDir = new Path(mrJobDir, "input");
        if (!fs.exists(jobInputDir) || !fs.isDirectory(jobInputDir)) {
            LOG.warn(String.format("%s either does not exist or is not a directory. No data to cleanup.", jobInputDir));
            return;
        }
        try {
            for (FileStatus status : fs.listStatus(jobInputDir, (PathFilter)new WorkUnitFilter())) {
                Closer workUnitFileCloser = Closer.create();
                if (status.getPath().getName().endsWith(".wu")) {
                    WorkUnit wu = WorkUnit.createEmpty();
                    try {
                        wu.readFields((DataInput)((Object)workUnitFileCloser.register((Closeable)new DataInputStream((InputStream)fs.open(status.getPath())))));
                    }
                    finally {
                        workUnitFileCloser.close();
                    }
                    JobLauncherUtils.cleanTaskStagingData((State)new WorkUnitState(wu), (Logger)LOG);
                }
                if (!status.getPath().getName().endsWith(".mwu")) continue;
                MultiWorkUnit mwu = MultiWorkUnit.createEmpty();
                try {
                    mwu.readFields((DataInput)((Object)workUnitFileCloser.register((Closeable)new DataInputStream((InputStream)fs.open(status.getPath())))));
                }
                finally {
                    workUnitFileCloser.close();
                }
                for (WorkUnit wu : mwu.getWorkUnits()) {
                    JobLauncherUtils.cleanTaskStagingData((State)new WorkUnitState(wu), (Logger)LOG);
                }
            }
        }
        finally {
            try {
                GobblinOutputCommitter.cleanUpWorkingDirectory(mrJobDir, fs);
            }
            finally {
                super.abortJob(jobContext, state);
            }
        }
    }

    public void abortTask(TaskAttemptContext arg0) throws IOException {
    }

    public void commitTask(TaskAttemptContext arg0) throws IOException {
        String taskAttemptId = arg0.getTaskAttemptID().toString();
        LOG.info("Committing task attempt: " + taskAttemptId);
        this.attemptIdToMultiTaskAttempt.get(taskAttemptId).commit();
    }

    public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
        return this.attemptIdToMultiTaskAttempt.containsKey(arg0.getTaskAttemptID().toString());
    }

    public void setupJob(JobContext arg0) throws IOException {
    }

    public void setupTask(TaskAttemptContext arg0) throws IOException {
    }

    public boolean isRecoverySupported() {
        return true;
    }

    public void recoverTask(TaskAttemptContext taskContext) throws IOException {
    }

    private static void cleanUpWorkingDirectory(Path mrJobDir, FileSystem fs) throws IOException {
        if (fs.exists(mrJobDir)) {
            fs.delete(mrJobDir, true);
            LOG.info("Deleted working directory " + mrJobDir);
        }
    }

    public Map<String, GobblinMultiTaskAttempt> getAttemptIdToMultiTaskAttempt() {
        return this.attemptIdToMultiTaskAttempt;
    }

    private static class WorkUnitFilter
    implements PathFilter {
        private WorkUnitFilter() {
        }

        public boolean accept(Path path) {
            return path.getName().endsWith(".wu") || path.getName().endsWith(".mwu");
        }
    }
}

