/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.examples;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.TezExampleBase;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;

/**
 * Tez example that checks whether two text inputs contain the same keys.
 *
 * <p>Usage: {@code joinvalidate <path1> <path2> [numPartitions]}.
 *
 * <p>Each path is read by its own source vertex ({@code lhsfile} / {@code rhsfile}) running
 * {@code HashJoinExample.ForwardingProcessor}. Both source vertices feed a single
 * {@code joinvalidate} vertex through ordered (sorted), hash-partitioned edges with {@code Text}
 * keys and {@code NullWritable} values. Both edges use the same {@link HashPartitioner}, so a
 * given key reaches the same {@code joinvalidate} task from either side. Each task can therefore
 * walk the two sorted key streams in lockstep.
 *
 * <p>NOTE(review): assumes {@code ForwardingProcessor} emits each input line as a {@code Text}
 * key; confirm against {@code HashJoinExample}.
 *
 * <p>{@link #runJob} return values:
 * <ul>
 *   <li>0 when the sides are equivalent</li>
 *   <li>-1 when the DAG does not succeed</li>
 *   <li>-2 when the mismatch counter cannot be read</li>
 *   <li>-3 when the sides differ</li>
 *   <li>4 when the partition count is invalid</li>
 * </ul>
 */
public class JoinValidate extends TezExampleBase {
    private static final Log LOG = LogFactory.getLog(JoinValidate.class);
    // Vertex names; the join vertex looks its inputs up by these names.
    private static final String LHS_INPUT_NAME = "lhsfile";
    private static final String RHS_INPUT_NAME = "rhsfile";
    // Counter incremented by JoinValidateProcessor for every key difference it observes.
    private static final String COUNTER_GROUP_NAME = "JOIN_VALIDATE";
    private static final String MISSING_KEY_COUNTER_NAME = "MISSING_KEY_EXISTS";

    public static void main(String[] args) throws Exception {
        JoinValidate validate = new JoinValidate();
        int status = ToolRunner.run(new Configuration(), validate, args);
        System.exit(status);
    }

    @Override
    protected void printUsage() {
        System.err.println("Usage: joinvalidate <path1> <path2> [numPartitions]");
        ToolRunner.printGenericCommandUsage(System.err);
    }

    /**
     * Builds and runs the validation DAG, then inspects the mismatch counter.
     *
     * @param args {@code <path1> <path2> [numPartitions]}; the argument count is checked by
     *     {@link #validateArgs(String[])}
     * @param tezConf configuration used for the edges and the MRInput data sources
     * @param tezClient client used to submit the DAG
     * @return 0 if equivalent, -1 if the DAG failed, -2 if the counter is unavailable, -3 if the
     *     inputs differ, 4 if {@code numPartitions} is not a positive integer
     */
    @Override
    protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws Exception {
        LOG.info("Running JoinValidate");
        String lhsDir = args[0];
        String rhsDir = args[1];
        int numPartitions = 1;
        if (args.length == 3) {
            try {
                numPartitions = Integer.parseInt(args[2]);
            } catch (NumberFormatException e) {
                // Report a bad partition count the same way as a non-positive one.
                System.err.println("NumPartitions must be an integer, got: " + args[2]);
                return 4;
            }
        }
        if (numPartitions <= 0) {
            System.err.println("NumPartitions must be > 0");
            return 4;
        }
        DAG dag = createDag(tezConf, new Path(lhsDir), new Path(rhsDir), numPartitions);
        tezClient.waitTillReady();
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
            LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
            return -1;
        }
        // Re-fetch the status, explicitly requesting counters, to read the DAG-level mismatch counter.
        dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
        TezCounter counter = dagStatus.getDAGCounters() == null
                ? null
                : dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME, MISSING_KEY_COUNTER_NAME);
        if (counter == null) {
            LOG.info("Unable to determine equality");
            return -2;
        }
        if (counter.getValue() != 0L) {
            LOG.info("Validate failed. The two sides are not equivalent");
            return -3;
        }
        LOG.info("Validation successful. The two sides are equivalent");
        return 0;
    }

    /**
     * Accepts exactly two or three arguments.
     *
     * @return 0 when the count is valid, 2 otherwise
     */
    @Override
    protected int validateArgs(String[] otherArgs) {
        boolean validCount = otherArgs.length == 2 || otherArgs.length == 3;
        return validCount ? 0 : 2;
    }

    /**
     * Creates the three-vertex DAG: two source vertices feeding one validation vertex.
     *
     * @param numPartitions parallelism of the {@code joinvalidate} vertex
     */
    private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions) throws IOException {
        DAG dag = DAG.create("JoinValidate");
        // Sorted + hash-partitioned so both sides deliver matching keys to the same task, in order.
        OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
                .newBuilder(Text.class.getName(), NullWritable.class.getName(), HashPartitioner.class.getName())
                .setFromConfiguration(tezConf)
                .build();
        Vertex lhsVertex = createSourceVertex(tezConf, LHS_INPUT_NAME, "lhs", lhs);
        Vertex rhsVertex = createSourceVertex(tezConf, RHS_INPUT_NAME, "rhs", rhs);
        Vertex joinValidateVertex = Vertex.create("joinvalidate",
                ProcessorDescriptor.create(JoinValidateProcessor.class.getName()), numPartitions);
        Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
        Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
        dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(joinValidateVertex).addEdge(e1).addEdge(e2);
        return dag;
    }

    /**
     * Creates a source vertex that reads {@code path} with {@link TextInputFormat} (splits not
     * grouped) and runs {@code HashJoinExample.ForwardingProcessor} on it.
     */
    private static Vertex createSourceVertex(TezConfiguration tezConf, String vertexName,
            String dataSourceName, Path path) throws IOException {
        return Vertex.create(vertexName,
                        ProcessorDescriptor.create(HashJoinExample.ForwardingProcessor.class.getName()))
                .addDataSource(dataSourceName,
                        MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                                        path.toUri().toString())
                                .groupSplits(false)
                                .build());
    }

    /**
     * Compares the sorted key streams arriving from the {@code lhsfile} and {@code rhsfile}
     * inputs.
     *
     * <p>Every difference increments the {@code JOIN_VALIDATE / MISSING_KEY_EXISTS} counter. A
     * difference is a key mismatch at the same position, an extra key on the lhs, or a leftover
     * key on the rhs. Only a non-zero total matters to {@link JoinValidate#runJob}, so the walk
     * stops at the first extra lhs key.
     *
     * <p>NOTE(review): keys come from a {@link KeyValuesReader}, which presumably yields each
     * distinct key once with its grouped values. If so, duplicate keys on one side do not
     * register as a difference.
     */
    public static class JoinValidateProcessor extends SimpleProcessor {
        private static final Log LOG = LogFactory.getLog(JoinValidateProcessor.class);

        public JoinValidateProcessor(ProcessorContext context) {
            super(context);
        }

        @Override
        public void run() throws Exception {
            Preconditions.checkState(this.getInputs().size() == 2,
                    "Expected 2 inputs, got %s", this.getInputs().size());
            Preconditions.checkState(this.getOutputs().isEmpty(),
                    "Expected no outputs, got %s", this.getOutputs().size());
            KeyValuesReader lhsReader = keyValuesReaderFor(JoinValidate.LHS_INPUT_NAME);
            KeyValuesReader rhsReader = keyValuesReaderFor(JoinValidate.RHS_INPUT_NAME);
            TezCounter mismatchCounter = this.getContext().getCounters()
                    .findCounter(JoinValidate.COUNTER_GROUP_NAME, JoinValidate.MISSING_KEY_COUNTER_NAME);

            // Once rhsReader.next() has returned false it must not be polled again.
            boolean rhsExhausted = false;
            while (lhsReader.next()) {
                if (!rhsReader.next()) {
                    rhsExhausted = true;
                    mismatchCounter.increment(1L);
                    LOG.info("ExtraKey in lhs: " + lhsReader.getCurrentKey());
                    // One difference is enough to fail validation; stop here.
                    break;
                }
                Object lhsKey = lhsReader.getCurrentKey();
                Object rhsKey = rhsReader.getCurrentKey();
                if (!lhsKey.equals(rhsKey)) {
                    LOG.info("MismatchedKeys: lhs=" + lhsKey + ", rhs=" + rhsKey);
                    mismatchCounter.increment(1L);
                }
            }
            // lhs is done; any remaining rhs key is unmatched.
            if (!rhsExhausted && rhsReader.next()) {
                mismatchCounter.increment(1L);
                LOG.info("ExtraKey in rhs: " + rhsReader.getCurrentKey());
            }
        }

        /**
         * Looks up the named input and returns its reader.
         *
         * @throws IllegalStateException if the input is missing or its reader is not a
         *     {@link KeyValuesReader}
         */
        private KeyValuesReader keyValuesReaderFor(String inputName) throws Exception {
            LogicalInput input = this.getInputs().get(inputName);
            Preconditions.checkState(input != null, "Missing input %s", inputName);
            Reader reader = input.getReader();
            Preconditions.checkState(reader instanceof KeyValuesReader,
                    "Input %s does not provide a KeyValuesReader: %s", inputName, reader);
            return (KeyValuesReader) reader;
        }
    }
}

