/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.compaction.mapreduce;

import com.github.rholder.retry.Retryer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import com.typesafe.config.ConfigRenderOptions;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.RecordCountProvider;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.util.recordcount.LateFileRecordCountProvider;
import org.apache.gobblin.util.retry.RetryerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a {@link Runnable} that compacts a single {@link Dataset}.
 *
 * <p>In the normal mode, {@link #run()} configures and submits a Hadoop MapReduce job. The job reads
 * the dataset's input paths (plus its additional input paths) and writes to the dataset's temporary
 * output path. It then publishes the result to the dataset's output path, but only if
 * {@link #proceed()} was called. When {@code compaction.job.abort.upon.new.data} is enabled, it also
 * requires that no input file was modified after the compaction timestamp.
 *
 * <p>When {@code compaction.job.late.data.movement.task} is set, no MR job runs. The listed late
 * files are instead copied into the dataset's late output directory (or its regular output
 * directory when the output is not deduplicated).
 *
 * <p>Subclasses choose the job's input/output formats, mapper/reducer, key/value classes and the
 * file extensions they handle.
 *
 * <p>{@link #proceed()}, {@link #abort()} and {@link #status()} only touch {@code volatile} fields.
 * They may therefore be called from a thread other than the one executing {@link #run()}. While the
 * job is running, {@link #abort()} is observed by the polling loop in the submit step.
 */
public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCompactorJobRunner> {
    private static final Logger LOG = LoggerFactory.getLogger(MRCompactorJobRunner.class);
    private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
    public static final String COMPACTION_JOB_OUTPUT_DIR_PERMISSION = "compaction.job.output.dir.permission";
    public static final String COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE = "compaction.job.target.output.file.size";
    /** 512 MiB. */
    public static final long DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE = 0x20000000L;
    public static final String COMPACTION_JOB_MAX_NUM_REDUCERS = "compaction.job.max.num.reducers";
    public static final int DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS = 3600;
    private static final String COMPACTION_JOB_OVERWRITE_OUTPUT_DIR = "compaction.job.overwrite.output.dir";
    private static final boolean DEFAULT_COMPACTION_JOB_OVERWRITE_OUTPUT_DIR = false;
    private static final String COMPACTION_JOB_ABORT_UPON_NEW_DATA = "compaction.job.abort.upon.new.data";
    private static final boolean DEFAULT_COMPACTION_JOB_ABORT_UPON_NEW_DATA = false;
    private static final String COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE = "compaction.job.copy.latedata.thread.pool.size";
    private static final int DEFAULT_COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE = 5;
    public static final String COMPACTION_JOB_USE_PRIME_REDUCERS = "compaction.job.use.prime.reducers";
    public static final boolean DEFAULT_COMPACTION_JOB_USE_PRIME_REDUCERS = true;
    public static final String HADOOP_JOB_NAME = "Gobblin MR Compaction";
    /** How often {@link #submitAndWait(Job)} polls the running MR job for completion or abort. */
    private static final long MR_JOB_CHECK_COMPLETE_INTERVAL_MS = 5000L;

    private final boolean isRetryEnabled;
    /** URI of an optional separate file system for the temporary job output; {@code null} means {@link #fs}. */
    private final String tmpFsUri;
    protected final Dataset dataset;
    protected final FileSystem fs;
    /** File system holding {@code dataset.outputTmpPath()}; equals {@link #fs} unless {@code compaction.tmp.fs} is set. */
    protected final FileSystem tmpFs;
    protected final FsPermission perm;
    protected final boolean shouldDeduplicate;
    protected final boolean outputDeduplicated;
    protected final boolean recompactFromDestPaths;
    protected final boolean recompactAllData;
    protected final boolean renameSourceDir;
    protected final boolean usePrimeReducers;
    protected final EventSubmitter eventSubmitter;
    private final RecordCountProvider inputRecordCountProvider;
    private final RecordCountProvider outputRecordCountProvider;
    private final LateFileRecordCountProvider lateInputRecordCountProvider;
    private final LateFileRecordCountProvider lateOutputRecordCountProvider;
    private final DatasetHelper datasetHelper;
    private final int copyLateDataThreadPoolSize;
    /** Extension passed to the output record count provider when renaming copied late files. */
    private final String outputExtension;
    /** Publication decision set externally via {@link #proceed()} / {@link #abort()}. */
    private volatile Policy policy = Policy.DO_NOT_PUBLISH_DATA;
    private volatile Status status = Status.RUNNING;
    /** Memoizes {@link #getApplicableFilePaths(Path, FileSystem)} results per directory (max 2000 entries). */
    private final Cache<Path, List<Path>> applicablePathCache;
    static final String COMPACTION_RETRY_PREFIX = "compaction.job.retry.";
    static final String COMPACTION_RETRY_ENABLED = "compaction.job.retry.enabled";
    static final String COMPACTION_TMP_FS = "compaction.tmp.fs";
    /** Fallback retry settings used when retry is enabled and a key is not configured explicitly. */
    static final Config COMPACTION_RETRY_DEFAULTS;
    protected final Config retrierConfig;

    /**
     * Reads this runner's settings from the dataset's job properties.
     *
     * <p>NOTE(review): this constructor calls the abstract {@link #getApplicableFileExtensions()}.
     * Subclass implementations therefore must not rely on subclass fields, which are not yet
     * initialized at that point.
     *
     * @param dataset the dataset to compact; its job properties must contain
     *        {@code compaction.should.deduplicate}
     * @param fs the file system holding the dataset's input and output paths
     * @throws IllegalArgumentException if {@code compaction.should.deduplicate} is missing
     * @throws RuntimeException if the temporary file system or a record count provider cannot be
     *         created
     */
    protected MRCompactorJobRunner(Dataset dataset, FileSystem fs) {
        this.dataset = dataset;
        this.fs = fs;
        this.perm = HadoopUtils.deserializeFsPermission(this.dataset.jobProps(), COMPACTION_JOB_OUTPUT_DIR_PERMISSION, FsPermission.getDefault());
        this.recompactFromDestPaths = this.dataset.jobProps().getPropAsBoolean("compaction.recompact.from.dest.paths", false);
        this.recompactAllData = this.dataset.jobProps().getPropAsBoolean("compaction.recompact.all.data", true);
        this.renameSourceDir = this.dataset.jobProps().getPropAsBoolean("compaction.rename.source.dir.enabled", false);
        Preconditions.checkArgument(this.dataset.jobProps().contains("compaction.should.deduplicate"),
                "Missing property %s for dataset %s", "compaction.should.deduplicate", this.dataset);
        this.shouldDeduplicate = this.dataset.jobProps().getPropAsBoolean("compaction.should.deduplicate");
        this.outputDeduplicated = this.dataset.jobProps().getPropAsBoolean("compaction.output.deduplicated", true);
        this.usePrimeReducers = this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_USE_PRIME_REDUCERS, DEFAULT_COMPACTION_JOB_USE_PRIME_REDUCERS);
        this.eventSubmitter = new EventSubmitter.Builder(
                GobblinMetrics.get(this.dataset.jobProps().getProp("job.name")).getMetricContext(),
                "compaction.tracking.events").build();
        this.copyLateDataThreadPoolSize = this.dataset.jobProps().getPropAsInt(COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE, DEFAULT_COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE);
        this.isRetryEnabled = this.dataset.jobProps().getPropAsBoolean(COMPACTION_RETRY_ENABLED, false);

        this.tmpFsUri = this.dataset.jobProps().getProp(COMPACTION_TMP_FS, null);
        try {
            LOG.info("Tmp fs uri:" + this.tmpFsUri);
            this.tmpFs = this.tmpFsUri != null ? FileSystem.get(new URI(this.tmpFsUri), new Configuration()) : this.fs;
        } catch (Exception e) {
            throw new RuntimeException("Failed get Filesystem from tmp fs uri", e);
        }

        try {
            this.inputRecordCountProvider = (RecordCountProvider) Class.forName(this.dataset.jobProps().getProp(
                    "compaction.input.record.count.provider", MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER)).newInstance();
            this.outputRecordCountProvider = (RecordCountProvider) Class.forName(this.dataset.jobProps().getProp(
                    "compaction.output.record.count.provider", MRCompactor.DEFAULT_COMPACTION_OUTPUT_RECORD_COUNT_PROVIDER)).newInstance();
            this.lateInputRecordCountProvider = new LateFileRecordCountProvider(this.inputRecordCountProvider);
            this.lateOutputRecordCountProvider = new LateFileRecordCountProvider(this.outputRecordCountProvider);
        } catch (Exception e) {
            throw new RuntimeException("Failed to instantiate RecordCountProvider", e);
        }

        this.applicablePathCache = CacheBuilder.newBuilder().maximumSize(2000L).build();
        this.datasetHelper = new DatasetHelper(this.dataset, this.fs, this.getApplicableFileExtensions());
        this.outputExtension = this.dataset.jobProps().getProp("compaction.extension", ".avro");

        if (this.isRetryEnabled) {
            // Explicit "compaction.job.retry.*" properties win; COMPACTION_RETRY_DEFAULTS fills the rest.
            this.retrierConfig = ConfigBuilder.create()
                    .loadProps(this.dataset.jobProps().getProperties(), COMPACTION_RETRY_PREFIX)
                    .build()
                    .withFallback((ConfigMergeable) COMPACTION_RETRY_DEFAULTS);
            LOG.info("Retry enabled for compaction publish :" + this.retrierConfig.root().render(ConfigRenderOptions.concise()));
        } else {
            this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
            LOG.info("Retry disabled for compaction");
        }
    }

    /**
     * Runs the compaction (or late-data movement) for this runner's dataset.
     *
     * <p>The outcome is recorded in {@link #status()}. After a successful publish or late-data move,
     * the completion is recorded in one of two ways: the source directory is renamed (when
     * {@code compaction.rename.source.dir.enabled} is set), or a {@code _COMPACTION_COMPLETE} marker
     * is written to the output directory. A record-count event is then submitted.
     *
     * <p>Any failure is rethrown unchecked via {@link Throwables#propagate(Throwable)}; in that case
     * {@link #status()} is left unchanged.
     */
    @Override
    public void run() {
        Configuration conf = HadoopUtils.getConfFromState(this.dataset.jobProps());
        // Compress output by default unless either the current or the deprecated key is set explicitly.
        if (conf.get("mapreduce.output.fileoutputformat.compress") == null && conf.get("mapred.output.compress") == null) {
            conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        }
        // Do not cancel delegation tokens when the job completes, unless configured otherwise.
        if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
            conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
        }
        try {
            DateTime compactionTimestamp = this.getCompactionTimestamp();
            LOG.info("MR Compaction Job Timestamp " + compactionTimestamp.getMillis());
            if (this.dataset.jobProps().getPropAsBoolean("compaction.job.late.data.movement.task", false)) {
                this.moveLateData();
            } else if (!this.compactAndPublish(conf, compactionTimestamp)) {
                // The output already existed and may not be overwritten, or publication was declined.
                return;
            }
            if (this.renameSourceDir) {
                MRCompactor.renameSourceDirAsCompactionComplete(this.fs, this.dataset);
            } else {
                this.markOutputDirAsCompleted(compactionTimestamp);
            }
            this.submitRecordsCountsEvent();
        } catch (Throwable t) {
            throw Throwables.propagate(t);
        }
    }

    /**
     * Copies the late files listed in {@code compaction.job.late.data.files} into the output.
     *
     * <p>Only files with an applicable extension are copied. The target is the late output
     * directory, or the regular output directory when the output is not deduplicated. Sets the
     * status to {@link Status#COMMITTED} on success.
     */
    private void moveLateData() throws IOException {
        List<Path> newLateFilePaths = Lists.newArrayList();
        for (String filePathString : this.dataset.jobProps().getPropAsList("compaction.job.late.data.files")) {
            if (FilenameUtils.isExtension(filePathString, this.getApplicableFileExtensions())) {
                newLateFilePaths.add(new Path(filePathString));
            }
        }
        Path lateDataOutputPath = this.outputDeduplicated ? this.dataset.outputLatePath() : this.dataset.outputPath();
        LOG.info(String.format("Copying %d late data files to %s", newLateFilePaths.size(), lateDataOutputPath));
        if (this.outputDeduplicated && !this.fs.exists(lateDataOutputPath) && !this.fs.mkdirs(lateDataOutputPath)) {
            throw new RuntimeException(String.format("Failed to create late data output directory: %s.", lateDataOutputPath.toString()));
        }
        this.copyDataFiles(lateDataOutputPath, newLateFilePaths);
        if (this.outputDeduplicated) {
            this.dataset.checkIfNeedToRecompact(this.datasetHelper);
        }
        this.status = Status.COMMITTED;
    }

    /**
     * Runs the MR compaction job and, if allowed, publishes its output.
     *
     * @return {@code true} if data was published (status {@link Status#COMMITTED}). Returns
     *         {@code false} in two cases: the existing output may not be overwritten (status
     *         {@link Status#COMMITTED}), or publication was declined (status {@link Status#ABORTED}).
     */
    private boolean compactAndPublish(Configuration conf, DateTime compactionTimestamp)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (this.fs.exists(this.dataset.outputPath()) && !this.canOverwriteOutputDir()) {
            LOG.warn(String.format("Output paths %s exists. Will not compact %s.", this.dataset.outputPath(), this.dataset.inputPaths()));
            this.status = Status.COMMITTED;
            return false;
        }
        this.addJars(conf);
        Job job = Job.getInstance(conf);
        this.configureJob(job);
        this.submitAndWait(job);
        if (!this.shouldPublishData(compactionTimestamp)) {
            LOG.info("Data not published for input folder " + this.dataset.inputPaths() + " due to incompleteness");
            this.status = Status.ABORTED;
            return false;
        }
        List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, this.dataset.outputTmpPath(), this.tmpFs, ImmutableList.of("avro"));
        if (!this.recompactAllData && this.recompactFromDestPaths) {
            // Incremental recompaction: add the new files beside the existing output, then drop the inputs.
            this.addGoodFilesToOutputPath(goodPaths);
            this.deleteFilesByPaths(this.dataset.inputPaths());
        } else {
            this.moveTmpPathToOutputPath();
            if (this.recompactFromDestPaths) {
                this.deleteFilesByPaths(this.dataset.additionalInputPaths());
            }
        }
        this.submitSlaEvent(job);
        LOG.info("Successfully published data for input folder " + this.dataset.inputPaths());
        this.status = Status.COMMITTED;
        return true;
    }

    /**
     * Returns the timestamp that identifies this compaction run, in {@code compaction.timezone}.
     *
     * <p>When recompacting from destination paths, this is the newest modification time among the
     * input files (or "now" if there are none). Otherwise it is "now".
     */
    private DateTime getCompactionTimestamp() throws IOException {
        DateTimeZone timeZone = DateTimeZone.forID(this.dataset.jobProps().getProp("compaction.timezone", "America/Los_Angeles"));
        if (!this.recompactFromDestPaths) {
            return new DateTime(timeZone);
        }
        long maxTimestamp = Long.MIN_VALUE;
        for (FileStatus fileStatus : FileListUtils.listFilesRecursively(this.fs, this.getInputPaths())) {
            maxTimestamp = Math.max(maxTimestamp, fileStatus.getModificationTime());
        }
        return maxTimestamp == Long.MIN_VALUE ? new DateTime(timeZone) : new DateTime(maxTimestamp, timeZone);
    }

    /**
     * Copies each input file into {@code outputDirectory} in parallel, deleting the source.
     *
     * <p>Each file is renamed through the output record count provider and placed as a late file.
     * The executor is always shut down, including when submission itself fails. If the calling
     * thread is interrupted while waiting, its interrupt flag is restored before an
     * {@link IOException} is thrown.
     */
    private void copyDataFiles(final Path outputDirectory, List<Path> inputFilePaths) throws IOException {
        ExecutorService executor = ScalingThreadPoolExecutor.newScalingThreadPool(0, this.copyLateDataThreadPoolSize, 100L,
                ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of(this.dataset.getName() + "-copy-data")));
        try {
            List<Future<Void>> futures = Lists.newArrayList();
            for (final Path filePath : inputFilePaths) {
                Callable<Void> copyTask = () -> {
                    Path convertedFilePath = this.outputRecordCountProvider.convertPath(
                            LateFileRecordCountProvider.restoreFilePath(filePath), this.outputExtension, this.inputRecordCountProvider);
                    Path outPath = this.lateOutputRecordCountProvider.constructLateFilePath(convertedFilePath.getName(), this.fs, outputDirectory);
                    HadoopUtils.copyPath(this.fs, filePath, this.fs, outPath, true, this.fs.getConf());
                    LOG.debug(String.format("Copied %s to %s.", filePath, outPath));
                    return null;
                };
                futures.add(executor.submit(copyTask));
            }
            for (Future<Void> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Failed to copy file.", e);
        } catch (ExecutionException e) {
            throw new IOException("Failed to copy file.", e);
        } finally {
            ExecutorsUtils.shutdownExecutorService(executor, Optional.of(LOG));
        }
    }

    /** Returns whether an existing output directory may be overwritten (explicitly allowed or recompacting). */
    private boolean canOverwriteOutputDir() {
        return this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_OVERWRITE_OUTPUT_DIR, DEFAULT_COMPACTION_JOB_OVERWRITE_OUTPUT_DIR)
                || this.recompactFromDestPaths;
    }

    /** Adds every file directly under {@code compaction.jars} (if set) to the job's distributed-cache classpath. */
    private void addJars(Configuration conf) throws IOException {
        if (!this.dataset.jobProps().contains("compaction.jars")) {
            return;
        }
        Path jarFileDir = new Path(this.dataset.jobProps().getProp("compaction.jars"));
        for (FileStatus jarStatus : this.fs.listStatus(jarFileDir)) {
            DistributedCache.addFileToClassPath(jarStatus.getPath(), conf, this.fs);
        }
    }

    /**
     * Configures the job's name, input/output paths, mapper and reducer.
     *
     * <p>When deduplication is disabled, the job is made map-only by setting the reducer count to 0.
     */
    protected void configureJob(Job job) throws IOException {
        job.setJobName(HADOOP_JOB_NAME + " (" + this.dataset.getDatasetName() + ")");
        this.configureInputAndOutputPaths(job);
        this.configureMapper(job);
        this.configureReducer(job);
        if (!this.shouldDeduplicate) {
            job.setNumReduceTasks(0);
        }
    }

    /** Adds all input paths to the job and points its output at a freshly cleared temporary output path. */
    private void configureInputAndOutputPaths(Job job) throws IOException {
        for (Path inputPath : this.getInputPaths()) {
            FileInputFormat.addInputPath(job, inputPath);
        }
        this.tmpFs.delete(this.dataset.outputTmpPath(), true);
        FileOutputFormat.setOutputPath(job, this.dataset.outputTmpPath());
    }

    /** Returns the union of the dataset's input paths and additional input paths. */
    private Set<Path> getInputPaths() {
        return ImmutableSet.<Path>builder()
                .addAll(this.dataset.inputPaths())
                .addAll(this.dataset.additionalInputPaths())
                .build();
    }

    public Dataset getDataset() {
        return this.dataset;
    }

    /** Applies the subclass-provided input format, mapper and map output key/value classes. */
    protected void configureMapper(Job job) {
        this.setInputFormatClass(job);
        this.setMapperClass(job);
        this.setMapOutputKeyClass(job);
        this.setMapOutputValueClass(job);
    }

    /** Applies the subclass-provided output format, reducer and output key/value classes, then sizes the reducers. */
    protected void configureReducer(Job job) throws IOException {
        this.setOutputFormatClass(job);
        this.setReducerClass(job);
        this.setOutputKeyClass(job);
        this.setOutputValueClass(job);
        this.setNumberOfReducers(job);
    }

    protected abstract void setInputFormatClass(Job var1);

    protected abstract void setMapperClass(Job var1);

    protected abstract void setMapOutputKeyClass(Job var1);

    protected abstract void setMapOutputValueClass(Job var1);

    protected abstract void setOutputFormatClass(Job var1);

    protected abstract void setReducerClass(Job var1);

    protected abstract void setOutputKeyClass(Job var1);

    protected abstract void setOutputValueClass(Job var1);

    /** Returns the file-name extensions of data files this runner handles. Called from the constructor. */
    protected abstract Collection<String> getApplicableFileExtensions();

    /**
     * Sets the reducer count to {@code inputSize / targetFileSize + 1}, capped at the configured
     * maximum, and optionally bumps it to the next prime.
     *
     * <p>The computation is done in {@code long}, so very large inputs clamp to the maximum instead
     * of overflowing. NOTE(review): the prime bump happens after the cap, so the result can exceed
     * {@code compaction.job.max.num.reducers} (e.g. 3600 becomes 3607).
     *
     * @throws IllegalArgumentException if the configured target output file size is not positive
     */
    protected void setNumberOfReducers(Job job) throws IOException {
        long inputSize = this.getInputSize();
        long targetFileSize = this.getTargetFileSize();
        Preconditions.checkArgument(targetFileSize > 0L, "%s must be positive but was %s for dataset %s",
                COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE, targetFileSize, this.dataset);
        long desiredReducers = inputSize / targetFileSize + 1L;
        int numReducers = (int) Math.min(desiredReducers, (long) this.getMaxNumReducers());
        if (this.usePrimeReducers && numReducers != 1) {
            numReducers = Primes.nextPrime(numReducers);
        }
        job.setNumReduceTasks(numReducers);
    }

    /** Returns the total content length, in bytes, of all input paths on {@link #fs}. */
    private long getInputSize() throws IOException {
        long inputSize = 0L;
        for (Path inputPath : this.getInputPaths()) {
            inputSize += this.fs.getContentSummary(inputPath).getLength();
        }
        return inputSize;
    }

    private long getTargetFileSize() {
        return this.dataset.jobProps().getPropAsLong(COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE, DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE);
    }

    private int getMaxNumReducers() {
        return this.dataset.jobProps().getPropAsInt(COMPACTION_JOB_MAX_NUM_REDUCERS, DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
    }

    /**
     * Submits the job, registers it with {@link MRCompactor}, and polls until it completes.
     *
     * <p>If {@link #abort()} is observed while the job is running, the job is killed and this method
     * returns normally.
     *
     * @throws RuntimeException if the job completes unsuccessfully
     */
    private void submitAndWait(Job job) throws ClassNotFoundException, IOException, InterruptedException {
        job.submit();
        MRCompactor.addRunningHadoopJob(this.dataset, job);
        LOG.info(String.format("MR job submitted for dataset %s, input %s, url: %s", this.dataset, this.getInputPaths(), job.getTrackingURL()));
        while (!job.isComplete()) {
            if (this.policy == Policy.ABORT_ASAP) {
                LOG.info(String.format("MR job for dataset %s, input %s killed due to input data incompleteness. Will try again later",
                        this.dataset, this.getInputPaths()));
                job.killJob();
                return;
            }
            Thread.sleep(MR_JOB_CHECK_COMPLETE_INTERVAL_MS);
        }
        if (!job.isSuccessful()) {
            throw new RuntimeException(String.format("MR job failed for topic %s, input %s, url: %s", this.dataset, this.getInputPaths(), job.getTrackingURL()));
        }
    }

    /**
     * Returns whether the job output should be published.
     *
     * <p>Requires {@link Policy#DO_PUBLISH_DATA}. When {@code compaction.job.abort.upon.new.data} is
     * set, it also requires that no input file was modified after {@code jobStartTime}.
     */
    private boolean shouldPublishData(DateTime jobStartTime) throws IOException {
        if (this.policy != Policy.DO_PUBLISH_DATA) {
            return false;
        }
        if (!this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_ABORT_UPON_NEW_DATA, DEFAULT_COMPACTION_JOB_ABORT_UPON_NEW_DATA)) {
            return true;
        }
        for (Path inputPath : this.getInputPaths()) {
            if (this.findNewDataSinceCompactionStarted(inputPath, jobStartTime)) {
                return false;
            }
        }
        return true;
    }

    /** Returns {@code true} (and logs the file) if any file under {@code inputPath} was modified after {@code jobStartTime}. */
    private boolean findNewDataSinceCompactionStarted(Path inputPath, DateTime jobStartTime) throws IOException {
        for (FileStatus fstat : FileListUtils.listFilesRecursively(this.fs, inputPath)) {
            DateTime fileModificationTime = new DateTime(fstat.getModificationTime());
            if (fileModificationTime.isAfter((ReadableInstant) jobStartTime)) {
                LOG.info(String.format("Found new file %s in input folder %s after compaction started. Will abort compaction.", fstat.getPath(), inputPath));
                return true;
            }
        }
        return false;
    }

    /** Writes {@code _COMPACTION_COMPLETE} into the output directory, containing {@code jobStartTime} in epoch millis. */
    private void markOutputDirAsCompleted(DateTime jobStartTime) throws IOException {
        Path completionFilePath = new Path(this.dataset.outputPath(), "_COMPACTION_COMPLETE");
        try (FSDataOutputStream completionFileStream = this.fs.create(completionFilePath)) {
            completionFileStream.writeLong(jobStartTime.getMillis());
        }
    }

    /**
     * Replaces the dataset output directory with the job's temporary output.
     *
     * <p>Steps:
     * <ol>
     *   <li>Delete the existing output directory.</li>
     *   <li>When retry is enabled, wait (with retries) until the deletion is visible.</li>
     *   <li>Create the parent directory with {@link #perm}.</li>
     *   <li>Move the temporary path from {@link #tmpFs} to the output file system.</li>
     * </ol>
     */
    private void moveTmpPathToOutputPath() throws IOException {
        Path outputPath = this.dataset.outputPath();
        Path tmpPath = this.dataset.outputTmpPath();
        LOG.info(String.format("Moving %s to %s", tmpPath, outputPath));
        this.fs.delete(outputPath, true);
        if (this.isRetryEnabled) {
            Retryer<Void> retryer = RetryerFactory.newInstance(this.retrierConfig);
            try {
                retryer.call(() -> {
                    if (this.fs.exists(outputPath)) {
                        throw new IOException("Path " + outputPath + " exists however it should not. Will wait more.");
                    }
                    return null;
                });
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.fs, outputPath.getParent(), this.perm, this.retrierConfig);
        FileSystem outputFs = this.getOutputFileSystem();
        LOG.info("Moving from fs: (" + this.tmpFs.getUri() + ") path: " + tmpPath + " to fs: (" + outputFs.getUri() + ") output path: " + outputPath);
        HadoopUtils.movePath(this.tmpFs, tmpPath, outputFs, outputPath, false, this.fs.getConf());
    }

    /** Moves each file in {@code goodPaths} from {@link #tmpFs} into the output directory as a late file. */
    private void addGoodFilesToOutputPath(List<Path> goodPaths) throws IOException {
        FileSystem outputFs = this.getOutputFileSystem();
        for (Path path : goodPaths) {
            LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath()));
            Path outPath = this.lateOutputRecordCountProvider.constructLateFilePath(path.getName(), this.fs, this.dataset.outputPath());
            HadoopUtils.movePath(this.tmpFs, path, outputFs, outPath, false, this.fs.getConf());
        }
    }

    /** Returns the file system owning the parent of the dataset output path, resolved with {@link #fs}'s configuration. */
    private FileSystem getOutputFileSystem() throws IOException {
        return FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf());
    }

    /** Recursively deletes each path and then any ancestors that become empty. */
    private void deleteFilesByPaths(Set<Path> paths) throws IOException {
        for (Path path : paths) {
            HadoopUtils.deletePathAndEmptyAncestors(this.fs, path, true);
        }
    }

    /** Allows the job output to be published once the job finishes. */
    public void proceed() {
        this.policy = Policy.DO_PUBLISH_DATA;
    }

    /** Requests that a running job be killed and that nothing be published. */
    public void abort() {
        this.policy = Policy.ABORT_ASAP;
    }

    public Status status() {
        return this.status;
    }

    /**
     * Orders runners by descending dataset priority.
     *
     * <p>Note: this ordering is not consistent with {@code equals}, which is not overridden.
     */
    @Override
    public int compareTo(MRCompactorJobRunner o) {
        return Double.compare(o.dataset.priority(), this.dataset.priority());
    }

    /**
     * Returns the files under {@code dataDir} whose names end with an applicable extension.
     *
     * <p>Returns an empty list if {@code dataDir} does not exist. NOTE(review): results are cached
     * per directory for this runner's lifetime, and the cache key ignores {@code fs}.
     */
    private List<Path> getApplicableFilePaths(final Path dataDir, final FileSystem fs) throws IOException {
        try {
            return this.applicablePathCache.get(dataDir, () -> this.listApplicableFilePaths(dataDir, fs));
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    /** Uncached listing behind {@link #getApplicableFilePaths(Path, FileSystem)}. */
    private List<Path> listApplicableFilePaths(Path dataDir, FileSystem fs) throws IOException {
        List<Path> paths = Lists.newArrayList();
        if (!fs.exists(dataDir)) {
            return paths;
        }
        PathFilter extensionFilter = path -> {
            for (String validExtension : this.getApplicableFileExtensions()) {
                if (path.getName().endsWith(validExtension)) {
                    return true;
                }
            }
            return false;
        };
        for (FileStatus fileStatus : FileListUtils.listFilesRecursively(fs, dataDir, extensionFilter)) {
            paths.add(fileStatus.getPath());
        }
        return paths;
    }

    /** Submits a {@code CompactionCompleted} event with late/regular output record counts; failures are only logged. */
    private void submitSlaEvent(Job job) {
        try {
            long lateRecordCount = this.lateOutputRecordCountProvider.getRecordCount(this.getApplicableFilePaths(this.dataset.outputLatePath(), this.fs));
            long regularRecordCount = this.outputRecordCountProvider.getRecordCount(this.getApplicableFilePaths(this.dataset.outputPath(), this.fs));
            CompactionSlaEventHelper.getEventSubmitterBuilder(this.dataset, Optional.of(job), this.fs)
                    .eventSubmitter(this.eventSubmitter)
                    .eventName("CompactionCompleted")
                    .additionalMetadata("lateRecordCount", Long.toString(lateRecordCount))
                    .additionalMetadata("regularRecordCount", Long.toString(regularRecordCount))
                    .additionalMetadata("recompacted", Boolean.toString(this.dataset.needToRecompact()))
                    .build()
                    .submit();
        } catch (Throwable e) {
            LOG.warn("Failed to submit compaction completed event:" + e, e);
        }
    }

    /**
     * Submits a {@code CompactionRecordCounts} event using counts from the {@link DatasetHelper}.
     *
     * <p>Submission failures are only logged. The counts themselves are computed outside the
     * try block, so errors while counting propagate.
     */
    private void submitRecordsCountsEvent() {
        long lateOutputRecordCount = this.datasetHelper.getLateOutputRecordCount();
        long outputRecordCount = this.datasetHelper.getOutputRecordCount();
        try {
            CompactionSlaEventHelper.getEventSubmitterBuilder(this.dataset, Optional.<Job>absent(), this.fs)
                    .eventSubmitter(this.eventSubmitter)
                    .eventName("CompactionRecordCounts")
                    .additionalMetadata("datasetOutputPath", this.dataset.outputPath().toString())
                    .additionalMetadata("lateRecordCount", Long.toString(lateOutputRecordCount))
                    .additionalMetadata("regularRecordCount", Long.toString(outputRecordCount))
                    .additionalMetadata("needRecompact", Boolean.toString(this.dataset.needToRecompact()))
                    .build()
                    .submit();
        } catch (Throwable e) {
            LOG.warn("Failed to submit late event count:" + e, e);
        }
    }

    static {
        // Exponential backoff: 5s initial interval, x2 multiplier, 2 minute overall timeout.
        ImmutableMap<String, Object> configMap = ImmutableMap.<String, Object>builder()
                .put("time_out_ms", TimeUnit.MINUTES.toMillis(2L))
                .put("interval_ms", TimeUnit.SECONDS.toMillis(5L))
                .put("multiplier", 2L)
                .put("retry_type", RetryerFactory.RetryType.EXPONENTIAL.name())
                .build();
        COMPACTION_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
    }

    /** Outcome of a runner: still running, aborted without publishing, or committed. */
    public static enum Status {
        ABORTED,
        COMMITTED,
        RUNNING;

    }

    /** Externally controlled publication decision, see {@link #proceed()} and {@link #abort()}. */
    public static enum Policy {
        DO_PUBLISH_DATA,
        DO_NOT_PUBLISH_DATA,
        ABORT_ASAP;

    }
}

