/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.util.concurrent.atomic.AtomicLong;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaMessageStream;
import kafka.tools.ConsumerPerformance$;
import kafka.utils.Utils$;
import org.apache.log4j.Logger;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/**
 * Decompiled Scala singleton object backing the {@code kafka.tools.ConsumerPerformance}
 * command-line tool. The single instance is published through {@link #MODULE$}.
 */
public final class ConsumerPerformance$
implements ScalaObject {
    /** The one instance of this class; assigned from the private constructor. */
    public static final ConsumerPerformance$ MODULE$;
    /** log4j logger obtained for this class in the constructor. */
    private final Logger kafka$tools$ConsumerPerformance$$logger;

    static {
        // Constructing the instance has the side effect of assigning MODULE$.
        new ConsumerPerformance$();
    }

    /**
     * Accessor for the logger. The mangled name is kept because the synthetic
     * helper classes used by {@link #main} presumably call it (they are not
     * visible here -- confirm before renaming).
     *
     * @return the logger created in the constructor
     */
    public final Logger kafka$tools$ConsumerPerformance$$logger() {
        return this.kafka$tools$ConsumerPerformance$$logger;
    }

    /**
     * Entry point of the tool. Parses the command line, builds a consumer
     * connector from the given properties file, requests message streams for the
     * topic, sleeps for the configured initial interval, then runs the thread
     * start-up callback and registers a shutdown hook.
     *
     * <p>Recognised options: {@code --topic}, {@code --props}, {@code --threads}
     * (default 10), {@code --reporting-interval} (default 100000) and
     * {@code --sleep} in seconds (default 5).
     *
     * @param args raw command-line arguments handed to the option parser
     */
    public void main(String[] args) {
        OptionParser parser$1 = new OptionParser();
        ArgumentAcceptingOptionSpec topicOpt = parser$1.accepts("topic", "REQUIRED: The topic to consume from.").withRequiredArg().describedAs("topic").ofType(String.class);
        ArgumentAcceptingOptionSpec consumerPropsOpt = parser$1.accepts("props", "REQUIRED: Properties file with the consumer properties.").withRequiredArg().describedAs("properties").ofType(String.class);
        ArgumentAcceptingOptionSpec numThreadsOpt = parser$1.accepts("threads", "Number of processing threads.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(10), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec reportingIntervalOpt = parser$1.accepts("reporting-interval", "Interval at which to print progress info.").withRequiredArg().describedAs("size").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(100000), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec sleepSecsOpt = parser$1.accepts("sleep", "Initial interval to wait before connecting.").withRequiredArg().describedAs("secs").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(5), (Object[])new Integer[0]);
        OptionSet options$1 = parser$1.parse(args);

        // Hands the two options described as REQUIRED to a callback together with
        // the parser and parsed options; presumably it rejects the command line
        // when one of them is missing (callback body not visible here).
        List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new ArgumentAcceptingOptionSpec[]{topicOpt, consumerPropsOpt})).foreach((Function1)new anonfun.main.1(parser$1, options$1));

        int numThreads = (Integer)options$1.valueOf((OptionSpec)numThreadsOpt);
        // Read once; the original fetched this option twice and discarded one copy.
        int reportingInterval = (Integer)options$1.valueOf((OptionSpec)reportingIntervalOpt);
        String propsFile = (String)options$1.valueOf((OptionSpec)consumerPropsOpt);
        String topic = (String)options$1.valueOf((OptionSpec)topicOpt);
        int sleepSecs = (Integer)options$1.valueOf((OptionSpec)sleepSecsOpt);
        // Widen to long before multiplying so a large --sleep value cannot
        // overflow int and turn into a negative or truncated duration.
        long initialSleepMs = sleepSecs * 1000L;

        Predef$.MODULE$.println((Object)"Starting consumer...");
        // Shared counters, boxed in ObjectRef so the synthetic closure classes can reach them.
        ObjectRef totalNumMsgs$1 = new ObjectRef((Object)new AtomicLong(0L));
        ObjectRef totalNumBytes$1 = new ObjectRef((Object)new AtomicLong(0L));
        ConsumerConfig consumerConfig = new ConsumerConfig(Utils$.MODULE$.loadProps(propsFile));
        ConsumerConnector consumerConnector$1 = Consumer$.MODULE$.create(consumerConfig);
        // Single-entry map: topic -> numThreads.
        Map<String, List<KafkaMessageStream>> topicMessageStreams = consumerConnector$1.createMessageStreams((Map<String, Integer>)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc((Object)topic).$minus$greater((Object)BoxesRunTime.boxToInteger((int)numThreads))})));

        // Starts out empty; the callback below receives it along with the counters
        // and the reporting interval, presumably to fill it with one worker per
        // stream (callback body not visible here).
        ObjectRef threadList$1 = new ObjectRef((Object)Nil$.MODULE$);
        topicMessageStreams.foreach((Function1)new anonfun.main.2(reportingInterval, totalNumMsgs$1, totalNumBytes$1, threadList$1));

        this.kafka$tools$ConsumerPerformance$$logger().info((Object)new StringBuilder().append((Object)"Sleeping for ").append((Object)BoxesRunTime.boxToInteger((int)sleepSecs)).append((Object)" seconds.").toString());
        Thread.sleep(initialSleepMs);
        this.kafka$tools$ConsumerPerformance$$logger().info((Object)"starting threads");
        ((List)threadList$1.elem).foreach((Function1)new anonfun.main.3());

        // The hook is given the counters, the connector and the thread list; what it
        // does with them at JVM exit is defined in a class not visible here.
        Runtime.getRuntime().addShutdownHook(new anon.2(totalNumMsgs$1, totalNumBytes$1, consumerConnector$1, threadList$1));
    }

    /** Publishes the new instance through {@link #MODULE$} and creates its logger. */
    private ConsumerPerformance$() {
        MODULE$ = this;
        this.kafka$tools$ConsumerPerformance$$logger = Logger.getLogger(this.getClass());
    }
}

