/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.ExampleDriver;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import org.apache.tez.runtime.library.processor.SleepProcessor;

public class OrderedWordCount {
    private static Log LOG = LogFactory.getLog(OrderedWordCount.class);
    private Credentials credentials = new Credentials();

    private DAG createDAG(FileSystem fs, Configuration conf, Map<String, LocalResource> commonLocalResources, Path stagingDir, int dagIndex, String inputPath, String outputPath, boolean generateSplitsInClient) throws Exception {
        JobConf mapStageConf = new JobConf(conf);
        mapStageConf.set("mapreduce.job.map.class", TokenizerMapper.class.getName());
        mapStageConf.set("mapreduce.map.output.key.class", Text.class.getName());
        mapStageConf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        if (generateSplitsInClient) {
            mapStageConf.set("mapreduce.job.inputformat.class", TextInputFormat.class.getName());
        } else {
            mapStageConf.set("mapreduce.job.inputformat.class", TezGroupedSplitsInputFormat.class.getName());
        }
        mapStageConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
        mapStageConf.setBoolean("mapred.mapper.new-api", true);
        InputSplitInfoDisk inputSplitInfo = null;
        if (generateSplitsInClient) {
            inputSplitInfo = MRHelpers.generateInputSplits((Configuration)mapStageConf, (Path)stagingDir);
            mapStageConf.setInt("mapreduce.job.maps", inputSplitInfo.getNumTasks());
        }
        MultiStageMRConfToTezTranslator.translateVertexConfToTez((Configuration)mapStageConf, null);
        JobConf iReduceStageConf = new JobConf(conf);
        iReduceStageConf.setInt("mapreduce.job.reduces", 2);
        iReduceStageConf.set("mapreduce.job.reduce.class", IntSumReducer.class.getName());
        iReduceStageConf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
        iReduceStageConf.set("mapreduce.map.output.value.class", Text.class.getName());
        iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
        MultiStageMRConfToTezTranslator.translateVertexConfToTez((Configuration)iReduceStageConf, (Configuration)mapStageConf);
        JobConf finalReduceConf = new JobConf(conf);
        finalReduceConf.setInt("mapreduce.job.reduces", 1);
        finalReduceConf.set("mapreduce.job.reduce.class", MyOrderByNoOpReducer.class.getName());
        finalReduceConf.set("mapreduce.map.output.key.class", Text.class.getName());
        finalReduceConf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        finalReduceConf.set("mapreduce.job.outputformat.class", TextOutputFormat.class.getName());
        finalReduceConf.set("mapreduce.output.fileoutputformat.outputdir", outputPath);
        finalReduceConf.setBoolean("mapred.mapper.new-api", true);
        MultiStageMRConfToTezTranslator.translateVertexConfToTez((Configuration)finalReduceConf, (Configuration)iReduceStageConf);
        MRHelpers.doJobClientMagic((Configuration)mapStageConf);
        MRHelpers.doJobClientMagic((Configuration)iReduceStageConf);
        MRHelpers.doJobClientMagic((Configuration)finalReduceConf);
        ArrayList<Vertex> vertices = new ArrayList<Vertex>();
        byte[] mapPayload = MRHelpers.createUserPayloadFromConf((Configuration)mapStageConf);
        byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping((byte[])mapPayload, (String)TextInputFormat.class.getName());
        int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
        Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(mapPayload), numMaps, MRHelpers.getMapResource((Configuration)mapStageConf));
        mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts((Configuration)mapStageConf));
        if (generateSplitsInClient) {
            mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
            HashMap<String, LocalResource> mapLocalResources = new HashMap<String, LocalResource>();
            mapLocalResources.putAll(commonLocalResources);
            MRHelpers.updateLocalResourcesForInputSplits((FileSystem)fs, (InputSplitInfo)inputSplitInfo, mapLocalResources);
            mapVertex.setTaskLocalResources(mapLocalResources);
        } else {
            mapVertex.setTaskLocalResources(commonLocalResources);
        }
        HashMap mapEnv = new HashMap();
        MRHelpers.updateEnvironmentForMRTasks((Configuration)mapStageConf, mapEnv, (boolean)true);
        mapVertex.setTaskEnvironment(mapEnv);
        Class<MRInputAMSplitGenerator> initializerClazz = generateSplitsInClient ? null : MRInputAMSplitGenerator.class;
        MRHelpers.addMRInput((Vertex)mapVertex, (byte[])mapInputPayload, initializerClazz);
        vertices.add(mapVertex);
        Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(MRHelpers.createUserPayloadFromConf((Configuration)iReduceStageConf)), 2, MRHelpers.getReduceResource((Configuration)iReduceStageConf));
        ivertex.setJavaOpts(MRHelpers.getReduceJavaOpts((Configuration)iReduceStageConf));
        ivertex.setTaskLocalResources(commonLocalResources);
        HashMap ireduceEnv = new HashMap();
        MRHelpers.updateEnvironmentForMRTasks((Configuration)iReduceStageConf, ireduceEnv, (boolean)false);
        ivertex.setTaskEnvironment(ireduceEnv);
        vertices.add(ivertex);
        byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf((Configuration)finalReduceConf);
        Vertex finalReduceVertex = new Vertex("finalreduce", new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(finalReducePayload), 1, MRHelpers.getReduceResource((Configuration)finalReduceConf));
        finalReduceVertex.setJavaOpts(MRHelpers.getReduceJavaOpts((Configuration)finalReduceConf));
        finalReduceVertex.setTaskLocalResources(commonLocalResources);
        HashMap reduceEnv = new HashMap();
        MRHelpers.updateEnvironmentForMRTasks((Configuration)finalReduceConf, reduceEnv, (boolean)false);
        finalReduceVertex.setTaskEnvironment(reduceEnv);
        MRHelpers.addMROutputLegacy((Vertex)finalReduceVertex, (byte[])finalReducePayload);
        vertices.add(finalReduceVertex);
        DAG dag = new DAG("OrderedWordCount" + dagIndex);
        for (int i = 0; i < vertices.size(); ++i) {
            dag.addVertex((Vertex)vertices.get(i));
            if (i == 0) continue;
            dag.addEdge(new Edge((Vertex)vertices.get(i - 1), (Vertex)vertices.get(i), new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class.getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
        }
        return dag;
    }

    private static void printUsage() {
        String options = " [-generateSplitsInClient true/<false>]";
        System.err.println("Usage: orderedwordcount <in> <out>" + options);
        System.err.println("Usage (In Session Mode): orderedwordcount <in1> <out1> ... <inN> <outN>" + options);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        boolean generateSplitsInClient = false;
        SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
        try {
            generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
            otherArgs = splitCmdLineParser.getRemainingArgs();
        }
        catch (ParseException e1) {
            System.err.println("Invalid options");
            OrderedWordCount.printUsage();
            System.exit(2);
        }
        boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
        long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0) * 1000;
        boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
        if (otherArgs.length % 2 != 0 || !useTezSession && otherArgs.length != 2) {
            OrderedWordCount.printUsage();
            System.exit(2);
        }
        ArrayList<String> inputPaths = new ArrayList<String>();
        ArrayList<String> outputPaths = new ArrayList<String>();
        for (int i = 0; i < otherArgs.length; i += 2) {
            inputPaths.add(otherArgs[i]);
            outputPaths.add(otherArgs[i + 1]);
        }
        UserGroupInformation.setConfiguration((Configuration)conf);
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        TezConfiguration tezConf = new TezConfiguration(conf);
        TezClient tezClient = new TezClient(tezConf);
        ApplicationId appId = tezClient.createApplication();
        OrderedWordCount instance = new OrderedWordCount();
        FileSystem fs = FileSystem.get((Configuration)conf);
        String stagingDirStr = "/user/" + user + "/" + ".staging" + "/" + "/" + appId.toString();
        Path stagingDir = new Path(stagingDirStr);
        FileSystem pathFs = stagingDir.getFileSystem((Configuration)tezConf);
        pathFs.mkdirs(new Path(stagingDirStr));
        tezConf.set("tez.staging-dir", stagingDirStr);
        stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
        TokenCache.obtainTokensForNamenodes((Credentials)instance.credentials, (Path[])new Path[]{stagingDir}, (Configuration)conf);
        TezClientUtils.ensureStagingDirExists((Configuration)tezConf, (Path)stagingDir);
        tezConf.set("tez.am.java.opts", MRHelpers.getMRAMJavaOpts((Configuration)conf));
        TezSession tezSession = null;
        AMConfiguration amConfig = new AMConfiguration(null, null, tezConf, instance.credentials);
        if (useTezSession) {
            LOG.info((Object)"Creating Tez Session");
            TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConf);
            tezSession = new TezSession("OrderedWordCountSession", appId, sessionConfig);
            tezSession.start();
            LOG.info((Object)"Created Tez Session");
        }
        DAGStatus dagStatus = null;
        DAGClient dagClient = null;
        String[] vNames = new String[]{"initialmap", "intermediate_reducer", "finalreduce"};
        EnumSet<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
        try {
            for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
                if (dagIndex != 1 && interJobSleepTimeout > 0L) {
                    try {
                        LOG.info((Object)("Sleeping between jobs, sleepInterval=" + interJobSleepTimeout / 1000L));
                        Thread.sleep(interJobSleepTimeout);
                    }
                    catch (InterruptedException e) {
                        LOG.info((Object)"Main thread interrupted. Breaking out of job loop");
                        break;
                    }
                }
                String inputPath = (String)inputPaths.get(dagIndex - 1);
                String outputPath = (String)outputPaths.get(dagIndex - 1);
                if (fs.exists(new Path(outputPath))) {
                    throw new FileAlreadyExistsException("Output directory " + outputPath + " already exists");
                }
                LOG.info((Object)("Running OrderedWordCount DAG, dagIndex=" + dagIndex + ", inputPath=" + inputPath + ", outputPath=" + outputPath));
                TreeMap<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
                DAG dag = instance.createDAG(fs, conf, localResources, stagingDir, dagIndex, inputPath, outputPath, generateSplitsInClient);
                boolean doPreWarm = dagIndex == 1 && useTezSession && conf.getBoolean("PRE_WARM_SESSION", true);
                int preWarmNumContainers = 0;
                if (doPreWarm && (preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0)) <= 0) {
                    doPreWarm = false;
                }
                if (doPreWarm) {
                    LOG.info((Object)"Pre-warming Session");
                    VertexLocationHint vertexLocationHint = new VertexLocationHint(null);
                    ProcessorDescriptor sleepProcDescriptor = new ProcessorDescriptor(SleepProcessor.class.getName());
                    SleepProcessor.SleepProcessorConfig sleepProcessorConfig = new SleepProcessor.SleepProcessorConfig(4000);
                    sleepProcDescriptor.setUserPayload(sleepProcessorConfig.toUserPayload());
                    PreWarmContext context = new PreWarmContext(sleepProcDescriptor, dag.getVertex("initialmap").getTaskResource(), preWarmNumContainers, vertexLocationHint);
                    TreeMap contextLocalRsrcs = new TreeMap();
                    contextLocalRsrcs.putAll(dag.getVertex("initialmap").getTaskLocalResources());
                    TreeMap contextEnv = new TreeMap();
                    contextEnv.putAll(dag.getVertex("initialmap").getTaskEnvironment());
                    String contextJavaOpts = dag.getVertex("initialmap").getJavaOpts();
                    context.setLocalResources(contextLocalRsrcs).setJavaOpts(contextJavaOpts).setEnvironment(contextEnv);
                    tezSession.preWarm(context);
                }
                if (useTezSession) {
                    LOG.info((Object)"Waiting for TezSession to get into ready state");
                    OrderedWordCount.waitForTezSessionReady(tezSession);
                    LOG.info((Object)("Submitting DAG to Tez Session, dagIndex=" + dagIndex));
                    dagClient = tezSession.submitDAG(dag);
                    LOG.info((Object)("Submitted DAG to Tez Session, dagIndex=" + dagIndex));
                } else {
                    LOG.info((Object)"Submitting DAG as a new Tez Application");
                    dagClient = tezClient.submitDAGApplication(dag, amConfig);
                }
                while ((dagStatus = dagClient.getDAGStatus(statusGetOpts)).getState() != DAGStatus.State.RUNNING && dagStatus.getState() != DAGStatus.State.SUCCEEDED && dagStatus.getState() != DAGStatus.State.FAILED && dagStatus.getState() != DAGStatus.State.KILLED && dagStatus.getState() != DAGStatus.State.ERROR) {
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException e) {}
                }
                while (dagStatus.getState() != DAGStatus.State.SUCCEEDED && dagStatus.getState() != DAGStatus.State.FAILED && dagStatus.getState() != DAGStatus.State.KILLED && dagStatus.getState() != DAGStatus.State.ERROR) {
                    if (dagStatus.getState() == DAGStatus.State.RUNNING) {
                        ExampleDriver.printDAGStatus(dagClient, vNames);
                    }
                    try {
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException e) {
                            // empty catch block
                        }
                        dagStatus = dagClient.getDAGStatus(statusGetOpts);
                    }
                    catch (TezException e) {
                        LOG.fatal((Object)"Failed to get application progress. Exiting");
                        System.exit(-1);
                    }
                }
                ExampleDriver.printDAGStatus(dagClient, vNames, true, true);
                LOG.info((Object)("DAG " + dagIndex + " completed. " + "FinalState=" + dagStatus.getState()));
                if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) continue;
                LOG.info((Object)("DAG " + dagIndex + " diagnostics: " + dagStatus.getDiagnostics()));
            }
        }
        catch (Exception e) {
            LOG.error((Object)"Error occurred when submitting/running DAGs", (Throwable)e);
            throw e;
        }
        finally {
            if (!retainStagingDir) {
                pathFs.delete(stagingDir, true);
            }
            if (useTezSession) {
                LOG.info((Object)"Shutting down session");
                tezSession.stop();
            }
        }
        if (!useTezSession) {
            ExampleDriver.printDAGStatus(dagClient, vNames);
            LOG.info((Object)("Application completed. FinalState=" + dagStatus.getState()));
            System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
        }
    }

    private static void waitForTezSessionReady(TezSession tezSession) throws IOException, TezException {
        while (true) {
            TezSessionStatus status;
            if ((status = tezSession.getSessionStatus()).equals((Object)TezSessionStatus.SHUTDOWN)) {
                throw new RuntimeException("TezSession has already shutdown");
            }
            if (status.equals((Object)TezSessionStatus.READY)) {
                return;
            }
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                LOG.info((Object)"Interrupted while trying to check session status");
                return;
            }
        }
    }

    public static class MyOrderByNoOpReducer
    extends Reducer<IntWritable, Text, Text, IntWritable> {
        public void reduce(IntWritable key, Iterable<Text> values, Reducer.Context context) throws IOException, InterruptedException {
            for (Text word : values) {
                context.write((Object)word, (Object)key);
            }
        }
    }

    public static class IntSumReducer
    extends Reducer<Text, IntWritable, IntWritable, Text> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write((Object)this.result, (Object)key);
        }
    }

    public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write((Object)this.word, (Object)one);
            }
        }
    }
}

