/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs.impl;

import java.nio.file.Paths;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.exception.ErrorCodeSupplier;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.persistence.ImageDesc;
import org.apache.kylin.common.persistence.MetadataType;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.AuditLogStore;
import org.apache.kylin.common.persistence.metadata.MetadataStore;
import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.io.ByteSource;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.jar.JarInfoManager;
import org.apache.kylin.metadata.jar.JarTypeEnum;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.streaming.DataParserInfo;
import org.apache.kylin.metadata.streaming.DataParserManager;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.app.StreamingMergeEntry;
import org.apache.kylin.streaming.constants.StreamingConstants;
import org.apache.kylin.streaming.event.StreamingJobMetaCleanEvent;
import org.apache.kylin.streaming.jobs.AbstractSparkJobLauncher;
import org.apache.kylin.streaming.jobs.StreamingJobUtils;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingJobLauncher
extends AbstractSparkJobLauncher {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingJobLauncher.class);
    // System property key appended by rewriteKrb5Conf when it is not already present in the existing options.
    private static final String KRB5CONF_PROPS = "java.security.krb5.conf";
    // System property key appended by rewriteKafkaJaasConf when it is not already present in the existing options.
    private static final String JAASCONF_PROPS = "java.security.auth.login.config";
    // Path prefix put in front of the krb5 / JAAS file names for the executor and YARN AM java options.
    private static final String HADOOP_CONF_PATH = "./__spark_conf__/__hadoop_conf__/";
    // Streaming job parameters; read from strmJob.getParams() in init().
    private Map<String, String> jobParams;
    // Fully qualified name of the Spark application main class; chosen in init() from the job type.
    private String mainClazz;
    // Arguments handed to the main class (or to the local-mode entry point); built in init().
    private String[] appArgs;
    // Wall-clock millis captured in init(); embedded in the driver log path and the meta dump path.
    private Long currentTimestamp;
    // Metadata URL passed to the job as its last argument; resolved by initStorageUrl().
    private StorageURL distMetaStorageUrl;
    // Consulted by generateLog4jConfiguration(); NOTE(review): never assigned in this class, so it stays false here.
    private boolean isYarnCluster = false;
    // Set to true as the last step of a successful init().
    private boolean initialized = false;

    /**
     * Prepares this launcher for the given streaming job.
     *
     * <p>After delegating to the superclass, this loads the job parameters, captures the launch
     * timestamp, derives the streaming-specific config, resolves the metadata URL handed to the
     * job, and finally selects the main class and application arguments for the job type.
     *
     * @param project project the model belongs to
     * @param modelId id of the model the streaming job runs for
     * @param jobType {@code STREAMING_BUILD} or {@code STREAMING_MERGE}
     * @throws IllegalArgumentException if {@code jobType} is any other type
     * @throws IllegalStateException if no metadata URL could be resolved for a supported job type
     */
    @Override
    public void init(String project, String modelId, JobTypeEnum jobType) {
        super.init(project, modelId, jobType);
        this.jobParams = this.strmJob.getParams();
        this.currentTimestamp = System.currentTimeMillis();
        this.config = StreamingJobUtils.getStreamingKylinConfig(this.config, this.jobParams, modelId, project);
        this.initStorageUrl();
        switch (jobType) {
            case STREAMING_BUILD: {
                this.mainClazz = "org.apache.kylin.streaming.app.StreamingEntry";
                this.appArgs = new String[]{
                        project,
                        modelId,
                        this.jobParams.getOrDefault("kylin.streaming.duration", "30"),
                        this.jobParams.getOrDefault("kylin.streaming.watermark", StreamingConstants.STREAMING_WATERMARK_DEFAULT),
                        this.requireDistMetaStorageUrl()};
                StreamingJobUtils.createExecutorJaas();
                break;
            }
            case STREAMING_MERGE: {
                this.mainClazz = "org.apache.kylin.streaming.app.StreamingMergeEntry";
                this.appArgs = new String[]{
                        project,
                        modelId,
                        this.jobParams.getOrDefault("kylin.streaming.segment-max-size", "32m"),
                        this.jobParams.getOrDefault("kylin.streaming.segment-merge-threshold", "3"),
                        this.requireDistMetaStorageUrl()};
                break;
            }
            default: {
                throw new IllegalArgumentException("The streaming job Type " + jobType.name() + " is not supported...");
            }
        }
        this.initialized = true;
    }

    /**
     * Returns the resolved metadata URL as a string, failing with a descriptive error instead of a
     * bare NullPointerException when the metadata dump did not produce a URL.
     */
    private String requireDistMetaStorageUrl() {
        if (this.distMetaStorageUrl == null) {
            throw new IllegalStateException("No metadata storage url is available for streaming job " + this.jobId
                    + "; the metadata dump probably failed, see the preceding error log");
        }
        return this.distMetaStorageUrl.toString();
    }

    /**
     * Fetches the parser descriptor registered under {@code parserName} for the current project.
     */
    private DataParserInfo getDataParser(String parserName) {
        DataParserManager parserManager = DataParserManager.getInstance(this.config, this.project);
        return parserManager.getDataParserInfo(parserName);
    }

    /**
     * Reads the parser name from the Kafka config attached to the root fact table of this job's model.
     */
    private String getParserName() {
        NDataModelManager modelManager = NDataModelManager.getInstance(this.config, this.project);
        return modelManager.getDataModelDesc(this.modelId)
                .getRootFactTable()
                .getTableDesc()
                .getKafkaConfig()
                .getParserName();
    }

    /**
     * Resolves the path of the custom-parser jar that {@code parserInfo} refers to by jar name.
     */
    private String getParserJarPath(DataParserInfo parserInfo) {
        String jarName = parserInfo.getJarName();
        return JarInfoManager.getInstance(this.config, this.project)
                .getJarInfo(JarTypeEnum.STREAMING_CUSTOM_PARSER, jarName)
                .getJarPath();
    }

    /**
     * Builds the driver log location:
     * {@code <streaming jobs base>/<project>/<jobId>/<timestamp>/driver.<timestamp>.log}.
     */
    private String getDriverHDFSLogPath() {
        final String jobsRoot = this.config.getStreamingBaseJobsLocation();
        final Long launchTs = this.currentTimestamp;
        return String.format(Locale.ROOT, "%s/%s/%s/%s/driver.%s.log",
                jobsRoot, this.project, this.jobId, launchTs, launchTs);
    }

    /**
     * Builds the directory that holds this model's metadata dumps:
     * {@code <streaming jobs base>/<project>/<modelId>/meta}.
     */
    private String getJobTmpMetaStoreUrlPath() {
        final String jobsRoot = this.config.getStreamingBaseJobsLocation();
        return String.format(Locale.ROOT, "%s/%s/%s/meta", jobsRoot, this.project, this.modelId);
    }

    /**
     * Creates the "hdfs" storage URL for a fresh metadata dump of this launch. The dump path is
     * {@code meta_<timestamp>} under the model's meta directory, with the "zip" and "snapshot"
     * parameters both set to "true".
     */
    private StorageURL getJobTmpHdfsMetaStorageUrl() {
        String dumpPath = String.format(Locale.ROOT, "%s/meta_%d", this.getJobTmpMetaStoreUrlPath(), this.currentTimestamp);
        HashMap<String, String> urlParams = new HashMap<String, String>();
        urlParams.put("path", dumpPath);
        urlParams.put("zip", "true");
        urlParams.put("snapshot", "true");
        String identifier = this.config.getMetadataUrlPrefix();
        return new StorageURL(identifier, "hdfs", urlParams);
    }

    /**
     * Collects the metadata resource keys that must be copied into the job's metadata dump.
     *
     * <p>Starts from the dataflow's precalculation resources and adds the metastore image key plus
     * the streaming-job keys for the build and merge jobs. Those keys are derived from the first
     * 36 characters of the job id (presumably the model UUID - confirm against the job id format).
     *
     * @return the set returned by the dataflow, with the extra keys added to it
     */
    protected Set<String> getMetadataDumpList() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, this.project);
        Set<String> dumpKeys = dataflowManager.getDataflow(this.modelId).collectPrecalculationResource();
        dumpKeys.add(ResourceStore.METASTORE_IMAGE);
        String idPrefix = this.jobId.substring(0, 36);
        for (String suffix : new String[]{"_build", "_merge"}) {
            dumpKeys.add(MetadataType.mergeKeyWithType(idPrefix + suffix, MetadataType.STREAMING_JOB));
        }
        return dumpKeys;
    }

    /**
     * Finds the most recent metadata dump of this model that is still within the retention window.
     *
     * <p>The model's meta directory is created if missing and its entries are listed. Entries whose
     * modification time is older than {@code config.getStreamingJobMetaRetainedTime()} relative to
     * the launch timestamp are reported through a synchronously posted
     * {@link StreamingJobMetaCleanEvent}. Among the remaining entries the one with the greatest
     * modification time wins.
     *
     * @return the path of the newest retained dump, or {@code null} when there is none
     */
    @Nullable
    private String getAvailableLatestDumpPath() {
        String jobTmpMetaPath = this.getJobTmpMetaStoreUrlPath();
        HadoopUtil.mkdirIfNotExist(jobTmpMetaPath);
        // Typed list: with a raw List the stream below degrades to a raw Stream and the lambda cannot see FileStatus.
        List<FileStatus> metaFileStatus = HadoopUtil.getFileStatusPathsFromHDFSDir(jobTmpMetaPath, false);
        long jobMetaRetainedTimeMills = this.config.getStreamingJobMetaRetainedTime();
        long launchTimestamp = this.currentTimestamp;
        // partitioningBy always yields both the TRUE (outdated) and FALSE (retained) buckets, possibly empty.
        Map<Boolean, List<FileStatus>> partitionedByOutdated = metaFileStatus.stream()
                .collect(Collectors.partitioningBy(
                        status -> launchTimestamp - status.getModificationTime() > jobMetaRetainedTimeMills));
        List<FileStatus> outdated = partitionedByOutdated.get(Boolean.TRUE);
        if (!outdated.isEmpty()) {
            List<Path> deletedPath = outdated.stream().map(FileStatus::getPath).collect(Collectors.toList());
            EventBusFactory.getInstance().postSync((Object)new StreamingJobMetaCleanEvent(deletedPath));
            log.info("delete by {} streaming meta path size:{}", this.jobId, deletedPath.size());
        }
        return partitionedByOutdated.get(Boolean.FALSE).stream()
                .max(Comparator.comparingLong(FileStatus::getModificationTime))
                .map(fileStatus -> fileStatus.getPath().toString())
                .orElse(null);
    }

    /**
     * Resolves {@code distMetaStorageUrl}, the metadata URL that is handed to the streaming job.
     *
     * <ol>
     *   <li>If the job does not run on a cluster, or the "kylin.streaming.meta-scheme" job parameter
     *       (default "hdfs") is not "hdfs", the config's own metadata URL is used.</li>
     *   <li>Otherwise, if a retained dump already exists, the freshly built HDFS URL is reused with
     *       its "path" parameter pointed at that dump.</li>
     *   <li>Otherwise a new dump is produced: the resources from {@code getMetadataDumpList()} are
     *       copied into a backup store, a metastore image carrying the audit log's max id is
     *       written, and the backup store is dumped.</li>
     * </ol>
     *
     * <p>NOTE(review): any exception during the dump is only logged; in that case this call does not
     * assign {@code distMetaStorageUrl}, so it may still be null afterwards.
     */
    private void initStorageUrl() {
        // Case 1: not on a cluster, or a non-hdfs meta scheme was requested -> use the live metadata URL.
        if (!StreamingUtils.isJobOnCluster((KylinConfig)this.config) || !StringUtils.equals((CharSequence)"hdfs", (CharSequence)this.jobParams.getOrDefault("kylin.streaming.meta-scheme", "hdfs"))) {
            this.distMetaStorageUrl = this.config.getMetadataUrl();
            return;
        }
        StorageURL metaDumpUrl = this.getJobTmpHdfsMetaStorageUrl();
        Preconditions.checkState((boolean)StringUtils.isNotEmpty((CharSequence)metaDumpUrl.toString()), (Object)"Missing metaUrl!");
        // Case 2: a dump inside the retention window exists -> keep the URL shape, swap in that dump's path.
        String availableMetaPath = this.getAvailableLatestDumpPath();
        if (StringUtils.isNotEmpty((CharSequence)availableMetaPath)) {
            HashMap<String, String> maps = new HashMap<String, String>(metaDumpUrl.getAllParameters());
            maps.put("path", availableMetaPath);
            this.distMetaStorageUrl = new StorageURL(metaDumpUrl.getIdentifier(), metaDumpUrl.getScheme(), maps);
            return;
        }
        // Case 3: nothing reusable -> dump the needed metadata to the new HDFS location.
        KylinConfig backupConfig = KylinConfig.createKylinConfig((KylinConfig)this.config);
        backupConfig.setMetadataUrl(metaDumpUrl.toString());
        try (ResourceStore backupResourceStore = ResourceStore.getKylinMetaStore((KylinConfig)backupConfig);){
            MetadataStore backupMetadataStore = backupResourceStore.getMetadataStore();
            // Copy inside a read-only unit of work named after the project.
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry((UnitOfWorkParams)UnitOfWorkParams.builder().readonly(true).unitName(this.project).processor(() -> {
                ResourceStore srcResourceStore = ResourceStore.getKylinMetaStore((KylinConfig)this.config);
                for (String resPath : this.getMetadataDumpList()) {
                    srcResourceStore.copy(resPath, backupResourceStore);
                }
                // Record the audit log's current max id in the dumped metastore image.
                AuditLogStore auditLogStore = srcResourceStore.getAuditLogStore();
                long finalOffset = auditLogStore.getMaxId();
                backupResourceStore.putResourceWithoutCheck(ResourceStore.METASTORE_IMAGE, ByteSource.wrap((byte[])JsonUtil.writeValueAsBytes((Object)new ImageDesc(Long.valueOf(finalOffset)))), System.currentTimeMillis(), -1L);
                return null;
            }).build());
            backupMetadataStore.dump(backupResourceStore);
            // Only publish the URL once the dump call has returned without throwing.
            this.distMetaStorageUrl = metaDumpUrl;
            log.debug("dump meta success.{}", (Object)metaDumpUrl);
        }
        catch (Exception e) {
            // Failure is logged and not rethrown; distMetaStorageUrl is not assigned on this path.
            log.error("dump meta error,{}", (Object)this.jobId, (Object)e);
        }
    }

    /**
     * Appends the {@code log4j.configurationFile} java option to {@code log4jJavaOptionsSB}.
     *
     * <p>For executors, when {@code isYarnCluster} is set, or when the Spark master starts with
     * "k8s", only the bare file name of {@code log4jXmlFile} is used; otherwise the full path is
     * used with a "file:" prefix.
     *
     * @param isExecutor whether the options are being built for executors
     * @param log4jJavaOptionsSB builder that receives the formatted option
     * @param log4jXmlFile path of the log4j configuration file
     */
    private void generateLog4jConfiguration(boolean isExecutor, StringBuilder log4jJavaOptionsSB, String log4jXmlFile) {
        boolean useBareFileName = isExecutor
                || this.isYarnCluster
                || this.config.getSparkMaster().startsWith("k8s");
        String configLocation = useBareFileName
                ? Paths.get(log4jXmlFile).getFileName().toString()
                : "file:" + log4jXmlFile;
        String option = StreamingJobLauncher.javaPropertyFormatter("log4j.configurationFile", configLocation);
        log4jJavaOptionsSB.append(option);
    }

    /**
     * Extends the configured {@code spark.driver.extraJavaOptions} with the krb5 / Kafka JAAS
     * settings (when applicable), the REST server address, HDFS working dir, driver HDFS log file,
     * time zone and the driver log4j configuration.
     *
     * @param sparkConf streaming Spark configuration; must contain {@code spark.driver.extraJavaOptions}
     * @return the complete driver java options string
     * @throws NullPointerException if {@code spark.driver.extraJavaOptions} is absent
     */
    private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
        final String presetOptions = sparkConf.get("spark.driver.extraJavaOptions");
        Preconditions.checkNotNull(presetOptions, "spark.driver.extraJavaOptions is empty");
        final StringBuilder options = new StringBuilder(presetOptions);
        final KapConfig kapConfig = KapConfig.getInstanceFromEnv();

        this.rewriteKrb5Conf(options, presetOptions, kapConfig.getKerberosKrb5ConfPath());
        this.rewriteKafkaJaasConf(options, presetOptions, kapConfig.getKafkaJaasConfPath());

        options.append(StreamingJobLauncher.javaPropertyFormatter("kylin.spark.rest.server.ip", AddressUtil.getLocalHostExactAddress()))
                .append(StreamingJobLauncher.javaPropertyFormatter("kylin.hdfs.working.dir", this.config.getHdfsWorkingDirectory()))
                .append(StreamingJobLauncher.javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", this.getDriverHDFSLogPath()))
                .append(StreamingJobLauncher.javaPropertyFormatter("user.timezone", this.config.getTimeZone()));

        this.generateLog4jConfiguration(false, options, this.config.getLogSparkStreamingDriverPropertiesFile());
        return options.toString();
    }

    /**
     * Extends the configured {@code spark.executor.extraJavaOptions} with the krb5 / Kafka JAAS
     * settings (when applicable), job identification properties, time zone, the optional mount
     * directory and the executor log4j configuration.
     *
     * @param sparkConf streaming Spark configuration; must contain {@code spark.executor.extraJavaOptions}
     * @return the complete executor java options string
     * @throws NullPointerException if {@code spark.executor.extraJavaOptions} is absent
     */
    private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
        final String presetOptions = sparkConf.get("spark.executor.extraJavaOptions");
        Preconditions.checkNotNull(presetOptions, "spark.executor.extraJavaOptions is empty");
        final KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        final StringBuilder options = new StringBuilder(presetOptions);

        // Executor-side config files are addressed relative to HADOOP_CONF_PATH.
        this.rewriteKrb5Conf(options, presetOptions, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
        this.rewriteKafkaJaasConf(options, presetOptions, HADOOP_CONF_PATH + StreamingJobUtils.getExecutorJaasName());

        options.append(StreamingJobLauncher.javaPropertyFormatter("kap.spark.identifier", this.jobId))
                .append(StreamingJobLauncher.javaPropertyFormatter("kap.spark.jobTimeStamp", this.currentTimestamp.toString()))
                .append(StreamingJobLauncher.javaPropertyFormatter("kap.spark.project", this.project))
                .append(StreamingJobLauncher.javaPropertyFormatter("user.timezone", this.config.getTimeZone()));

        // The mount dir option is only emitted when a non-blank directory is configured.
        if (StringUtils.isNotBlank(this.config.getMountSparkLogDir())) {
            options.append(StreamingJobLauncher.javaPropertyFormatter("job.mountDir", this.config.getMountSparkLogDir()));
        }

        this.generateLog4jConfiguration(true, options, this.config.getLogSparkStreamingExecutorPropertiesFile());
        return options.toString();
    }

    /**
     * Returns the configured {@code spark.yarn.am.extraJavaOptions} (empty when unset), extended
     * with the krb5 setting when {@code rewriteKrb5Conf} decides it is needed.
     *
     * @param sparkConf streaming Spark configuration
     * @return the YARN application master java options string
     */
    private String wrapYarnAmJavaOptions(Map<String, String> sparkConf) {
        final String presetOptions = sparkConf.getOrDefault("spark.yarn.am.extraJavaOptions", "");
        final String krb5Location = HADOOP_CONF_PATH + KapConfig.getInstanceFromEnv().getKerberosKrb5Conf();
        final StringBuilder options = new StringBuilder(presetOptions);
        this.rewriteKrb5Conf(options, presetOptions, krb5Location);
        return options.toString();
    }

    /**
     * Appends the JAAS login config java option to {@code sb}. It does so only when Kafka JAAS is
     * enabled, the job is a streaming build job, and {@code existOptStr} does not already mention
     * the property.
     *
     * @param sb builder receiving the option
     * @param existOptStr the java options configured before any rewriting
     * @param value location of the JAAS configuration file
     */
    private void rewriteKafkaJaasConf(StringBuilder sb, String existOptStr, String value) {
        boolean jaasNeeded = KapConfig.getInstanceFromEnv().isKafkaJaasEnabled()
                && this.jobType == JobTypeEnum.STREAMING_BUILD
                && !existOptStr.contains(JAASCONF_PROPS);
        if (jaasNeeded) {
            sb.append(StreamingJobLauncher.javaPropertyFormatter(JAASCONF_PROPS, value));
        }
    }

    /**
     * Appends the krb5 config java option to {@code sb}. It does so only when Kerberos is enabled
     * and {@code existConfStr} does not already mention the property.
     *
     * @param sb builder receiving the option
     * @param existConfStr the java options configured before any rewriting
     * @param value location of the krb5 configuration file
     */
    private void rewriteKrb5Conf(StringBuilder sb, String existConfStr, String value) {
        boolean krb5Needed = KapConfig.getInstanceFromEnv().isKerberosEnabled()
                && !existConfStr.contains(KRB5CONF_PROPS);
        if (krb5Needed) {
            sb.append(StreamingJobLauncher.javaPropertyFormatter(KRB5CONF_PROPS, value));
        }
    }

    /**
     * Adds the custom parser jar to the launcher. This applies to streaming build jobs whose model
     * uses a parser other than {@code org.apache.kylin.parser.TimedJsonStreamParser}.
     *
     * @param sparkLauncher launcher that receives the jar
     */
    private void addParserJar(SparkLauncher sparkLauncher) {
        final String parserName = this.getParserName();
        boolean usesCustomParser = this.jobType == JobTypeEnum.STREAMING_BUILD
                && !StringUtils.equals("org.apache.kylin.parser.TimedJsonStreamParser", parserName);
        if (!usesCustomParser) {
            return;
        }
        final String jarPath = this.getParserJarPath(this.getDataParser(parserName));
        sparkLauncher.addJar(jarPath);
        log.info("streaming job {} use parser {} jar path {}", this.jobId, parserName, jarPath);
    }

    /**
     * Configures the Spark launcher from the streaming Spark config and starts the application.
     *
     * <p>Every entry of the streaming Spark config is applied first. Kerberos keytab/principal,
     * the Kafka JAAS keytab file and the custom parser jar are added when applicable. Resource
     * settings fall back to built-in defaults when absent from the config. The resulting
     * application handle is stored in {@code handler}.
     *
     * @throws Exception propagated from launcher configuration or application start
     */
    public void startYarnJob() throws Exception {
        final Map<String, String> sparkConf = StreamingJobLauncher.getStreamingSparkConfig(this.config);
        for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
            this.launcher.setConf(entry.getKey(), entry.getValue());
        }
        final SparkLauncher sparkLauncher = this.launcher.setAppName(this.jobId).setSparkHome(KylinConfig.getSparkHome());

        final KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        if (kapConfig.isKerberosEnabled()) {
            sparkLauncher.setConf("spark.kerberos.keytab", kapConfig.getKerberosKeytabPath());
            sparkLauncher.setConf("spark.kerberos.principal", kapConfig.getKerberosPrincipal());
        }
        if (kapConfig.isKafkaJaasEnabled() && this.jobType == JobTypeEnum.STREAMING_BUILD) {
            String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
            if (StringUtils.isNotEmpty(keyTabAbsPath)) {
                sparkLauncher.addFile(keyTabAbsPath);
            }
        }
        this.addParserJar(sparkLauncher);

        final String numberOfExecutor = sparkConf.getOrDefault("spark.executor.instances", "2");
        final String numberOfCore = sparkConf.getOrDefault("spark.executor.cores", "2");

        // The launcher's fluent setters return the launcher itself, so the statements below are
        // applied in the same order as a single chained expression would apply them.
        sparkLauncher.setMaster(sparkConf.getOrDefault("spark.master", "yarn"));
        sparkLauncher.setConf("spark.driver.memory", sparkConf.getOrDefault("spark.driver.memory", "512m"));
        sparkLauncher.setConf("spark.executor.instances", numberOfExecutor);
        sparkLauncher.setConf("spark.executor.cores", numberOfCore);
        sparkLauncher.setConf("spark.cores.max", this.calcMaxCores(numberOfExecutor, numberOfCore));
        sparkLauncher.setConf("spark.executor.memory", sparkConf.getOrDefault("spark.executor.memory", "1g"));
        sparkLauncher.setConf("spark.sql.shuffle.partitions", sparkConf.getOrDefault("spark.sql.shuffle.partitions", "8"));
        sparkLauncher.setConf("spark.yarn.dist.jars", this.kylinJobJar);
        sparkLauncher.setConf("spark.hadoop.yarn.timeline-service.enabled", "false");
        sparkLauncher.setConf("spark.driver.extraClassPath", this.kylinJobJar);
        // Executors get only the jar's file name on their class path.
        sparkLauncher.setConf("spark.executor.extraClassPath", Paths.get(this.kylinJobJar).getFileName().toString());
        sparkLauncher.setConf("spark.driver.extraJavaOptions", this.wrapDriverJavaOptions(sparkConf));
        sparkLauncher.setConf("spark.executor.extraJavaOptions", this.wrapExecutorJavaOptions(sparkConf));
        sparkLauncher.setConf("spark.yarn.am.extraJavaOptions", this.wrapYarnAmJavaOptions(sparkConf));
        sparkLauncher.addJar(this.config.getKylinExtJarsPath(Boolean.TRUE));
        sparkLauncher.addFile(this.config.getLogSparkStreamingExecutorPropertiesFile());
        sparkLauncher.setAppResource(this.kylinJobJar);
        sparkLauncher.setMainClass(this.mainClazz);
        sparkLauncher.addAppArgs(this.appArgs);

        this.handler = sparkLauncher.startApplication(this.listener);
    }

    /**
     * Launches the streaming job.
     *
     * <ul>
     *   <li>In a unit-test environment nothing is started; the launch is only logged.</li>
     *   <li>In local mode the build or merge entry point is invoked directly with the prepared
     *       application arguments.</li>
     *   <li>Otherwise the job is submitted through {@link #startYarnJob()}.</li>
     * </ul>
     *
     * <p>On any failure the job state is set to {@code LAUNCHING_ERROR} and a
     * {@link KylinException} is thrown.
     *
     * @throws KylinException with {@code JOB_START_FAILURE} when launching fails; the original
     *         exception is attached as its cause
     */
    @Override
    public void launch() {
        try {
            if (this.config.isUTEnv()) {
                log.info("{} -- {} {} streaming job starts to launch", this.project, this.modelId, this.jobType.name());
            } else if (StreamingUtils.isLocalMode()) {
                if (JobTypeEnum.STREAMING_BUILD == this.jobType) {
                    log.info("Starting streaming build job in local mode...");
                    StreamingEntry.main(this.appArgs);
                } else if (JobTypeEnum.STREAMING_MERGE == this.jobType) {
                    log.info("Starting streaming merge job in local mode...");
                    StreamingMergeEntry.main(this.appArgs);
                }
            } else {
                this.startYarnJob();
            }
        } catch (Exception e) {
            // Parameterized logging; the trailing throwable is still logged with its stack trace.
            log.error("launch streaming application failed: {}", e.getMessage(), e);
            MetaInfoUpdater.updateJobState(this.project, this.jobId, JobStatusEnum.LAUNCHING_ERROR);
            // Keep the original exception as the cause so the root failure is not lost to callers.
            throw new KylinException((ErrorCodeSupplier)ServerErrorCode.JOB_START_FAILURE, e.getMessage(), e);
        }
        log.info("Streaming job create success on model {}", this.modelId);
    }

    /**
     * Marks the streaming job for this model and job type for graceful shutdown.
     */
    @Override
    public void stop() {
        final String targetJobId = StreamingUtils.getJobId(this.modelId, this.jobType.name());
        MetaInfoUpdater.markGracefulShutdown(this.project, targetJobId);
    }

    /**
     * Multiplies the executor count by the cores per executor.
     *
     * @param executors number of executors, as a decimal integer string
     * @param cores cores per executor, as a decimal integer string
     * @return the product rendered as a decimal string
     * @throws NumberFormatException if either argument is not a parsable integer
     */
    private String calcMaxCores(String executors, String cores) {
        final int executorCount = Integer.parseInt(executors);
        final int coresPerExecutor = Integer.parseInt(cores);
        return Integer.toString(executorCount * coresPerExecutor);
    }

    /**
     * Reports whether {@code init} has run to completion on this launcher.
     *
     * @return {@code true} once initialization has finished successfully
     */
    @Generated
    public boolean isInitialized() {
        return initialized;
    }
}

