/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.job;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line application that looks for storage no longer referenced by Kylin jobs or cube
 * segments: intermediate Hive tables, job working directories on HDFS, and HBase tables.
 *
 * <p>Two options are understood:
 * <ul>
 *   <li>{@code delete} - when {@code true}, the unused Hive tables and HDFS paths are actually
 *       removed; otherwise they are only printed to standard output. The flag is also forwarded
 *       to the HBase cleanup utility.</li>
 *   <li>{@code force} - when {@code true}, every Hive table whose name starts with
 *       {@code kylin_intermediate_} is selected, regardless of job state.</li>
 * </ul>
 */
public class StorageCleanupJob extends AbstractApplication {
    protected static final Option OPTION_DELETE = buildOption("delete", "Delete the unused storage");
    protected static final Option OPTION_FORCE =
            buildOption("force", "Warning: will delete all kylin intermediate hive tables");
    protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);

    /**
     * Passed as the {@code int} argument of {@code StorageCleanJobHbaseUtil.cleanUnusedHBaseTables}.
     * NOTE(review): the unit is defined by that utility, which is not visible here - confirm.
     */
    public static final int deleteTimeout = 10;

    /** Name prefix shared by all intermediate Hive tables considered by this job. */
    private static final String INTERMEDIATE_TABLE_PREFIX = "kylin_intermediate_";

    /** Number of characters in the textual UUID expected at the end of an intermediate table name. */
    private static final int UUID_LENGTH = 36;

    /** Lower-case hexadecimal UUID in the usual 8-4-4-4-12 layout; compiled once and reused. */
    private static final Pattern UUID_PATTERN =
            Pattern.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}");

    protected static ExecutableManager executableManager =
            ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());

    protected boolean delete = false;
    protected boolean force = false;

    /**
     * Builds an optional command-line option that takes one argument.
     *
     * <p>{@code OptionBuilder} accumulates its settings in static state, so the calls are issued
     * as separate static invocations and finished with {@code create}.
     *
     * @param name        used both as the option name and as its argument name
     * @param description help text of the option
     * @return the configured option
     */
    private static Option buildOption(String name, String description) {
        OptionBuilder.withArgName(name);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(false);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(name);
    }

    /**
     * Invokes {@code StorageCleanJobHbaseUtil.cleanUnusedHBaseTables(boolean, int)} reflectively
     * when the metadata URL scheme is {@code hbase}; does nothing for any other scheme.
     *
     * @throws IOException wrapping any failure raised while locating or invoking the utility
     */
    protected void cleanUnusedHBaseTables() throws IOException {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (!"hbase".equals(config.getMetadataUrl().getScheme())) {
            return;
        }
        try {
            // NOTE(review): presumably looked up by name to avoid a hard dependency on HBase classes - confirm.
            Class<?> hbaseCleanUpUtil = Class.forName("org.apache.kylin.rest.job.StorageCleanJobHbaseUtil");
            Method cleanUnusedHBaseTables =
                    hbaseCleanUpUtil.getDeclaredMethod("cleanUnusedHBaseTables", boolean.class, int.class);
            cleanUnusedHBaseTables.invoke(hbaseCleanUpUtil, this.delete, deleteTimeout);
        } catch (Throwable e) {
            throw new IOException(e);
        }
    }

    /**
     * @return the options accepted by this application: {@link #OPTION_DELETE} and {@link #OPTION_FORCE}
     */
    protected Options getOptions() {
        Options options = new Options();
        options.addOption(OPTION_DELETE);
        options.addOption(OPTION_FORCE);
        return options;
    }

    /**
     * Reads the {@code delete} and {@code force} options and runs the cleanup steps in order:
     * intermediate Hive tables, HDFS working directories (first with an HBase configuration when
     * an HBase cluster file system is configured, then with the current Hadoop configuration),
     * and finally HBase tables.
     *
     * @param optionsHelper parsed command-line options
     * @throws Exception if any cleanup step fails
     */
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
        logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'");
        logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'");
        this.delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
        this.force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE));

        this.cleanUnusedIntermediateHiveTable();

        KylinConfig config = KylinConfig.getInstanceFromEnv();
        if (StringUtils.isNotEmpty(config.getHBaseClusterFs())) {
            this.cleanUnusedHdfsFiles(HBaseConfiguration.create());
        }
        Configuration conf = HadoopUtil.getCurrentConfiguration();
        this.cleanUnusedHdfsFiles(conf);

        this.cleanUnusedHBaseTables();
    }

    /**
     * Collects the {@code kylin-*} entries directly under the HDFS working directory, removes from
     * that list the working directories of jobs that are not in a final state and of the last
     * build job of every cube segment, and then deletes (or, without {@code delete}, prints) the rest.
     *
     * @param conf Hadoop configuration used to obtain the working file system
     * @throws IOException if listing or deleting on the file system fails
     */
    private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
        List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();

        // Every "kylin-*" entry is a deletion candidate until proven to be in use.
        try {
            Path workingDir = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
            for (FileStatus status : fs.listStatus(workingDir)) {
                String name = status.getPath().getName();
                if (name.startsWith("kylin-")) {
                    allHdfsPathsNeedToBeDeleted.add(engineConfig.getHdfsWorkingDirectory() + name);
                }
            }
        } catch (FileNotFoundException e) {
            // A missing working directory simply means there is nothing to clean.
            logger.info("Working Directory does not exist on HDFS. ");
        }

        // Keep the directories of jobs that have not reached a final state.
        List<String> allJobs = executableManager.getAllJobIds();
        for (String jobId : allJobs) {
            ExecutableState state = executableManager.getOutput(jobId).getState();
            if (state.isFinalState()) {
                continue;
            }
            String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobId);
            allHdfsPathsNeedToBeDeleted.remove(path);
            logger.info("Skip " + path + " from deletion list, as the path belongs to job " + jobId
                    + " with status " + state);
        }

        // Keep the directory of the last build job of every existing segment.
        for (CubeInstance cube : cubeMgr.listAllCubes()) {
            for (CubeSegment seg : cube.getSegments()) {
                String jobUuid = seg.getLastBuildJobID();
                if (jobUuid == null || jobUuid.equals("")) {
                    continue;
                }
                String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobUuid);
                allHdfsPathsNeedToBeDeleted.remove(path);
                logger.info("Skip " + path + " from deletion list, as the path belongs to segment " + seg
                        + " of cube " + cube.getName());
            }
        }

        if (this.delete) {
            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
                logger.info("Deleting hdfs path " + hdfsPath);
                Path p = new Path(hdfsPath);
                if (fs.exists(p)) {
                    fs.delete(p, true);
                    logger.info("Deleted hdfs path " + hdfsPath);
                } else {
                    logger.info("Hdfs path " + hdfsPath + " does not exist");
                }
            }
        } else {
            System.out.println("--------------- HDFS Path To Be Deleted ---------------");
            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
                System.out.println(hdfsPath);
            }
            System.out.println("-------------------------------------------------------");
        }
    }

    /**
     * Selects intermediate Hive tables that are no longer needed and drops them (or, without
     * {@code delete}, prints them).
     *
     * <p>A table is a candidate when its name starts with {@code kylin_intermediate_}. With
     * {@code force} every candidate is selected. Otherwise the last 36 characters of the name,
     * with {@code _} replaced by {@code -}, must form a UUID; the table is kept when that UUID
     * is the ID of a job that is not in a final state, or is the segment ID of such a job.
     *
     * <p>After the tables are dropped, the sub-directory named after each table inside the
     * working directory of the job mapped to its segment ID is removed as well. An
     * {@link IOException} raised while dropping or removing is logged and not propagated.
     *
     * @throws Exception if listing the Hive tables or reading job metadata fails
     */
    private void cleanUnusedIntermediateHiveTable() throws Exception {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
        CliCommandExecutor cmdExec = config.getCliCommandExecutor();

        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
        List<String> hiveTableNames = explr.listTables(config.getHiveDatabaseForIntermediateTable());
        Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() {

            public boolean apply(@Nullable String input) {
                return input != null && input.startsWith(INTERMEDIATE_TABLE_PREFIX);
            }
        });

        // Index the jobs: which ones are still working, and which segment each one belongs to.
        List<String> allJobs = executableManager.getAllJobIds();
        List<String> workingJobList = new ArrayList<String>();
        HashMap<String, String> segmentId2JobId = Maps.newHashMap();
        for (String jobId : allJobs) {
            ExecutableState state = executableManager.getOutput(jobId).getState();
            if (!state.isFinalState()) {
                workingJobList.add(jobId);
            }
            try {
                String segmentId = this.getSegmentIdFromJobId(jobId);
                if (segmentId != null) {
                    segmentId2JobId.put(segmentId, jobId);
                }
            } catch (Exception ex) {
                logger.warn("Failed to find segment ID from job ID " + jobId + ", ignore it");
            }
        }
        logger.info("Working jobIDs: " + workingJobList);

        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
        for (String tableName : kylinIntermediates) {
            logger.info("Checking table " + tableName);
            if (this.force) {
                logger.warn("Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!");
                allHiveTablesNeedToBeDeleted.add(tableName);
                continue;
            }
            if (tableName.length() < INTERMEDIATE_TABLE_PREFIX.length() + UUID_LENGTH) {
                logger.info("Skip deleting because length is not qualified");
                continue;
            }
            // Table names carry the UUID with '_' in place of '-'.
            String uuid = tableName.substring(tableName.length() - UUID_LENGTH).replace("_", "-");
            if (!UUID_PATTERN.matcher(uuid).matches()) {
                logger.info("Skip deleting because pattern doesn't match");
                continue;
            }
            boolean isNeedDel = true;
            if (allJobs.contains(uuid)) {
                // The suffix is a job ID: keep the table only while that job is still working.
                isNeedDel = !workingJobList.contains(uuid);
            } else if (this.isTableInUse(uuid, workingJobList)) {
                // The suffix is a segment ID used by a working job.
                logger.info("Skip deleting because the table is in use");
                isNeedDel = false;
            }
            if (isNeedDel) {
                allHiveTablesNeedToBeDeleted.add(tableName);
            }
        }

        if (this.delete) {
            try {
                HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
                hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
                for (String delHive : allHiveTablesNeedToBeDeleted) {
                    hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
                    logger.info("Remove " + delHive + " from hive tables.");
                }
                cmdExec.execute(hiveCmdBuilder.build());

                for (String tableToDelete : allHiveTablesNeedToBeDeleted) {
                    this.deleteExternalDataPath(tableToDelete, segmentId2JobId, engineConfig);
                }
            } catch (IOException e) {
                // Best effort: the remaining cleanup steps still run, but the failure must be visible.
                logger.error("Failed to drop intermediate hive tables or to delete their external paths", e);
            }
        } else {
            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
                System.out.println(hiveTable);
            }
            System.out.println("----------------------------------------------------");
        }
    }

    /**
     * Deletes the directory named after a dropped Hive table inside the working directory of the
     * job that built the table's segment, if that directory exists.
     *
     * @param tableToDelete   name of the dropped Hive table
     * @param segmentId2JobId segment ID to job ID lookup
     * @param engineConfig    supplies the HDFS working directory
     * @throws IOException if the file system cannot be accessed
     */
    private void deleteExternalDataPath(String tableToDelete, HashMap<String, String> segmentId2JobId,
            JobEngineConfig engineConfig) throws IOException {
        // With the force option, names shorter than a UUID can reach this point; taking the
        // suffix of such a name would throw StringIndexOutOfBoundsException.
        if (tableToDelete.length() < UUID_LENGTH) {
            logger.warn("Hive table {} is too short to contain a segment ID, skip deleting its external path",
                    tableToDelete);
            return;
        }
        String segmentId = tableToDelete.substring(tableToDelete.length() - UUID_LENGTH).replace("_", "-");
        if (!segmentId2JobId.containsKey(segmentId)) {
            logger.warn("Hive table {}'s job ID not found, segmentId2JobId: {}", tableToDelete,
                    segmentId2JobId.toString());
            return;
        }
        String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(),
                segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
        Path externalDataPath = new Path(path);
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        if (fs.exists(externalDataPath)) {
            fs.delete(externalDataPath, true);
            logger.info("Hive table {}'s external path {} deleted", tableToDelete, path);
        } else {
            logger.info("Hive table {}'s external path {} not exist. It's normal if kylin.source.hive.keep-flat-table set false (By default)",
                    tableToDelete, path);
        }
    }

    /**
     * Reads the {@code segmentId} parameter of a job.
     *
     * @param jobId ID of the job to look up
     * @return the value of the job's {@code segmentId} parameter, as returned by the executable
     */
    private String getSegmentIdFromJobId(String jobId) {
        AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
        return abstractExecutable.getParam("segmentId");
    }

    /**
     * Tells whether any of the given jobs has the given segment ID as its {@code segmentId} parameter.
     *
     * @param segUuid        segment ID to look for
     * @param workingJobList IDs of the jobs to inspect
     * @return {@code true} if at least one job refers to {@code segUuid}
     */
    private boolean isTableInUse(String segUuid, List<String> workingJobList) {
        for (String jobId : workingJobList) {
            String segmentId = this.getSegmentIdFromJobId(jobId);
            if (segUuid.equals(segmentId)) {
                return true;
            }
        }
        return false;
    }
}

