/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.LocalHiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveSparkClientFactory {
    protected static final transient Logger LOG = LoggerFactory.getLogger(HiveSparkClientFactory.class);
    private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
    private static final String SPARK_DEFAULT_MASTER = "yarn";
    private static final String SPARK_DEFAULT_DEPLOY_MODE = "cluster";
    private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
    private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
    private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
    private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
    private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";

    public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
        Map<String, String> sparkConf = HiveSparkClientFactory.initiateSparkConf(hiveconf);
        String master = sparkConf.get("spark.master");
        if (master.equals("local") || master.startsWith("local[")) {
            return LocalHiveSparkClient.getInstance(HiveSparkClientFactory.generateSparkConf(sparkConf));
        }
        return new RemoteHiveSparkClient(hiveconf, sparkConf);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
        String password;
        String queueName;
        HashMap<String, String> sparkConf = new HashMap<String, String>();
        HBaseConfiguration.addHbaseResources((Configuration)hiveConf);
        sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
        String appNameKey = "spark.app.name";
        String appName = hiveConf.get("spark.app.name");
        if (appName == null) {
            appName = SPARK_DEFAULT_APP_NAME;
        }
        sparkConf.put("spark.app.name", appName);
        sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
        sparkConf.put("spark.kryo.referenceTracking", SPARK_DEFAULT_REFERENCE_TRACKING);
        InputStream inputStream = null;
        try {
            inputStream = HiveSparkClientFactory.class.getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
            if (inputStream != null) {
                LOG.info("loading spark properties from:spark-defaults.conf");
                Properties properties = new Properties();
                properties.load(new InputStreamReader(inputStream, "UTF-8"));
                for (String string : properties.stringPropertyNames()) {
                    if (!string.startsWith("spark")) continue;
                    String value = properties.getProperty(string);
                    sparkConf.put(string, properties.getProperty(string));
                    LOG.info(String.format("load spark property from %s (%s -> %s).", SPARK_DEFAULT_CONF_FILE, string, LogUtils.maskIfPassword(string, value)));
                }
            }
        }
        catch (IOException e) {
            LOG.info("Failed to open spark configuration file:spark-defaults.conf", e);
        }
        finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                }
                catch (IOException e) {
                    LOG.debug("Failed to close inputstream.", e);
                }
            }
        }
        String sparkMaster = hiveConf.get("spark.master");
        if (sparkMaster == null) {
            sparkMaster = (String)sparkConf.get("spark.master");
            hiveConf.set("spark.master", sparkMaster);
        }
        String deployMode = null;
        if (!SparkClientUtilities.isLocalMaster(sparkMaster) && (deployMode = hiveConf.get(SPARK_DEPLOY_MODE)) == null) {
            deployMode = (String)sparkConf.get(SPARK_DEPLOY_MODE);
            if (deployMode == null) {
                deployMode = SparkClientUtilities.getDeployModeFromMaster(sparkMaster);
            }
            if (deployMode == null) {
                deployMode = SPARK_DEFAULT_DEPLOY_MODE;
            }
            hiveConf.set(SPARK_DEPLOY_MODE, deployMode);
        }
        if (SessionState.get() != null && SessionState.get().getConf() != null) {
            SessionState.get().getConf().set("spark.master", sparkMaster);
            if (deployMode != null) {
                SessionState.get().getConf().set(SPARK_DEPLOY_MODE, deployMode);
            }
        }
        if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode)) {
            sparkConf.put("spark.yarn.maxAppAttempts", "1");
        }
        Iterator iterator = hiveConf.iterator();
        while (iterator.hasNext()) {
            String value;
            Map.Entry entry = (Map.Entry)iterator.next();
            String propertyName3 = (String)entry.getKey();
            if (propertyName3.startsWith("spark")) {
                value = hiveConf.get(propertyName3);
                sparkConf.put(propertyName3, value);
                LOG.info(String.format("load spark property from hive configuration (%s -> %s).", propertyName3, LogUtils.maskIfPassword(propertyName3, value)));
            } else if (propertyName3.startsWith(SPARK_DEFAULT_MASTER) && SparkClientUtilities.isYarnMaster(sparkMaster)) {
                value = hiveConf.get(propertyName3);
                sparkConf.put("spark.hadoop." + propertyName3, value);
                LOG.info(String.format("load yarn property from hive configuration in %s mode (%s -> %s).", sparkMaster, propertyName3, LogUtils.maskIfPassword(propertyName3, value)));
            } else if (propertyName3.equals("fs.defaultFS")) {
                value = hiveConf.get(propertyName3);
                if (value != null && !value.isEmpty()) {
                    sparkConf.put("spark.hadoop." + propertyName3, value);
                }
            } else if (propertyName3.startsWith("hbase") || propertyName3.startsWith("zookeeper.znode")) {
                value = hiveConf.get(propertyName3);
                sparkConf.put("spark.hadoop." + propertyName3, value);
                LOG.info(String.format("load HBase configuration (%s -> %s).", propertyName3, LogUtils.maskIfPassword(propertyName3, value)));
            } else if (propertyName3.startsWith("oozie")) {
                value = hiveConf.get(propertyName3);
                sparkConf.put("spark." + propertyName3, value);
                LOG.info(String.format("Pass Oozie configuration (%s -> %s).", propertyName3, LogUtils.maskIfPassword(propertyName3, value)));
            }
            if (!RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName3)) continue;
            value = RpcConfiguration.getValue(hiveConf, propertyName3);
            sparkConf.put(propertyName3, value);
            LOG.info(String.format("load RPC property from hive configuration (%s -> %s).", propertyName3, LogUtils.maskIfPassword(propertyName3, value)));
        }
        HashSet<String> hashSet = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(Strings.nullToEmpty((String)sparkConf.get("spark.kryo.classesToRegister"))));
        hashSet.add(Writable.class.getName());
        hashSet.add(VectorizedRowBatch.class.getName());
        hashSet.add(BytesWritable.class.getName());
        hashSet.add(HiveKey.class.getName());
        sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(hashSet));
        String sparkQueueNameKey = "spark.yarn.queue";
        if (SparkClientUtilities.isYarnMaster(sparkMaster) && hiveConf.get("spark.yarn.queue") == null && (queueName = hiveConf.get("mapreduce.job.queuename")) != null) {
            sparkConf.put("spark.yarn.queue", queueName);
        }
        if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode) && sparkConf.get(SPARK_WAIT_APP_COMPLETE) == null) {
            sparkConf.put(SPARK_WAIT_APP_COMPLETE, SPARK_DEFAULT_REFERENCE_TRACKING);
        }
        if ((password = HiveConfUtil.getJobCredentialProviderPassword(hiveConf)) != null) {
            HiveSparkClientFactory.addCredentialProviderPassword(sparkConf, password);
        }
        return sparkConf;
    }

    private static void addCredentialProviderPassword(Map<String, String> sparkConf, String jobCredstorePassword) {
        sparkConf.put("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
        sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
    }

    static SparkConf generateSparkConf(Map<String, String> conf) {
        SparkConf sparkConf = new SparkConf(false);
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            sparkConf.set(entry.getKey(), entry.getValue());
        }
        return sparkConf;
    }
}

