package com.datastax.spark.connector.embedded;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaConsumer.scala */
@ScalaSignature(bytes = "\u0006\u0001e4A!\u0001\u0002\u0001\u001b\ti1*\u00194lC\u000e{gn];nKJT!a\u0001\u0003\u0002\u0011\u0015l'-\u001a3eK\u0012T!!\u0002\u0004\u0002\u0013\r|gN\\3di>\u0014(BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0005eCR\f7\u000f^1y\u0015\u0005Y\u0011aA2p[\u000e\u00011C\u0001\u0001\u000f!\ty!#D\u0001\u0011\u0015\u0005\t\u0012!B:dC2\f\u0017BA\n\u0011\u0005\u0019\te.\u001f*fM\"AQ\u0003\u0001B\u0001B\u0003%a#A\u0005{_>\\W-\u001a9feB\u0011qC\u0007\b\u0003\u001faI!!\u0007\t\u0002\rA\u0013X\rZ3g\u0013\tYBD\u0001\u0004TiJLgn\u001a\u0006\u00033AA\u0001B\b\u0001\u0003\u0002\u0003\u0006IAF\u0001\u0006i>\u0004\u0018n\u0019\u0005\tA\u0001\u0011\t\u0011)A\u0005-\u00059qM]8va&#\u0007\u0002\u0003\u0012\u0001\u0005\u0003\u0005\u000b\u0011B\u0012\u0002\u0015A\f'\u000f^5uS>t7\u000f\u0005\u0002\u0010I%\u0011Q\u0005\u0005\u0002\u0004\u0013:$\b\u0002C\u0014\u0001\u0005\u0003\u0005\u000b\u0011B\u0012\u0002\u00159,X\u000e\u00165sK\u0006$7\u000f\u0003\u0005*\u0001\t\u0005\t\u0015!\u0003+\u0003\u0015\u0019w.\u001e8u!\tYC'D\u0001-\u0015\tic&\u0001\u0004bi>l\u0017n\u0019\u0006\u0003_A\n!bY8oGV\u0014(/\u001a8u\u0015\t\t$'\u0001\u0003vi&d'\"A\u001a\u0002\t)\fg/Y\u0005\u0003k1\u0012Q\"\u0011;p[&\u001c\u0017J\u001c;fO\u0016\u0014\b\"B\u001c\u0001\t\u0003A\u0014A\u0002\u001fj]&$h\bF\u0004:wqjdh\u0010!\u0011\u0005i\u0002Q\"\u0001\u0002\t\u000bU1\u0004\u0019\u0001\f\t\u000by1\u0004\u0019\u0001\f\t\u000b\u00012\u0004\u0019\u0001\f\t\u000b\t2\u0004\u0019A\u0012\t\u000b\u001d2\u0004\u0019A\u0012\t\u000b%2\u0004\u0019\u0001\u0016\t\u000f\u0015\u0001!\u0019!C\u0001\u0005V\t1\t\u0005\u0002E\u00136\tQI\u0003\u0002G\u000f\u0006A1m\u001c8tk6,'OC\u0001I\u0003\u0015Y\u0017MZ6b\u0013\tQUIA\tD_:\u001cX/\\3s\u0007>tg.Z2u_JDa\u0001\u0014\u0001!\u0002\u0013\u0019\u0015AC2p]:,7\r^8sA!9a\n\u0001b\u0001\n\u0003y\u0015aB:ue\u0016\fWn]\u000b\u0002!B\u0019q\"U*\n\u0005I\u0003\"AB(qi&|g\u000eE\u0002U9~s!!\u0016.\u000f\u0005YKV\"A,\u000b\u0005ac\u0011A\u0002\u001fs_>$h(C\u0001\u0012\u0013
\tY\u0006#A\u0004qC\u000e\\\u0017mZ3\n\u0005us&\u0001\u0002'jgRT!a\u0017\t\u0011\t\u0011\u0003gCF\u0005\u0003C\u0016\u00131bS1gW\u0006\u001cFO]3b[\"11\r\u0001Q\u0001\nA\u000b\u0001b\u001d;sK\u0006l7\u000f\t\u0005\bK\u0002\u0011\r\u0011\"\u0001g\u0003!)\u00070Z2vi>\u0014X#A4\u0011\u0005!LW\"\u0001\u0018\n\u0005)t#aD#yK\u000e,Ho\u001c:TKJ4\u0018nY3\t\r1\u0004\u0001\u0015!\u0003h\u0003%)\u00070Z2vi>\u0014\b\u0005C\u0003o\u0001\u0011%q.\u0001\u000bde\u0016\fG/Z\"p]N,X.\u001a:D_:4\u0017nZ\u000b\u0002aB\u0011A)]\u0005\u0003e\u0016\u0013abQ8ogVlWM]\"p]\u001aLw\rC\u0003u\u0001\u0011\u0005Q/\u0001\u0005tQV$Hm\\<o)\u00051\bCA\bx\u0013\tA\bC\u0001\u0003V]&$\b")
/* loaded from: input_file:com/datastax/spark/connector/embedded/KafkaConsumer.class */
/**
 * Small consumer built on Kafka's ZooKeeper-based {@link ConsumerConnector} API.
 *
 * <p>On construction an instance:
 * <ol>
 *   <li>builds a {@link ConsumerConfig} from the supplied ZooKeeper address and group id,</li>
 *   <li>creates a {@link ConsumerConnector} from that config,</li>
 *   <li>requests String/String message streams for a single topic,</li>
 *   <li>creates a fixed-size thread pool, and</li>
 *   <li>passes the streams (if any were returned for the topic) to
 *       {@code KafkaConsumer$$anonfun$2}, a compiler-generated closure defined outside this
 *       file.</li>
 * </ol>
 *
 * <p>NOTE(review): the closure presumably submits one task per stream to {@link #executor()}
 * and increments the shared counter for every message consumed; confirm against the closure
 * class, which is not visible here.
 */
public class KafkaConsumer {
    /** ZooKeeper connect string, written to the {@code zookeeper.connect} property. */
    private final String zookeeper;

    /** Consumer group id, written to the {@code group.id} property. */
    private final String groupId;

    /**
     * Counter supplied by the caller. The field is public and carries a compiler-mangled name,
     * so it is kept exactly as is; presumably the generated closure classes read it (confirm
     * against those classes before renaming or narrowing visibility).
     */
    public final AtomicInteger com$datastax$spark$connector$embedded$KafkaConsumer$$count;

    /**
     * Connector created from {@link #createConsumerConfig()}. It is assigned in the constructor,
     * not in a field initializer: the config reads {@link #zookeeper} and {@link #groupId}, and
     * Java runs field initializers before the constructor body has set those fields.
     */
    private final ConsumerConnector connector;

    /** Streams returned for the requested topic; empty when the connector returned none. */
    private final Option<List<KafkaStream<String, String>>> streams;

    /** Fixed-size thread pool created in the constructor. */
    private final ExecutorService executor;

    /**
     * @return the connector created for this consumer
     */
    public ConsumerConnector connector() {
        return this.connector;
    }

    /**
     * @return the message streams obtained for the topic, wrapped in a Scala {@link Option}
     */
    public Option<List<KafkaStream<String, String>>> streams() {
        return this.streams;
    }

    /**
     * @return the thread pool created for this consumer
     */
    public ExecutorService executor() {
        return this.executor;
    }

    /**
     * Builds the consumer configuration from {@link #zookeeper} and {@link #groupId} plus a set
     * of fixed timeout/interval settings.
     *
     * <p>Must only be called after {@link #zookeeper} and {@link #groupId} have been assigned:
     * {@link Properties#put} rejects {@code null} values with a {@link NullPointerException}.
     *
     * @return a new {@link ConsumerConfig} wrapping the assembled properties
     */
    private ConsumerConfig createConsumerConfig() {
        Properties properties = new Properties();
        properties.put("consumer.timeout.ms", "2000");
        properties.put("zookeeper.connect", this.zookeeper);
        properties.put("group.id", this.groupId);
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "10");
        properties.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(properties);
    }

    /**
     * Prints a shutdown notice, then applies the generated shutdown closures to the connector
     * and to the executor. Each is wrapped in {@code Option.apply}, so a {@code null} reference
     * is skipped rather than dereferenced.
     *
     * <p>NOTE(review): the closures ({@code KafkaConsumer$$anonfun$shutdown$1} and
     * {@code $2}) are defined elsewhere; presumably they call {@code shutdown()} on their
     * argument. Confirm against those classes.
     */
    public void shutdown() {
        Predef$.MODULE$.println("Consumer shutting down.");
        Option$.MODULE$.apply(connector()).map(new KafkaConsumer$$anonfun$shutdown$1(this));
        Option$.MODULE$.apply(executor()).map(new KafkaConsumer$$anonfun$shutdown$2(this));
    }

    /**
     * Creates the connector, the topic streams and the thread pool, then hands the streams to
     * the generated consumer closure.
     *
     * @param str           ZooKeeper connect string (stored as {@code zookeeper})
     * @param str2          topic to create message streams for
     * @param str3          consumer group id (stored as {@code groupId})
     * @param i             value mapped to the topic in the {@code createMessageStreams} request
     *                      (in the Kafka high-level consumer API this is the number of streams
     *                      to create for the topic)
     * @param i2            size of the fixed thread pool
     * @param atomicInteger counter stored in the public count field for use by the closures
     * @throws NullPointerException if {@code str} or {@code str3} is {@code null}
     */
    public KafkaConsumer(String str, String str2, String str3, int i, int i2, AtomicInteger atomicInteger) {
        this.zookeeper = str;
        this.groupId = str3;
        this.com$datastax$spark$connector$embedded$KafkaConsumer$$count = atomicInteger;

        // Create the connector only now that zookeeper/groupId are set. As a field initializer
        // this ran before the assignments above, so the config saw null values and
        // Properties.put threw NullPointerException on every construction.
        this.connector = Consumer$.MODULE$.create(createConsumerConfig());

        // Request streams for the single topic (Map(topic -> i)) with default String decoders
        // for key and value, then keep only the entry for that topic.
        this.streams = this.connector.createMessageStreams(
                Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(str2), BoxesRunTime.boxToInteger(i))})),
                new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()),
                new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1())).get(str2);

        this.executor = Executors.newFixedThreadPool(i2);

        // Keep this last: the closure receives `this`, so every field must already be assigned.
        this.streams.foreach(new KafkaConsumer$$anonfun$2(this));
    }
}
