/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.examples;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;

public class HashJoinExample
extends Configured
implements Tool {
    private static final Log LOG = LogFactory.getLog(HashJoinExample.class);
    private static final String broadcastOption = "doBroadcast";
    private static final String streamingSide = "streamingSide";
    private static final String hashSide = "hashSide";
    private static final String inputFile = "inputFile";
    private static final String joiner = "joiner";
    private static final String joinOutput = "joinOutput";

    public static void main(String[] args) throws Exception {
        HashJoinExample job = new HashJoinExample();
        int status = ToolRunner.run((Configuration)new Configuration(), (Tool)job, (String[])args);
        System.exit(status);
    }

    private static void printUsage() {
        System.err.println("Usage: hashjoin <file1> <file2> <numPartitions> <outPath> [doBroadcast(default false)]");
        ToolRunner.printGenericCommandUsage((PrintStream)System.err);
    }

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        int result = this.validateArgs(otherArgs);
        if (result != 0) {
            return result;
        }
        return this.execute(otherArgs);
    }

    public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
        this.setConf(conf);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        int result = this.validateArgs(otherArgs);
        if (result != 0) {
            return result;
        }
        return this.execute(otherArgs, tezClient);
    }

    private int validateArgs(String[] otherArgs) {
        if (otherArgs.length != 4 && otherArgs.length != 5) {
            HashJoinExample.printUsage();
            return 2;
        }
        return 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int execute(String[] args) throws TezException, IOException, InterruptedException {
        TezConfiguration tezConf = new TezConfiguration(this.getConf());
        TezClient tezClient = null;
        try {
            tezClient = this.createTezClient(tezConf);
            int n = this.execute(args, tezConf, tezClient);
            return n;
        }
        finally {
            if (tezClient != null) {
                tezClient.stop();
            }
        }
    }

    private int execute(String[] args, TezClient tezClient) throws IOException, TezException, InterruptedException {
        TezConfiguration tezConf = new TezConfiguration(this.getConf());
        return this.execute(args, tezConf, tezClient);
    }

    private TezClient createTezClient(TezConfiguration tezConf) throws TezException, IOException {
        TezClient tezClient = TezClient.create((String)"HashJoinExample", (TezConfiguration)tezConf);
        tezClient.start();
        return tezClient;
    }

    private int execute(String[] args, TezConfiguration tezConf, TezClient tezClient) throws IOException, TezException, InterruptedException {
        boolean doBroadcast = args.length == 5 && args[4].equals(broadcastOption);
        LOG.info((Object)("Running HashJoinExample" + (doBroadcast ? "-WithBroadcast" : "")));
        UserGroupInformation.setConfiguration((Configuration)tezConf);
        String streamInputDir = args[0];
        String hashInputDir = args[1];
        int numPartitions = Integer.parseInt(args[2]);
        String outputDir = args[3];
        Path streamInputPath = new Path(streamInputDir);
        Path hashInputPath = new Path(hashInputDir);
        Path outputPath = new Path(outputDir);
        FileSystem fs = FileSystem.get((Configuration)tezConf);
        if (fs.exists(outputPath)) {
            System.err.println("Output directory: " + outputDir + " already exists");
            return 3;
        }
        if (numPartitions <= 0) {
            System.err.println("NumPartitions must be > 0");
            return 4;
        }
        DAG dag = this.createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions, doBroadcast);
        tezClient.waitTillReady();
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
            LOG.info((Object)("DAG diagnostics: " + dagStatus.getDiagnostics()));
            return -1;
        }
        return 0;
    }

    private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath, int numPartitions, boolean doBroadcast) throws IOException {
        DAG dag = DAG.create((String)("HashJoinExample" + (doBroadcast ? "-WithBroadcast" : "")));
        Vertex hashFileVertex = Vertex.create((String)hashSide, (ProcessorDescriptor)ProcessorDescriptor.create((String)ForwardingProcessor.class.getName())).addDataSource(inputFile, MRInput.createConfigBuilder((Configuration)new Configuration((Configuration)tezConf), TextInputFormat.class, (String)hashPath.toUri().toString()).groupSplits(false).build());
        Vertex streamFileVertex = Vertex.create((String)streamingSide, (ProcessorDescriptor)ProcessorDescriptor.create((String)ForwardingProcessor.class.getName())).addDataSource(inputFile, MRInput.createConfigBuilder((Configuration)new Configuration((Configuration)tezConf), TextInputFormat.class, (String)streamPath.toUri().toString()).groupSplits(false).build());
        Vertex joinVertex = Vertex.create((String)joiner, (ProcessorDescriptor)ProcessorDescriptor.create((String)HashJoinProcessor.class.getName()), (int)numPartitions).addDataSink(joinOutput, MROutput.createConfigBuilder((Configuration)new Configuration((Configuration)tezConf), TextOutputFormat.class, (String)outPath.toUri().toString()).build());
        UnorderedPartitionedKVEdgeConfig streamConf = UnorderedPartitionedKVEdgeConfig.newBuilder((String)Text.class.getName(), (String)NullWritable.class.getName(), (String)HashPartitioner.class.getName()).setFromConfiguration((Configuration)tezConf).build();
        Edge e1 = Edge.create((Vertex)streamFileVertex, (Vertex)joinVertex, (EdgeProperty)streamConf.createDefaultEdgeProperty());
        EdgeProperty hashSideEdgeProperty = null;
        if (doBroadcast) {
            UnorderedKVEdgeConfig broadcastConf = UnorderedKVEdgeConfig.newBuilder((String)Text.class.getName(), (String)NullWritable.class.getName()).setFromConfiguration((Configuration)tezConf).build();
            hashSideEdgeProperty = broadcastConf.createDefaultBroadcastEdgeProperty();
        } else {
            hashSideEdgeProperty = streamConf.createDefaultEdgeProperty();
        }
        Edge e2 = Edge.create((Vertex)hashFileVertex, (Vertex)joinVertex, (EdgeProperty)hashSideEdgeProperty);
        dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(joinVertex).addEdge(e1).addEdge(e2);
        return dag;
    }

    public static class HashJoinProcessor
    extends SimpleMRProcessor {
        public HashJoinProcessor(ProcessorContext context) {
            super(context);
        }

        public void run() throws Exception {
            Preconditions.checkState((this.getInputs().size() == 2 ? 1 : 0) != 0);
            Preconditions.checkState((this.getOutputs().size() == 1 ? 1 : 0) != 0);
            LogicalInput streamInput = (LogicalInput)this.getInputs().get(HashJoinExample.streamingSide);
            LogicalInput hashInput = (LogicalInput)this.getInputs().get(HashJoinExample.hashSide);
            Reader rawStreamReader = streamInput.getReader();
            Reader rawHashReader = hashInput.getReader();
            Preconditions.checkState((boolean)(rawStreamReader instanceof KeyValueReader));
            Preconditions.checkState((boolean)(rawHashReader instanceof KeyValueReader));
            LogicalOutput lo = (LogicalOutput)this.getOutputs().get(HashJoinExample.joinOutput);
            Preconditions.checkState((boolean)(lo.getWriter() instanceof KeyValueWriter));
            KeyValueWriter writer = (KeyValueWriter)lo.getWriter();
            KeyValueReader hashKvReader = (KeyValueReader)rawHashReader;
            HashSet<Text> keySet = new HashSet<Text>();
            while (hashKvReader.next()) {
                keySet.add(new Text((Text)hashKvReader.getCurrentKey()));
            }
            KeyValueReader streamKvReader = (KeyValueReader)rawStreamReader;
            while (streamKvReader.next()) {
                Text key = (Text)streamKvReader.getCurrentKey();
                if (!keySet.contains(key)) continue;
                writer.write((Object)key, (Object)NullWritable.get());
            }
        }
    }

    public static class ForwardingProcessor
    extends SimpleProcessor {
        public ForwardingProcessor(ProcessorContext context) {
            super(context);
        }

        public void run() throws Exception {
            Preconditions.checkState((this.getInputs().size() == 1 ? 1 : 0) != 0);
            Preconditions.checkState((this.getOutputs().size() == 1 ? 1 : 0) != 0);
            LogicalInput input = (LogicalInput)this.getInputs().values().iterator().next();
            Reader rawReader = input.getReader();
            Preconditions.checkState((boolean)(rawReader instanceof KeyValueReader));
            LogicalOutput output = (LogicalOutput)this.getOutputs().values().iterator().next();
            KeyValueReader reader = (KeyValueReader)rawReader;
            KeyValueWriter writer = (KeyValueWriter)output.getWriter();
            while (reader.next()) {
                Object val = reader.getCurrentValue();
                writer.write(val, (Object)NullWritable.get());
            }
        }
    }
}

