/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.examples;

import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.examples.ExampleDriver;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;

public class TestOrderedWordCount
extends Configured
implements Tool {
    /** Class-wide logger; never reassigned, hence final. */
    private static final Log LOG = LogFactory.getLog(TestOrderedWordCount.class);
    /** Credentials populated with filesystem tokens in {@code run} and handed to the Tez client. */
    private final Credentials credentials = new Credentials();

    /**
     * Builds the three-vertex ordered word count DAG:
     * {@code initialmap -> intermediate_reducer -> finalreduce}.
     *
     * <p>The map stage tokenizes the input, the intermediate reducer (2 tasks) sums
     * the counts and emits (count, word), and the final reducer (1 task) swaps the
     * pair back to (word, count) while writing the output.
     *
     * @param fs                     not used by this method
     * @param conf                   base configuration copied into every stage
     * @param commonLocalResources   local resources attached to every vertex
     * @param stagingDir             staging directory, used for client-side split generation
     * @param dagIndex               suffix appended to the DAG name
     * @param inputPath              input location for the map stage
     * @param outputPath             output location for the final reduce stage
     * @param generateSplitsInClient when true, input splits are generated on the client
     * @return the assembled DAG
     * @throws Exception if any stage configuration cannot be serialized or built
     */
    @VisibleForTesting
    public DAG createDAG(FileSystem fs, Configuration conf, Map<String, LocalResource> commonLocalResources, Path stagingDir, int dagIndex, String inputPath, String outputPath, boolean generateSplitsInClient) throws Exception {
        // --- Stage configurations -------------------------------------------------
        JobConf mapStageConf = new JobConf(conf);
        mapStageConf.set("mapreduce.job.map.class", TokenizerMapper.class.getName());
        MRHelpers.translateMRConfToTez(mapStageConf);

        JobConf iReduceStageConf = new JobConf(conf);
        iReduceStageConf.setInt("mapreduce.job.reduces", 2);
        iReduceStageConf.set("mapreduce.job.reduce.class", IntSumReducer.class.getName());
        iReduceStageConf.set("tez.runtime.key.class", Text.class.getName());
        iReduceStageConf.set("tez.runtime.value.class", IntWritable.class.getName());
        iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
        MRHelpers.translateMRConfToTez(iReduceStageConf);

        JobConf finalReduceConf = new JobConf(conf);
        finalReduceConf.setInt("mapreduce.job.reduces", 1);
        finalReduceConf.set("mapreduce.job.reduce.class", MyOrderByNoOpReducer.class.getName());
        finalReduceConf.set("tez.runtime.key.class", IntWritable.class.getName());
        finalReduceConf.set("tez.runtime.value.class", Text.class.getName());
        MRHelpers.translateMRConfToTez(finalReduceConf);

        MRHelpers.configureMRApiUsage(mapStageConf);
        MRHelpers.configureMRApiUsage(iReduceStageConf);
        MRHelpers.configureMRApiUsage(finalReduceConf);

        // --- Map vertex -----------------------------------------------------------
        // The history text is captured before the input-specific settings below are
        // applied to the map configuration.
        String mapStageHistoryText = confToXml(mapStageConf);
        DataSourceDescriptor dsd;
        if (generateSplitsInClient) {
            mapStageConf.set("mapreduce.job.inputformat.class", TextInputFormat.class.getName());
            mapStageConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
            mapStageConf.setBoolean("mapred.mapper.new-api", true);
            dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true);
        } else {
            dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build();
        }
        ProcessorDescriptor mapProcessor = ProcessorDescriptor.create(MapProcessor.class.getName())
                .setUserPayload(TezUtils.createUserPayloadFromConf(mapStageConf))
                .setHistoryText(mapStageHistoryText);
        Vertex mapVertex = Vertex.create("initialmap", mapProcessor).addTaskLocalFiles(commonLocalResources);
        mapVertex.addDataSource("MRInput", dsd);

        // --- Intermediate reduce vertex -------------------------------------------
        String iReduceStageHistoryText = confToXml(iReduceStageConf);
        ProcessorDescriptor iReduceProcessor = ProcessorDescriptor.create(ReduceProcessor.class.getName())
                .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
                .setHistoryText(iReduceStageHistoryText);
        Vertex ivertex = Vertex.create("intermediate_reducer", iReduceProcessor, 2);
        ivertex.addTaskLocalFiles(commonLocalResources);

        // --- Final reduce vertex --------------------------------------------------
        String finalReduceStageHistoryText = confToXml(finalReduceConf);
        UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
        ProcessorDescriptor finalReduceProcessor = ProcessorDescriptor.create(ReduceProcessor.class.getName())
                .setUserPayload(finalReducePayload)
                .setHistoryText(finalReduceStageHistoryText);
        Vertex finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessor, 1);
        finalReduceVertex.addTaskLocalFiles(commonLocalResources);
        finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath).build());

        // --- DAG assembly ---------------------------------------------------------
        DAG dag = DAG.create("OrderedWordCount" + dagIndex);
        dag.addVertex(mapVertex);
        dag.addVertex(ivertex);
        dag.addVertex(finalReduceVertex);

        // map -> intermediate reducer: keyed by word, valued by count
        OrderedPartitionedKVEdgeConfig edgeConf1 = OrderedPartitionedKVEdgeConfig
                .newBuilder(Text.class.getName(), IntWritable.class.getName(), HashPartitioner.class.getName())
                .setFromConfiguration(conf)
                .configureInput().useLegacyInput().done()
                .build();
        dag.addEdge(Edge.create(mapVertex, ivertex, edgeConf1.createDefaultEdgeProperty()));

        // intermediate reducer -> final reducer: keyed by count, valued by word
        OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig
                .newBuilder(IntWritable.class.getName(), Text.class.getName(), HashPartitioner.class.getName())
                .setFromConfiguration(conf)
                .configureInput().useLegacyInput().done()
                .build();
        dag.addEdge(Edge.create(ivertex, finalReduceVertex, edgeConf2.createDefaultEdgeProperty()));
        return dag;
    }

    /** Serializes a configuration to its XML form, decoded as UTF-8 text. */
    private static String confToXml(Configuration stageConf) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(4096);
        stageConf.writeXml(buffer);
        return buffer.toString("UTF-8");
    }

    /**
     * Writes the command-line synopsis (single-run and session-mode forms),
     * followed by the generic Hadoop option help, to standard error.
     */
    private static void printUsage() {
        final String splitsFlag = " [-generateSplitsInClient true/<false>]";
        PrintStream err = System.err;
        err.println("Usage: testorderedwordcount <in> <out>" + splitsFlag);
        err.println("Usage (In Session Mode): testorderedwordcount <in1> <out1> ... <inN> <outN>" + splitsFlag);
        ToolRunner.printGenericCommandUsage(err);
    }

    /**
     * Parses the command line, then builds and runs one ordered word count DAG
     * per {@code <in> <out>} argument pair through a single Tez client.
     *
     * <p>Configuration keys read from the tool configuration:
     * <ul>
     *   <li>{@code USE_TEZ_SESSION} (default {@code true}) - run in session mode;
     *       when {@code false} exactly one input/output pair is accepted.</li>
     *   <li>{@code INTER_JOB_SLEEP_INTERVAL} (seconds, default 0) - pause between DAGs.</li>
     *   <li>{@code RETAIN_STAGING_DIR} (default {@code false}) - keep the staging
     *       directory instead of deleting it on exit.</li>
     *   <li>{@code PRE_WARM_SESSION} (default {@code true}) and
     *       {@code PRE_WARM_NUM_CONTAINERS} (default 0) - pre-warm the session before
     *       the first DAG; only done when the container count is positive.</li>
     * </ul>
     *
     * <p>If the thread is interrupted while waiting, the interrupt is remembered and
     * restored just before this method exits, after session shutdown and staging
     * directory cleanup have run.
     *
     * @param args command-line arguments (generic Hadoop options, the optional
     *             split-generation flag, then input/output path pairs)
     * @return 0 if the last DAG that ran succeeded, 1 if it did not, 2 on a usage
     *         error, -1 if polling the DAG status failed with a {@link TezException}
     * @throws Exception if setup, submission or monitoring fails; an existing output
     *                   directory is reported as {@link FileAlreadyExistsException}
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        boolean generateSplitsInClient;
        SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
        try {
            generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
            otherArgs = splitCmdLineParser.getRemainingArgs();
        } catch (ParseException e) {
            System.err.println("Invalid options");
            TestOrderedWordCount.printUsage();
            return 2;
        }

        boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
        // Multiply as long so large second counts cannot overflow int arithmetic.
        long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0) * 1000L;
        boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);

        // At least one <in> <out> pair is required; without it no DAG would run and
        // there would be no final status to report.
        if (otherArgs.length == 0 || otherArgs.length % 2 != 0 || (!useTezSession && otherArgs.length != 2)) {
            TestOrderedWordCount.printUsage();
            return 2;
        }

        ArrayList<String> inputPaths = new ArrayList<String>();
        ArrayList<String> outputPaths = new ArrayList<String>();
        for (int i = 0; i < otherArgs.length; i += 2) {
            inputPaths.add(otherArgs[i]);
            outputPaths.add(otherArgs[i + 1]);
        }

        UserGroupInformation.setConfiguration(conf);
        TezConfiguration tezConf = new TezConfiguration(conf);
        TestOrderedWordCount instance = new TestOrderedWordCount();
        FileSystem fs = FileSystem.get(conf);

        // Each invocation gets its own timestamped staging directory.
        String stagingDirStr = conf.get("tez.staging-dir", "/tmp/tez/staging") + "/" + Long.toString(System.currentTimeMillis());
        Path stagingDir = new Path(stagingDirStr);
        FileSystem pathFs = stagingDir.getFileSystem(tezConf);
        pathFs.mkdirs(new Path(stagingDirStr));
        tezConf.set("tez.staging-dir", stagingDirStr);
        stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
        TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[]{stagingDir}, conf);
        TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);

        if (useTezSession) {
            LOG.info("Creating Tez Session");
        }
        tezConf.setBoolean("tez.am.mode.session", useTezSession);

        TezClient tezSession = TezClient.create("OrderedWordCountSession", tezConf, null, instance.credentials);
        tezSession.start();

        DAGStatus dagStatus = null;
        DAGClient dagClient = null;
        String[] vNames = new String[]{"initialmap", "intermediate_reducer", "finalreduce"};
        EnumSet<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
        // Set when a sleep was interrupted; the flag is restored on the way out so
        // that cleanup is not itself disturbed by a pending interrupt.
        boolean interrupted = false;
        try {
            try {
                for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
                    if (dagIndex != 1 && interJobSleepTimeout > 0L) {
                        LOG.info("Sleeping between jobs, sleepInterval=" + interJobSleepTimeout / 1000L);
                        if (sleepQuietly(interJobSleepTimeout)) {
                            interrupted = true;
                            LOG.info("Main thread interrupted. Breaking out of job loop");
                            break;
                        }
                    }

                    String inputPath = inputPaths.get(dagIndex - 1);
                    String outputPath = outputPaths.get(dagIndex - 1);
                    if (fs.exists(new Path(outputPath))) {
                        throw new FileAlreadyExistsException("Output directory " + outputPath + " already exists");
                    }
                    LOG.info("Running OrderedWordCount DAG, dagIndex=" + dagIndex + ", inputPath=" + inputPath + ", outputPath=" + outputPath);

                    TreeMap<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
                    DAG dag = instance.createDAG(fs, conf, localResources, stagingDir, dagIndex, inputPath, outputPath, generateSplitsInClient);

                    // Pre-warm only once, before the first DAG of a session.
                    if (dagIndex == 1 && useTezSession && conf.getBoolean("PRE_WARM_SESSION", true)) {
                        int preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0);
                        if (preWarmNumContainers > 0) {
                            LOG.info("Pre-warming Session");
                            Vertex initialMap = dag.getVertex("initialmap");
                            PreWarmVertex preWarmVertex = PreWarmVertex.create("PreWarm", preWarmNumContainers, initialMap.getTaskResource());
                            preWarmVertex.addTaskLocalFiles(initialMap.getTaskLocalFiles());
                            preWarmVertex.setTaskEnvironment(initialMap.getTaskEnvironment());
                            preWarmVertex.setTaskLaunchCmdOpts(initialMap.getTaskLaunchCmdOpts());
                            tezSession.preWarm(preWarmVertex);
                        }
                    }

                    if (useTezSession) {
                        LOG.info("Waiting for TezSession to get into ready state");
                        TestOrderedWordCount.waitForTezSessionReady(tezSession);
                        LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
                        dagClient = tezSession.submitDAG(dag);
                        LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
                    } else {
                        LOG.info("Submitting DAG as a new Tez Application");
                        dagClient = tezSession.submitDAG(dag);
                    }

                    // Phase 1: wait until the DAG is running or already finished.
                    dagStatus = dagClient.getDAGStatus(statusGetOpts);
                    while (dagStatus.getState() != DAGStatus.State.RUNNING && !isTerminal(dagStatus.getState())) {
                        interrupted |= sleepQuietly(500L);
                        dagStatus = dagClient.getDAGStatus(statusGetOpts);
                    }

                    // Phase 2: report progress until the DAG reaches a terminal state.
                    while (!isTerminal(dagStatus.getState())) {
                        if (dagStatus.getState() == DAGStatus.State.RUNNING) {
                            ExampleDriver.printDAGStatus(dagClient, vNames);
                        }
                        interrupted |= sleepQuietly(1000L);
                        try {
                            dagStatus = dagClient.getDAGStatus(statusGetOpts);
                        } catch (TezException e) {
                            // Cleanup is done exactly once by the finally block below.
                            LOG.fatal("Failed to get application progress. Exiting", e);
                            return -1;
                        }
                    }

                    ExampleDriver.printDAGStatus(dagClient, vNames, true, true);
                    LOG.info("DAG " + dagIndex + " completed. " + "FinalState=" + dagStatus.getState());
                    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
                        LOG.info("DAG " + dagIndex + " diagnostics: " + dagStatus.getDiagnostics());
                    }
                }
            } catch (Exception e) {
                LOG.error("Error occurred when submitting/running DAGs", e);
                throw e;
            } finally {
                // Stop the session even if removing the staging directory fails.
                try {
                    if (!retainStagingDir) {
                        pathFs.delete(stagingDir, true);
                    }
                } finally {
                    LOG.info("Shutting down session");
                    tezSession.stop();
                }
            }

            if (!useTezSession) {
                ExampleDriver.printDAGStatus(dagClient, vNames);
                LOG.info("Application completed. FinalState=" + dagStatus.getState());
            }
            // Only the status of the last DAG that ran determines the exit code.
            return dagStatus != null && dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true when the state is one a DAG cannot leave: SUCCEEDED, FAILED, KILLED or ERROR. */
    private static boolean isTerminal(DAGStatus.State state) {
        return state == DAGStatus.State.SUCCEEDED
                || state == DAGStatus.State.FAILED
                || state == DAGStatus.State.KILLED
                || state == DAGStatus.State.ERROR;
    }

    /** Sleeps for the given time; returns true if the sleep was cut short by an interrupt. */
    private static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    /**
     * Delegates to {@link TezClient#waitTillReady()} on the given client.
     * Presumably this blocks until the session can accept DAGs - confirm
     * against the TezClient documentation.
     *
     * @param tezSession the Tez client to wait on
     * @throws IOException          propagated from the client
     * @throws TezException         propagated from the client
     * @throws InterruptedException propagated from the client
     */
    private static void waitForTezSessionReady(TezClient tezSession) throws IOException, TezException, InterruptedException {
        tezSession.waitTillReady();
    }

    /**
     * Command-line entry point: runs this tool through {@link ToolRunner} with a
     * fresh {@link Configuration} and exits the JVM with the tool's return code.
     *
     * @param args command-line arguments forwarded to the tool
     * @throws Exception if the tool run fails
     */
    public static void main(String[] args) throws Exception {
        Configuration baseConf = new Configuration();
        Tool tool = new TestOrderedWordCount();
        System.exit(ToolRunner.run(baseConf, tool, args));
    }

    public static class MyOrderByNoOpReducer
    extends Reducer<IntWritable, Text, Text, IntWritable> {
        public void reduce(IntWritable key, Iterable<Text> values, Reducer.Context context) throws IOException, InterruptedException {
            for (Text word : values) {
                context.write((Object)word, (Object)key);
            }
        }
    }

    public static class IntSumReducer
    extends Reducer<Text, IntWritable, IntWritable, Text> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write((Object)this.result, (Object)key);
        }
    }

    public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write((Object)this.word, (Object)one);
            }
        }
    }
}

