/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.HiveVoidFunction;
import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
import org.apache.hadoop.hive.ql.exec.spark.SparkPlan;
import org.apache.hadoop.hive.ql.exec.spark.SparkPlanGenerator;
import org.apache.hadoop.hive.ql.exec.spark.SparkReporter;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;
import org.apache.hive.spark.client.SparkClientFactory;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;

public class RemoteHiveSparkClient
implements HiveSparkClient {
    private static final long serialVersionUID = 1L;
    private static final String MR_JAR_PROPERTY = "tmpjars";
    protected static final transient Log LOG = LogFactory.getLog(RemoteHiveSparkClient.class);
    private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
    private transient SparkClient remoteClient;
    private transient SparkConf sparkConf;
    private transient HiveConf hiveConf;
    private transient List<URI> localJars = new ArrayList<URI>();
    private transient List<URI> localFiles = new ArrayList<URI>();
    private final transient long sparkClientTimtout;

    RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
        this.hiveConf = hiveConf;
        this.sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
        this.remoteClient = SparkClientFactory.createClient(conf, hiveConf);
        this.sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
    }

    @Override
    public SparkConf getSparkConf() {
        return this.sparkConf;
    }

    @Override
    public int getExecutorCount() throws Exception {
        Future<Integer> handler = this.remoteClient.getExecutorCount();
        return handler.get(this.sparkClientTimtout, TimeUnit.SECONDS);
    }

    @Override
    public int getDefaultParallelism() throws Exception {
        Future<Integer> handler = this.remoteClient.getDefaultParallelism();
        return handler.get(this.sparkClientTimtout, TimeUnit.SECONDS);
    }

    @Override
    public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
        Context ctx = driverContext.getCtx();
        HiveConf hiveConf = (HiveConf)ctx.getConf();
        this.refreshLocalResources(sparkWork, hiveConf);
        JobConf jobConf = new JobConf((Configuration)hiveConf);
        Path emptyScratchDir = ctx.getMRTmpPath();
        FileSystem fs = emptyScratchDir.getFileSystem((Configuration)jobConf);
        fs.mkdirs(emptyScratchDir);
        byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
        byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
        byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
        JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
        JobHandle<Serializable> jobHandle = this.remoteClient.submit(job);
        RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(this.remoteClient, jobHandle, this.sparkClientTimtout);
        return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
    }

    private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
        this.addJars(new JobConf(this.getClass()).getJar());
        this.addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
        String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
        this.addJars(addedJars);
        JobConf jobConf = new JobConf((Configuration)conf);
        jobConf.set(MR_JAR_PROPERTY, "");
        for (BaseWork work : sparkWork.getAllWork()) {
            work.configureJobConf(jobConf);
        }
        this.addJars(conf.get(MR_JAR_PROPERTY));
        String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
        this.addResources(addedFiles);
        String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
        this.addResources(addedArchives);
    }

    private void addResources(String addedFiles) throws IOException {
        for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
            try {
                URI fileUri = SparkUtilities.getURI(addedFile);
                if (fileUri == null || this.localFiles.contains(fileUri)) continue;
                fileUri = SparkUtilities.uploadToHDFS(fileUri, this.hiveConf);
                this.localFiles.add(fileUri);
                this.remoteClient.addFile(fileUri);
            }
            catch (URISyntaxException e) {
                LOG.warn((Object)("Failed to add file:" + addedFile), (Throwable)e);
            }
        }
    }

    private void addJars(String addedJars) throws IOException {
        for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
            try {
                URI jarUri = SparkUtilities.getURI(addedJar);
                if (jarUri == null || this.localJars.contains(jarUri)) continue;
                jarUri = SparkUtilities.uploadToHDFS(jarUri, this.hiveConf);
                this.localJars.add(jarUri);
                this.remoteClient.addJar(jarUri);
            }
            catch (URISyntaxException e) {
                LOG.warn((Object)("Failed to add jar:" + addedJar), (Throwable)e);
            }
        }
    }

    @Override
    public void close() {
        this.remoteClient.stop();
    }

    private static class JobStatusJob
    implements Job<Serializable> {
        private final byte[] jobConfBytes;
        private final byte[] scratchDirBytes;
        private final byte[] sparkWorkBytes;

        private JobStatusJob() {
            this(null, null, null);
        }

        JobStatusJob(byte[] jobConfBytes, byte[] scratchDirBytes, byte[] sparkWorkBytes) {
            this.jobConfBytes = jobConfBytes;
            this.scratchDirBytes = scratchDirBytes;
            this.sparkWorkBytes = sparkWorkBytes;
        }

        @Override
        public Serializable call(JobContext jc) throws Exception {
            JobConf localJobConf = KryoSerializer.deserializeJobConf(this.jobConfBytes);
            List<String> addedJars = jc.getAddedJars();
            if (addedJars != null && !addedJars.isEmpty()) {
                SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]));
                localJobConf.set("hive.added.jars", StringUtils.join(addedJars, ";"));
            }
            Path localScratchDir = KryoSerializer.deserialize(this.scratchDirBytes, Path.class);
            SparkWork localSparkWork = KryoSerializer.deserialize(this.sparkWorkBytes, SparkWork.class);
            SparkCounters sparkCounters = new SparkCounters(jc.sc());
            Map<String, List<String>> prefixes = localSparkWork.getRequiredCounterPrefix();
            if (prefixes != null) {
                for (String group : prefixes.keySet()) {
                    for (String counterName : prefixes.get(group)) {
                        sparkCounters.createCounter(group, counterName);
                    }
                }
            }
            SparkReporter sparkReporter = new SparkReporter(sparkCounters);
            SparkPlanGenerator gen = new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
            SparkPlan plan = gen.generate(localSparkWork);
            JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
            JavaFutureAction future = finalRDD.foreachAsync((VoidFunction)HiveVoidFunction.getInstance());
            jc.monitor(future, sparkCounters, plan.getCachedRDDIds());
            return null;
        }
    }
}

