package org.apache.spark.streaming.kafka;

import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.apache.spark.internal.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.util.ThreadUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.immutable.List;
import scala.math.Numeric$IntIsIntegral$;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaInputDStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-d!B\u0001\u0003\u0001\u0011a!!D&bM.\f'+Z2fSZ,'O\u0003\u0002\u0004\t\u0005)1.\u00194lC*\u0011QAB\u0001\ngR\u0014X-Y7j]\u001eT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u000b\u0006\u001bq9CL\\\n\u0004\u00019I\u0003cA\b\u0013)5\t\u0001C\u0003\u0002\u0012\t\u0005A!/Z2fSZ,'/\u0003\u0002\u0014!\tA!+Z2fSZ,'\u000f\u0005\u0003\u00161i1S\"\u0001\f\u000b\u0003]\tQa]2bY\u0006L!!\u0007\f\u0003\rQ+\b\u000f\\33!\tYB\u0004\u0004\u0001\u0005\u000bu\u0001!\u0019A\u0010\u0003\u0003-\u001b\u0001!\u0005\u0002!GA\u0011Q#I\u0005\u0003EY\u0011qAT8uQ&tw\r\u0005\u0002\u0016I%\u0011QE\u0006\u0002\u0004\u0003:L\bCA\u000e(\t\u0015A\u0003A1\u0001 \u0005\u00051\u0006C\u0001\u0016.\u001b\u0005Y#B\u0001\u0017\u0007\u0003!Ig\u000e^3s]\u0006d\u0017B\u0001\u0018,\u0005\u001daunZ4j]\u001eD\u0001\u0002\r\u0001\u0003\u0002\u0003\u0006I!M\u0001\fW\u000647.\u0019)be\u0006l7\u000f\u0005\u00033k]:T\"A\u001a\u000b\u0005Q2\u0012AC2pY2,7\r^5p]&\u0011ag\r\u0002\u0004\u001b\u0006\u0004\bC\u0001\u001d<\u001d\t)\u0012(\u0003\u0002;-\u00051\u0001K]3eK\u001aL!\u0001P\u001f\u0003\rM#(/\u001b8h\u0015\tQd\u0003\u0003\u0005@\u0001\t\u0005\t\u0015!\u0003A\u0003\u0019!x\u000e]5dgB!!'N\u001cB!\t)\")\u0003\u0002D-\t\u0019\u0011J\u001c;\t\u0013\u0015\u0003!\u0011!Q\u0001\n\u0019c\u0015\u0001D:u_J\fw-\u001a'fm\u0016d\u0007CA$K\u001b\u0005A%BA%\u0007\u0003\u001d\u0019Ho\u001c:bO\u0016L!a\u0013%\u0003\u0019M#xN]1hK2+g/\u001a7\n\u0005\u0015\u0013\u0002\u0002\u0003(\u0001\u0005\u0007\u0005\u000b1B(\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$S\u0007E\u0002Q'ji\u0011!\u0015\u0006\u0003%Z\tqA]3gY\u0016\u001cG/\u0003\u0002U#\nA1\t\\1tgR\u000bw\r\u0003\u0005W\u0001\t\r\t\u0015a\u0003X\u0003))g/\u001b3f]\u000e,GE\u000e\t\u0004!N3\u0003\u0002C-\u0001\u0005\u0007\u0005\u000b1\u0002.\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$s\u0007E\u0002Q'n\u0003\"a\u0007/\u0005\u000bu\u0003!\u0019\u00010\u0003\u0003U\u000b\"\u0001I01\u0005\u0001D\u0007cA1fO6\
t!M\u0003\u0002dI\u0006Q1/\u001a:jC2L'0\u001a:\u000b\u0003\rI!A\u001a2\u0003\u000f\u0011+7m\u001c3feB\u00111\u0004\u001b\u0003\nSr\u000b\t\u0011!A\u0003\u0002}\u00111a\u0018\u00134\u0011!Y\u0007AaA!\u0002\u0017a\u0017AC3wS\u0012,gnY3%qA\u0019\u0001kU7\u0011\u0005mqG!B8\u0001\u0005\u0004\u0001(!\u0001+\u0012\u0005\u0001\n\bG\u0001:u!\r\tWm\u001d\t\u00037Q$\u0011\"\u001e8\u0002\u0002\u0003\u0005)\u0011A\u0010\u0003\u0007}#C\u0007C\u0003x\u0001\u0011\u0005\u00010\u0001\u0004=S:LGO\u0010\u000b\bs\u0006\u0005\u00111AA\u0003)\u0015QH0 @��!\u0019Y\bA\u0007\u0014\\[6\t!\u0001C\u0003Om\u0002\u000fq\nC\u0003Wm\u0002\u000fq\u000bC\u0003Zm\u0002\u000f!\fC\u0003lm\u0002\u000fA\u000eC\u00031m\u0002\u0007\u0011\u0007C\u0003@m\u0002\u0007\u0001\tC\u0003Fm\u0002\u0007a\tC\u0005\u0002\n\u0001\u0001\r\u0011\"\u0001\u0002\f\u0005\t2m\u001c8tk6,'oQ8o]\u0016\u001cGo\u001c:\u0016\u0005\u00055\u0001\u0003BA\b\u0003+i!!!\u0005\u000b\u0007\u0005MA-\u0001\u0005d_:\u001cX/\\3s\u0013\u0011\t9\"!\u0005\u0003#\r{gn];nKJ\u001cuN\u001c8fGR|'\u000fC\u0005\u0002\u001c\u0001\u0001\r\u0011\"\u0001\u0002\u001e\u0005)2m\u001c8tk6,'oQ8o]\u0016\u001cGo\u001c:`I\u0015\fH\u0003BA\u0010\u0003K\u00012!FA\u0011\u0013\r\t\u0019C\u0006\u0002\u0005+:LG\u000f\u0003\u0006\u0002(\u0005e\u0011\u0011!a\u0001\u0003\u001b\t1\u0001\u001f\u00132\u0011!\tY\u0003\u0001Q!\n\u00055\u0011AE2p]N,X.\u001a:D_:tWm\u0019;pe\u0002Bq!a\f\u0001\t\u0003\t\t$\u0001\u0004p]N#x\u000e\u001d\u000b\u0003\u0003?Aq!!\u000e\u0001\t\u0003\t\t$A\u0004p]N#\u0018M\u001d;\u0007\r\u0005e\u0002\u0001BA\u001e\u00059iUm]:bO\u0016D\u0015M\u001c3mKJ\u001cb!a\u000e\u0002>\u00055\u0003\u0003BA 
\u0003\u0013j!!!\u0011\u000b\t\u0005\r\u0013QI\u0001\u0005Y\u0006twM\u0003\u0002\u0002H\u0005!!.\u0019<b\u0013\u0011\tY%!\u0011\u0003\r=\u0013'.Z2u!\u0011\ty$a\u0014\n\t\u0005E\u0013\u0011\t\u0002\t%Vtg.\u00192mK\"Y\u0011QKA\u001c\u0005\u0003\u0005\u000b\u0011BA,\u0003\u0019\u0019HO]3b[B1\u0011qBA-5\u0019JA!a\u0017\u0002\u0012\tY1*\u00194lCN#(/Z1n\u0011\u001d9\u0018q\u0007C\u0001\u0003?\"B!!\u0019\u0002fA!\u00111MA\u001c\u001b\u0005\u0001\u0001\u0002CA+\u0003;\u0002\r!a\u0016\t\u0011\u0005%\u0014q\u0007C\u0001\u0003c\t1A];o\u0001")
/* loaded from: input_file:org/apache/spark/streaming/kafka/KafkaReceiver.class */
/*
 * Decompiled view of the Scala class KafkaReceiver (KafkaInputDStream.scala).
 *
 * A Spark Streaming Receiver that opens a Kafka ConsumerConnector in onStart(),
 * asks it for message streams for the configured topics, and hands every
 * message to Receiver.store(...) as a (key, message) Tuple2. The connector is
 * shut down again in onStop().
 *
 * Type parameters:
 *   K, V - key and message types of the consumed records
 *   U, T - Decoder classes; instances are created reflectively in onStart()
 *          through a constructor taking VerifiableProperties and are passed,
 *          in that order, as the two decoder arguments of createMessageStreams
 *
 * NOTE(review): the long "org$apache$..." identifiers and the
 * "Logging.class.xxx(this, ...)" calls are compiler/decompiler artifacts of
 * the Scala trait mix-in; they are kept verbatim because generated closure
 * classes outside this file refer to them by name.
 */
public class KafkaReceiver<K, V, U extends Decoder<?>, T extends Decoder<?>> extends Receiver<Tuple2<K, V>> implements Logging {
    /** Kafka consumer settings; public because generated closure classes read it directly. */
    public final Map<String, String> org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams;
    /** Topic name mapped to an integer count; the counts are summed in onStart() to size the handler pool. */
    private final Map<String, Object> topics;
    /** Class tag for U, used to instantiate the first decoder reflectively. */
    private final ClassTag<U> evidence$7;
    /** Class tag for T, used to instantiate the second decoder reflectively. */
    private final ClassTag<T> evidence$8;
    /** Live connector, or null when the receiver is not started (or has been stopped). */
    private ConsumerConnector consumerConnector;
    /** Backing field for the logger managed by the Logging trait. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /* compiled from: KafkaInputDStream.scala */
    /* loaded from: input_file:org/apache/spark/streaming/kafka/KafkaReceiver$MessageHandler.class */
    /**
     * Task that drains a single KafkaStream and stores each message in the
     * enclosing receiver.
     */
    public class MessageHandler implements Runnable {
        private final KafkaStream<K, V> stream;
        public final /* synthetic */ KafkaReceiver $outer;

        /**
         * @param kafkaReceiver the enclosing receiver; must not be null
         * @param kafkaStream   the stream this handler consumes
         */
        public MessageHandler(KafkaReceiver<K, V, U, T> kafkaReceiver, KafkaStream<K, V> kafkaStream) {
            this.stream = kafkaStream;
            // Compiler-generated outer-instance guard: "throw null" raises a NullPointerException.
            if (kafkaReceiver == null) {
                throw null;
            }
            this.$outer = kafkaReceiver;
        }

        /** Accessor for the enclosing receiver instance. */
        public /* synthetic */ KafkaReceiver org$apache$spark$streaming$kafka$KafkaReceiver$MessageHandler$$$outer() {
            return this.$outer;
        }

        /**
         * Logs a start-up message, then forwards every message of the stream to
         * the receiver's store(...) as a (key, message) pair. Any Throwable
         * raised while iterating or storing is passed to reportError(...) and
         * ends this handler.
         */
        @Override // java.lang.Runnable
        public void run() {
            final KafkaReceiver receiver = org$apache$spark$streaming$kafka$KafkaReceiver$MessageHandler$$$outer();
            receiver.logInfo(new KafkaReceiver$MessageHandler$$anonfun$run$1(this));
            try {
                for (ConsumerIterator<K, V> messages = this.stream.iterator(); messages.hasNext(); ) {
                    MessageAndMetadata<K, V> msgAndMetadata = messages.mo2443next();
                    receiver.store(new Tuple2(msgAndMetadata.key(), msgAndMetadata.message()));
                }
            } catch (Throwable failure) {
                receiver.reportError("Error handling message; exiting", failure);
            }
        }
    }

    // ------------------------------------------------------------------
    // Logging trait members: each one forwards to the trait implementation.
    // ------------------------------------------------------------------

    /** Getter for the logger field owned by the Logging trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger field owned by the Logging trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** Forwards to the Logging trait's logName. */
    public String logName() {
        return Logging.class.logName(this);
    }

    /** Forwards to the Logging trait's log. */
    public Logger log() {
        return Logging.class.log(this);
    }

    /** Forwards to the Logging trait's logInfo; {@code msg} supplies the message text. */
    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo(this, msg);
    }

    /** Forwards to the Logging trait's logDebug; {@code msg} supplies the message text. */
    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug(this, msg);
    }

    /** Forwards to the Logging trait's logTrace; {@code msg} supplies the message text. */
    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace(this, msg);
    }

    /** Forwards to the Logging trait's logWarning; {@code msg} supplies the message text. */
    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning(this, msg);
    }

    /** Forwards to the Logging trait's logError; {@code msg} supplies the message text. */
    public void logError(Function0<String> msg) {
        Logging.class.logError(this, msg);
    }

    /** Forwards to the Logging trait's logInfo overload that also takes a throwable. */
    public void logInfo(Function0<String> msg, Throwable cause) {
        Logging.class.logInfo(this, msg, cause);
    }

    /** Forwards to the Logging trait's logDebug overload that also takes a throwable. */
    public void logDebug(Function0<String> msg, Throwable cause) {
        Logging.class.logDebug(this, msg, cause);
    }

    /** Forwards to the Logging trait's logTrace overload that also takes a throwable. */
    public void logTrace(Function0<String> msg, Throwable cause) {
        Logging.class.logTrace(this, msg, cause);
    }

    /** Forwards to the Logging trait's logWarning overload that also takes a throwable. */
    public void logWarning(Function0<String> msg, Throwable cause) {
        Logging.class.logWarning(this, msg, cause);
    }

    /** Forwards to the Logging trait's logError overload that also takes a throwable. */
    public void logError(Function0<String> msg, Throwable cause) {
        Logging.class.logError(this, msg, cause);
    }

    /** Forwards to the Logging trait's isTraceEnabled. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    /** Forwards to the Logging trait's initializeLogIfNecessary. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    // ------------------------------------------------------------------
    // Connector state and receiver lifecycle
    // ------------------------------------------------------------------

    /** @return the current connector, or null if none is open */
    public ConsumerConnector consumerConnector() {
        return this.consumerConnector;
    }

    /** Scala-style setter ({@code consumerConnector = ...}) for the connector field. */
    public void consumerConnector_$eq(ConsumerConnector consumerConnector) {
        this.consumerConnector = consumerConnector;
    }

    /**
     * Shuts down the Kafka connector, if one is open, and clears the reference
     * so that a repeated call does nothing.
     */
    public void onStop() {
        if (consumerConnector() == null) {
            return;
        }
        consumerConnector().shutdown();
        consumerConnector_$eq(null);
    }

    /**
     * Connects to Kafka and starts the message handlers.
     *
     * Steps, in order:
     * 1. build a java.util.Properties from the Kafka parameters;
     * 2. create the ConsumerConnector from a ConsumerConfig over those properties;
     * 3. instantiate both decoders reflectively and create the message streams;
     * 4. create a daemon thread pool sized to the sum of the topic counts and
     *    run the per-stream closure against every topic's stream list.
     *
     * NOTE(review): the bodies of the KafkaReceiver$$anonfun$onStart$N closures
     * are not visible here. Presumably $2 copies each parameter into the
     * Properties object and $5 submits one MessageHandler per stream to the
     * pool - confirm against the generated classes.
     */
    public void onStart() {
        logInfo(new KafkaReceiver$$anonfun$onStart$1(this));

        // Step 1: connection properties.
        Properties props = new Properties();
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.foreach(new KafkaReceiver$$anonfun$onStart$2(this, props));
        String zkConnect = this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.mo245apply("zookeeper.connect");

        // Step 2: open the connector.
        logInfo(new KafkaReceiver$$anonfun$onStart$3(this, zkConnect));
        ConsumerConfig config = new ConsumerConfig(props);
        consumerConnector_$eq(Consumer$.MODULE$.create(config));
        logInfo(new KafkaReceiver$$anonfun$onStart$4(this, zkConnect));

        // Step 3: decoders are built via their (VerifiableProperties) constructor.
        ConsumerConnector connector = consumerConnector();
        Decoder firstDecoder = (Decoder) scala.reflect.package$.MODULE$.classTag(this.evidence$7).runtimeClass().getConstructor(VerifiableProperties.class).newInstance(config.props());
        Decoder secondDecoder = (Decoder) scala.reflect.package$.MODULE$.classTag(this.evidence$8).runtimeClass().getConstructor(VerifiableProperties.class).newInstance(config.props());
        Map<String, List<KafkaStream<K, V>>> streamsByTopic = connector.createMessageStreams(this.topics, firstDecoder, secondDecoder);

        // Step 4: one pool thread per requested stream across all topics.
        int poolSize = BoxesRunTime.unboxToInt(this.topics.values().mo2559sum(Numeric$IntIsIntegral$.MODULE$));
        ThreadPoolExecutor handlerPool = ThreadUtils$.MODULE$.newDaemonFixedThreadPool(poolSize, "KafkaMessageHandler");
        try {
            streamsByTopic.values().foreach(new KafkaReceiver$$anonfun$onStart$5(this, handlerPool));
        } finally {
            // shutdown() stops the pool accepting new tasks; already-submitted tasks still run.
            handlerPool.shutdown();
        }
    }

    /**
     * Creates a receiver that is not yet connected.
     *
     * @param kafkaParams   Kafka consumer settings; onStart() reads "zookeeper.connect" from it
     * @param topics        topic name mapped to an integer count
     * @param storageLevel  storage level handed to the Receiver superclass
     * @param keyTag        class tag for K (not retained by this class)
     * @param valueTag      class tag for V (not retained by this class)
     * @param firstDecoderTag  class tag for U, kept for reflective decoder creation
     * @param secondDecoderTag class tag for T, kept for reflective decoder creation
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaReceiver(Map<String, String> kafkaParams, Map<String, Object> topics, StorageLevel storageLevel, ClassTag<K> keyTag, ClassTag<V> valueTag, ClassTag<U> firstDecoderTag, ClassTag<T> secondDecoderTag) {
        super(storageLevel);
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams = kafkaParams;
        this.topics = topics;
        this.evidence$7 = firstDecoderTag;
        this.evidence$8 = secondDecoderTag;
        // Trait initializer for Logging must run before the instance is used.
        Logging.class.$init$(this);
        this.consumerConnector = null;
    }
}
