package org.apache.spark.streaming.kafka;

import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.apache.spark.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.util.ThreadUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.immutable.List;
import scala.math.Numeric$IntIsIntegral$;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaInputDStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dd!B\u0001\u0003\u0001\u0011a!!D&bM.\f'+Z2fSZ,'O\u0003\u0002\u0004\t\u0005)1.\u00194lC*\u0011QAB\u0001\ngR\u0014X-Y7j]\u001eT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u000b\u0006\u001bq9#\f\\\n\u0004\u00019I\u0003cA\b\u0013)5\t\u0001C\u0003\u0002\u0012\t\u0005A!/Z2fSZ,'/\u0003\u0002\u0014!\tA!+Z2fSZ,'\u000f\u0005\u0003\u00161i1S\"\u0001\f\u000b\u0003]\tQa]2bY\u0006L!!\u0007\f\u0003\rQ+\b\u000f\\33!\tYB\u0004\u0004\u0001\u0005\u000bu\u0001!\u0019A\u0010\u0003\u0003-\u001b\u0001!\u0005\u0002!GA\u0011Q#I\u0005\u0003EY\u0011qAT8uQ&tw\r\u0005\u0002\u0016I%\u0011QE\u0006\u0002\u0004\u0003:L\bCA\u000e(\t\u0015A\u0003A1\u0001 \u0005\u00051\u0006C\u0001\u0016,\u001b\u00051\u0011B\u0001\u0017\u0007\u0005\u001daunZ4j]\u001eD\u0001B\f\u0001\u0003\u0002\u0003\u0006IaL\u0001\fW\u000647.\u0019)be\u0006l7\u000f\u0005\u00031gU*T\"A\u0019\u000b\u0005I2\u0012AC2pY2,7\r^5p]&\u0011A'\r\u0002\u0004\u001b\u0006\u0004\bC\u0001\u001c:\u001d\t)r'\u0003\u00029-\u00051\u0001K]3eK\u001aL!AO\u001e\u0003\rM#(/\u001b8h\u0015\tAd\u0003\u0003\u0005>\u0001\t\u0005\t\u0015!\u0003?\u0003\u0019!x\u000e]5dgB!\u0001gM\u001b@!\t)\u0002)\u0003\u0002B-\t\u0019\u0011J\u001c;\t\u0013\r\u0003!\u0011!Q\u0001\n\u0011S\u0015\u0001D:u_J\fw-\u001a'fm\u0016d\u0007CA#I\u001b\u00051%BA$\u0007\u0003\u001d\u0019Ho\u001c:bO\u0016L!!\u0013$\u0003\u0019M#xN]1hK2+g/\u001a7\n\u0005\r\u0013\u0002\u0002\u0003'\u0001\u0005\u0007\u0005\u000b1B'\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$S\u0007E\u0002O#ji\u0011a\u0014\u0006\u0003!Z\tqA]3gY\u0016\u001cG/\u0003\u0002S\u001f\nA1\t\\1tgR\u000bw\r\u0003\u0005U\u0001\t\r\t\u0015a\u0003V\u0003))g/\u001b3f]\u000e,GE\u000e\t\u0004\u001dF3\u0003\u0002C,\u0001\u0005\u0007\u0005\u000b1\u0002-\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$s\u0007E\u0002O#f\u0003\"a\u0007.\u0005\u000bm\u0003!\u0019\u0001/\u0003\u0003U\u000b\"\u0001I/1\u0005y3\u0007cA0dK6\t\u0001M\u0003\u0002bE\u0006Q1/\u001a:jC2L'0
\u001a:\u000b\u0003\rI!\u0001\u001a1\u0003\u000f\u0011+7m\u001c3feB\u00111D\u001a\u0003\nOj\u000b\t\u0011!A\u0003\u0002}\u00111a\u0018\u00134\u0011!I\u0007AaA!\u0002\u0017Q\u0017AC3wS\u0012,gnY3%qA\u0019a*U6\u0011\u0005maG!B7\u0001\u0005\u0004q'!\u0001+\u0012\u0005\u0001z\u0007G\u00019s!\ry6-\u001d\t\u00037I$\u0011b\u001d7\u0002\u0002\u0003\u0005)\u0011A\u0010\u0003\u0007}#C\u0007C\u0003v\u0001\u0011\u0005a/\u0001\u0004=S:LGO\u0010\u000b\u0006oz|\u0018\u0011\u0001\u000b\u0006qj\\H0 \t\u0007s\u0002Qb%W6\u000e\u0003\tAQ\u0001\u0014;A\u00045CQ\u0001\u0016;A\u0004UCQa\u0016;A\u0004aCQ!\u001b;A\u0004)DQA\f;A\u0002=BQ!\u0010;A\u0002yBQa\u0011;A\u0002\u0011C\u0011\"!\u0002\u0001\u0001\u0004%\t!a\u0002\u0002#\r|gn];nKJ\u001cuN\u001c8fGR|'/\u0006\u0002\u0002\nA!\u00111BA\t\u001b\t\tiAC\u0002\u0002\u0010\t\f\u0001bY8ogVlWM]\u0005\u0005\u0003'\tiAA\tD_:\u001cX/\\3s\u0007>tg.Z2u_JD\u0011\"a\u0006\u0001\u0001\u0004%\t!!\u0007\u0002+\r|gn];nKJ\u001cuN\u001c8fGR|'o\u0018\u0013fcR!\u00111DA\u0011!\r)\u0012QD\u0005\u0004\u0003?1\"\u0001B+oSRD!\"a\t\u0002\u0016\u0005\u0005\t\u0019AA\u0005\u0003\rAH%\r\u0005\t\u0003O\u0001\u0001\u0015)\u0003\u0002\n\u0005\u00112m\u001c8tk6,'oQ8o]\u0016\u001cGo\u001c:!\u0011\u001d\tY\u0003\u0001C\u0001\u0003[\taa\u001c8Ti>\u0004HCAA\u000e\u0011\u001d\t\t\u0004\u0001C\u0001\u0003[\tqa\u001c8Ti\u0006\u0014HO\u0002\u0004\u00026\u0001!\u0011q\u0007\u0002\u000f\u001b\u0016\u001c8/Y4f\u0011\u0006tG\r\\3s'\u0019\t\u0019$!\u000f\u0002JA!\u00111HA#\u001b\t\tiD\u0003\u0003\u0002@\u0005\u0005\u0013\u0001\u00027b]\u001eT!!a\u0011\u0002\t)\fg/Y\u0005\u0005\u0003\u000f\niD\u0001\u0004PE*,7\r\u001e\t\u0005\u0003w\tY%\u0003\u0003\u0002N\u0005u\"\u0001\u0003*v]:\f'\r\\3\t\u0017\u0005E\u00131\u0007B\u0001B\u0003%\u00111K\u0001\u0007gR\u0014X-Y7\u0011\r\u0005-\u0011Q\u000b\u000e'\u0013\u0011\t9&!\u0004\u0003\u0017-\u000bgm[1TiJ,\u0017-\u001c\u0005\bk\u0006MB\u0011AA.)\u0011\ti&!\u0019\u0011\t\u0005}\u00131G\u0007\u0002\u0001!A\u0011\u0011KA-\u0001\u0004\t\u0019\u0006\u00
03\u0005\u0002f\u0005MB\u0011AA\u0017\u0003\r\u0011XO\u001c")
/* loaded from: input_file:org/apache/spark/streaming/kafka/KafkaReceiver.class */
/*
 * Spark Streaming receiver that reads (key, message) pairs from Kafka and hands
 * them to Spark as Tuple2<K, V> items.
 *
 * This file is decompiler output for a class compiled from KafkaInputDStream.scala,
 * so member names such as evidence$7 or org$apache$...$$kafkaParams are the
 * compiler's mangled names and must not be renamed.
 *
 * Type parameters:
 *   K, V - key and message types of the tuples passed to store().
 *   U, T - Decoder classes instantiated reflectively in onStart() from the
 *          ClassTags evidence$7 and evidence$8, and passed as the second and
 *          third arguments of createMessageStreams().
 */
public class KafkaReceiver<K, V, U extends Decoder<?>, T extends Decoder<?>> extends Receiver<Tuple2<K, V>> implements Logging {
    /**
     * Kafka consumer configuration. Every entry is visited in {@link #onStart()} and the
     * "zookeeper.connect" entry is looked up explicitly there.
     * NOTE(review): public with a mangled name, presumably so the generated closure
     * classes can read it - confirm before narrowing visibility.
     */
    public final Map<String, String> org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams;
    /**
     * Topic map handed to createMessageStreams(); its values are summed as ints to size
     * the handler thread pool (presumably consumer threads per topic - confirm).
     */
    private final Map<String, Object> topics;
    /** ClassTag used to instantiate the first decoder passed to createMessageStreams(). */
    private final ClassTag<U> evidence$7;
    /** ClassTag used to instantiate the second decoder passed to createMessageStreams(). */
    private final ClassTag<T> evidence$8;
    /** Active Kafka connector; null until onStart() runs and again after onStop(). */
    private ConsumerConnector consumerConnector;
    /** Backing slot for the Logging trait's logger; read and written only via the accessors below. */
    private transient Logger org$apache$spark$Logging$$log_;

    /* compiled from: KafkaInputDStream.scala */
    /* loaded from: input_file:org/apache/spark/streaming/kafka/KafkaReceiver$MessageHandler.class */
    /**
     * Runnable that drains one {@link KafkaStream} and stores every record in the
     * enclosing receiver.
     */
    public class MessageHandler implements Runnable {
        private final KafkaStream<K, V> stream;
        /** The receiver that owns this handler; never null (enforced by the constructor). */
        public final /* synthetic */ KafkaReceiver<K, V, U, T> $outer;

        /**
         * Iterates the stream until {@code hasNext()} returns false, storing each record as a
         * (key, message) tuple. Any {@link Throwable} raised while iterating or storing is
         * passed to the receiver's {@code reportError} and the method then returns.
         */
        @Override // java.lang.Runnable
        public void run() {
            KafkaReceiver<K, V, U, T> receiver = org$apache$spark$streaming$kafka$KafkaReceiver$MessageHandler$$$outer();
            receiver.logInfo(new KafkaReceiver$MessageHandler$$anonfun$run$1(this));
            try {
                ConsumerIterator<K, V> it = this.stream.iterator();
                while (it.hasNext()) {
                    MessageAndMetadata<K, V> next = it.next();
                    receiver.store(new Tuple2<K, V>(next.key(), next.message()));
                }
            } catch (Throwable th) {
                receiver.reportError("Error handling message; exiting", th);
            }
        }

        /** Returns the enclosing receiver (synthetic accessor for the generated closure classes). */
        public /* synthetic */ KafkaReceiver<K, V, U, T> org$apache$spark$streaming$kafka$KafkaReceiver$MessageHandler$$$outer() {
            return this.$outer;
        }

        /**
         * @param kafkaReceiver the enclosing receiver; must not be null
         * @param kafkaStream   the stream this handler consumes
         * @throws NullPointerException if {@code kafkaReceiver} is null
         */
        public MessageHandler(KafkaReceiver<K, V, U, T> kafkaReceiver, KafkaStream<K, V> kafkaStream) {
            // Explicit NPE instead of the decompiler's "throw null"; same exception type.
            if (kafkaReceiver == null) {
                throw new NullPointerException("kafkaReceiver");
            }
            this.stream = kafkaStream;
            this.$outer = kafkaReceiver;
        }
    }

    // ---------------------------------------------------------------------------------
    // Logging trait forwarders.
    // NOTE(review): "Logging.class.xxx(this, ...)" is how the decompiler printed these
    // calls; presumably they target the Scala trait's static implementation class
    // rather than java.lang.Class - confirm against the bytecode before editing.
    // ---------------------------------------------------------------------------------

    /** Returns the logger slot owned by the Logging trait. */
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    /** Sets the logger slot owned by the Logging trait. */
    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    /** Forwards to the Logging trait's {@code logName}. */
    public String logName() {
        return Logging.class.logName(this);
    }

    /** Forwards to the Logging trait's {@code log}. */
    public Logger log() {
        return Logging.class.log(this);
    }

    /** Forwards to the Logging trait's {@code logInfo}; the message is supplied by {@code function0}. */
    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    /** Forwards to the Logging trait's {@code logDebug}; the message is supplied by {@code function0}. */
    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    /** Forwards to the Logging trait's {@code logTrace}; the message is supplied by {@code function0}. */
    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    /** Forwards to the Logging trait's {@code logWarning}; the message is supplied by {@code function0}. */
    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    /** Forwards to the Logging trait's {@code logError}; the message is supplied by {@code function0}. */
    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    /** Forwards to the Logging trait's {@code logInfo} overload that also takes a throwable. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    /** Forwards to the Logging trait's {@code logDebug} overload that also takes a throwable. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    /** Forwards to the Logging trait's {@code logTrace} overload that also takes a throwable. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    /** Forwards to the Logging trait's {@code logWarning} overload that also takes a throwable. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    /** Forwards to the Logging trait's {@code logError} overload that also takes a throwable. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    /** Forwards to the Logging trait's {@code isTraceEnabled}. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    /** Returns the current Kafka connector, or null when the receiver is not started. */
    public ConsumerConnector consumerConnector() {
        return this.consumerConnector;
    }

    /** Replaces the current Kafka connector (Scala-style setter). */
    public void consumerConnector_$eq(ConsumerConnector consumerConnector) {
        this.consumerConnector = consumerConnector;
    }

    /**
     * Shuts down the Kafka connector, if one exists, and clears the field so a second
     * call is a no-op.
     */
    public void onStop() {
        if (consumerConnector() != null) {
            consumerConnector().shutdown();
            consumerConnector_$eq(null);
        }
    }

    /**
     * Connects to Kafka and starts consuming the configured topics.
     *
     * <p>Steps, in order: build a {@link Properties} object from the Kafka parameters,
     * create the {@link ConsumerConnector}, reflectively instantiate both decoders through
     * their {@code (VerifiableProperties)} constructors, create the message streams, and
     * hand the streams to a daemon thread pool named "KafkaMessageHandler".
     *
     * <p>The "zookeeper.connect" parameter is looked up with {@code apply}; whatever that
     * lookup throws for a missing key propagates to the caller.
     */
    public void onStart() {
        logInfo(new KafkaReceiver$$anonfun$onStart$1(this));

        Properties props = new Properties();
        // The closure receives the Properties instance; presumably it copies every Kafka
        // parameter into it (closure body is not visible here - confirm).
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.foreach(new KafkaReceiver$$anonfun$onStart$2(this, props));

        String zkConnect = (String) this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.apply("zookeeper.connect");
        logInfo(new KafkaReceiver$$anonfun$onStart$3(this, zkConnect));

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        consumerConnector_$eq(Consumer$.MODULE$.create(consumerConfig));
        logInfo(new KafkaReceiver$$anonfun$onStart$4(this, zkConnect));

        // The decoder classes are only known through their ClassTags, so they are built
        // reflectively; the casts are unchecked by necessity and scoped to each local.
        @SuppressWarnings("unchecked")
        Decoder<K> keyDecoder = (Decoder<K>) scala.reflect.package$.MODULE$.classTag(this.evidence$7).runtimeClass().getConstructor(VerifiableProperties.class).newInstance(consumerConfig.props());
        @SuppressWarnings("unchecked")
        Decoder<V> valueDecoder = (Decoder<V>) scala.reflect.package$.MODULE$.classTag(this.evidence$8).runtimeClass().getConstructor(VerifiableProperties.class).newInstance(consumerConfig.props());

        Map<String, List<KafkaStream<K, V>>> createMessageStreams = consumerConnector().createMessageStreams(this.topics, keyDecoder, valueDecoder);

        // Pool size is the sum of all values in the topic map.
        ThreadPoolExecutor newDaemonFixedThreadPool = ThreadUtils$.MODULE$.newDaemonFixedThreadPool(BoxesRunTime.unboxToInt(this.topics.values().sum(Numeric$IntIsIntegral$.MODULE$)), "KafkaMessageHandler");
        try {
            // Presumably submits one MessageHandler per stream to the pool (closure body
            // is not visible here - confirm).
            createMessageStreams.values().foreach(new KafkaReceiver$$anonfun$onStart$5(this, newDaemonFixedThreadPool));
        } finally {
            // shutdown() only stops the pool from accepting new tasks; tasks already
            // submitted keep running.
            newDaemonFixedThreadPool.shutdown();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * @param map          Kafka consumer parameters (stored as the kafkaParams field)
     * @param map2         topic map (stored as {@code topics})
     * @param storageLevel storage level forwarded to the {@link Receiver} constructor
     * @param classTag     ClassTag for K; not used in this constructor body
     * @param classTag2    ClassTag for V; not used in this constructor body
     * @param classTag3    ClassTag for the first decoder class U (stored as evidence$7)
     * @param classTag4    ClassTag for the second decoder class T (stored as evidence$8)
     */
    public KafkaReceiver(Map<String, String> map, Map<String, Object> map2, StorageLevel storageLevel, ClassTag<K> classTag, ClassTag<V> classTag2, ClassTag<U> classTag3, ClassTag<T> classTag4) {
        super(storageLevel);
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams = map;
        this.topics = map2;
        this.evidence$7 = classTag3;
        this.evidence$8 = classTag4;
        Logging.class.$init$(this);
        this.consumerConnector = null;
    }
}
