/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.util;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.ProxiedFileSystemWrapper;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobLauncherUtils {
    private static final Logger log = LoggerFactory.getLogger(JobLauncherUtils.class);
    private static Cache<String, FileSystem> fileSystemCacheByOwners = CacheBuilder.newBuilder().build();

    public static String newJobId(String jobName) {
        return Id.Job.create(jobName, System.currentTimeMillis()).toString();
    }

    public static String newJobId(String jobName, long executionId) {
        return Id.Job.create(jobName, executionId).toString();
    }

    public static String newTaskId(String jobId, int sequence) {
        return Id.Task.create(Id.parse(jobId).get(Id.Parts.INSTANCE_NAME), sequence).toString();
    }

    public static String newMultiTaskId(String jobId, int sequence) {
        return Id.MultiTask.create(Id.parse(jobId).get(Id.Parts.INSTANCE_NAME), sequence).toString();
    }

    public static List<WorkUnit> flattenWorkUnits(Collection<WorkUnit> workUnits) {
        ArrayList flattenedWorkUnits = Lists.newArrayList();
        for (WorkUnit workUnit : workUnits) {
            if (workUnit instanceof MultiWorkUnit) {
                flattenedWorkUnits.addAll(JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit)workUnit).getWorkUnits()));
                continue;
            }
            flattenedWorkUnits.add(workUnit);
        }
        return flattenedWorkUnits;
    }

    public static void cleanStagingData(List<? extends State> states, Logger logger) throws IOException {
        for (State state : states) {
            JobLauncherUtils.cleanTaskStagingData(state, logger);
        }
    }

    public static void cleanJobStagingData(State state, Logger logger) throws IOException {
        if (!state.contains("writer.staging.dir") || !state.contains("writer.output.dir")) {
            return;
        }
        String writerFsUri = state.getProp("writer.fs.uri", "file:///");
        FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri, WriterUtils.getFsConfiguration(state));
        Path jobStagingPath = new Path(state.getProp("writer.staging.dir"));
        logger.info("Cleaning up staging directory " + jobStagingPath);
        HadoopUtils.deletePath(fs, jobStagingPath, true);
        if (fs.exists(jobStagingPath.getParent()) && fs.listStatus(jobStagingPath.getParent()).length == 0) {
            logger.debug("Deleting directory " + jobStagingPath.getParent());
            HadoopUtils.deletePath(fs, jobStagingPath.getParent(), true);
        }
        Path jobOutputPath = new Path(state.getProp("writer.output.dir"));
        logger.info("Cleaning up output directory " + jobOutputPath);
        HadoopUtils.deletePath(fs, jobOutputPath, true);
        if (fs.exists(jobOutputPath.getParent()) && fs.listStatus(jobOutputPath.getParent()).length == 0) {
            logger.debug("Deleting directory " + jobOutputPath.getParent());
            HadoopUtils.deletePath(fs, jobOutputPath.getParent(), true);
        }
        if (state.contains("qualitychecker.row.err.file") && state.getPropAsBoolean("qualitychecker.clean.err.dir", false)) {
            Path jobErrPath = new Path(state.getProp("qualitychecker.row.err.file"));
            log.debug("Cleaning up err directory : " + jobErrPath);
            HadoopUtils.deleteIfExists(fs, jobErrPath, true);
        }
    }

    public static void cleanTaskStagingData(State state, Logger logger) throws IOException {
        int numBranches = state.getPropAsInt("fork.branches", 1);
        for (int branchId = 0; branchId < numBranches; ++branchId) {
            Path outputPath;
            Path stagingPath;
            String writerFsUri = state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.fs.uri", numBranches, branchId), "file:///");
            FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri, WriterUtils.getFsConfiguration(state));
            if (fs.exists(stagingPath = WriterUtils.getWriterStagingDir(state, numBranches, branchId))) {
                logger.info("Cleaning up staging directory " + stagingPath.toUri().getPath());
                if (!fs.delete(stagingPath, true)) {
                    throw new IOException("Clean up staging directory " + stagingPath.toUri().getPath() + " failed");
                }
            }
            if (!fs.exists(outputPath = WriterUtils.getWriterOutputDir(state, numBranches, branchId))) continue;
            logger.info("Cleaning up output directory " + outputPath.toUri().getPath());
            if (fs.delete(outputPath, true)) continue;
            throw new IOException("Clean up output directory " + outputPath.toUri().getPath() + " failed");
        }
    }

    public static void cleanTaskStagingData(State state, Logger logger, Closer closer, Map<String, ParallelRunner> parallelRunners) throws IOException {
        int numBranches = state.getPropAsInt("fork.branches", 1);
        int parallelRunnerThreads = state.getPropAsInt("parallel.runner.threads", 10);
        for (int branchId = 0; branchId < numBranches; ++branchId) {
            Path outputPath;
            String writerFsUri = state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.fs.uri", numBranches, branchId), "file:///");
            FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri, WriterUtils.getFsConfiguration(state));
            ParallelRunner parallelRunner = JobLauncherUtils.getParallelRunner(fs, closer, parallelRunnerThreads, parallelRunners);
            Path stagingPath = WriterUtils.getWriterStagingDir(state, numBranches, branchId);
            if (fs.exists(stagingPath)) {
                logger.info("Cleaning up staging directory " + stagingPath.toUri().getPath());
                parallelRunner.deletePath(stagingPath, true);
            }
            if (!fs.exists(outputPath = WriterUtils.getWriterOutputDir(state, numBranches, branchId))) continue;
            logger.info("Cleaning up output directory " + outputPath.toUri().getPath());
            parallelRunner.deletePath(outputPath, true);
        }
    }

    public static void cleanUpOldJobData(State state, Logger logger, boolean stagingDirProvided, boolean outputDirProvided) throws IOException {
        HashSet<Path> jobPaths = new HashSet<Path>();
        String writerFsUri = state.getProp("writer.fs.uri", "file:///");
        FileSystem fs = FileSystem.get((URI)URI.create(writerFsUri), (Configuration)WriterUtils.getFsConfiguration(state));
        Path jobPath = stagingDirProvided ? new Path(state.getProp("writer.staging.dir")).getParent() : new Path(state.getProp("writer.staging.dir")).getParent().getParent();
        jobPaths.add(jobPath);
        jobPath = outputDirProvided ? new Path(state.getProp("writer.output.dir")).getParent() : new Path(state.getProp("writer.output.dir")).getParent().getParent();
        jobPaths.add(jobPath);
        for (Path jobPathToDelete : jobPaths) {
            logger.info("Cleaning up old job directory " + jobPathToDelete);
            HadoopUtils.deletePath(fs, jobPathToDelete, true);
        }
    }

    public static FileSystem getFsWithProxy(final State state, final String fsUri, final Configuration conf) throws IOException {
        if (!state.getPropAsBoolean("should.fs.proxy.as.user", false)) {
            return FileSystem.get((URI)URI.create(fsUri), (Configuration)conf);
        }
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)state.getProp("fs.proxy.as.user.name")) ? 1 : 0) != 0, (Object)"State does not contain a proper proxy user name");
        String owner = state.getProp("fs.proxy.as.user.name");
        try {
            return (FileSystem)fileSystemCacheByOwners.get((Object)owner, (Callable)new Callable<FileSystem>(){

                @Override
                public FileSystem call() throws Exception {
                    return new ProxiedFileSystemWrapper().getProxiedFileSystem(state, ProxiedFileSystemWrapper.AuthType.KEYTAB, state.getProp("super.user.key.tab.location"), fsUri, conf);
                }
            });
        }
        catch (ExecutionException ee) {
            throw new IOException(ee.getCause());
        }
    }

    private static ParallelRunner getParallelRunner(FileSystem fs, Closer closer, int parallelRunnerThreads, Map<String, ParallelRunner> parallelRunners) {
        String uriAndHomeDir = new Path(new Path(fs.getUri()), fs.getHomeDirectory()).toString();
        if (!parallelRunners.containsKey(uriAndHomeDir)) {
            parallelRunners.put(uriAndHomeDir, (ParallelRunner)closer.register((Closeable)new ParallelRunner(parallelRunnerThreads, fs)));
        }
        return parallelRunners.get(uriAndHomeDir);
    }
}

