package com.datastax.spark.connector.embedded;

import com.datastax.spark.connector.embedded.Assertions;
import com.datastax.spark.connector.embedded.EmbeddedIO;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Properties;
import kafka.admin.CreateTopicCommand$;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashSet;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.TraitSetter;
import scala.util.Random$;

/* compiled from: EmbeddedKafka.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005uc\u0001B\u0001\u0003\u00055\u0011Q\"R7cK\u0012$W\rZ&bM.\f'BA\u0002\u0005\u0003!)WNY3eI\u0016$'BA\u0003\u0007\u0003%\u0019wN\u001c8fGR|'O\u0003\u0002\b\u0011\u0005)1\u000f]1sW*\u0011\u0011BC\u0001\tI\u0006$\u0018m\u001d;bq*\t1\"A\u0002d_6\u001c\u0001aE\u0002\u0001\u001dQ\u0001\"a\u0004\n\u000e\u0003AQ\u0011!E\u0001\u0006g\u000e\fG.Y\u0005\u0003'A\u0011a!\u00118z%\u00164\u0007CA\u000b\u0017\u001b\u0005\u0011\u0011BA\f\u0003\u0005!)UNY3eI\u0016$\u0007\"B\r\u0001\t\u0003Q\u0012A\u0002\u001fj]&$h\bF\u0001\u001c!\t)\u0002\u0001C\u0004\u001e\u0001\t\u0007I\u0011\u0001\u0010\u0002\u0017-\fgm[1QCJ\fWn]\u000b\u0002?A!\u0001%J\u0014(\u001b\u0005\t#B\u0001\u0012$\u0003%IW.\\;uC\ndWM\u0003\u0002%!\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005\u0019\n#aA'baB\u0011\u0001&L\u0007\u0002S)\u0011!fK\u0001\u0005Y\u0006twMC\u0001-\u0003\u0011Q\u0017M^1\n\u00059J#AB*ue&tw\r\u0003\u00041\u0001\u0001\u0006IaH\u0001\rW\u000647.\u0019)be\u0006l7\u000f\t\u0005\be\u0001\u0011\r\u0011\"\u00034\u0003)\u0011'o\\6fe\u000e{gNZ\u000b\u0002iA\u0011Q\u0007O\u0007\u0002m)\u0011qgK\u0001\u0005kRLG.\u0003\u0002:m\tQ\u0001K]8qKJ$\u0018.Z:\t\rm\u0002\u0001\u0015!\u00035\u0003-\u0011'o\\6fe\u000e{gN\u001a\u0011\t\u000fu\u0002!\u0019!C\u0005}\u0005I!p\\8lK\u0016\u0004XM]\u000b\u0002\u007fA\u0011Q\u0003Q\u0005\u0003\u0003\n\u0011\u0011#R7cK\u0012$W\r\u001a.p_.,W\r]3s\u0011\u0019\u0019\u0005\u0001)A\u0005\u007f\u0005Q!p\\8lK\u0016\u0004XM\u001d\u0011\t\u000f\u0015\u0003!\u0019!C\u0001\r\u000611\r\\5f]R,\u0012a\u0012\t\u0003\u0011>k\u0011!\u0013\u0006\u0003\u0015.\u000b\u0001B_6dY&,g\u000e\u001e\u0006\u0003\u00196\u000ba!\u0013\u0019Ji\u0016\u001c'\"\u0001(\u0002\u0007=\u0014x-\u0003\u0002Q\u0013\nA!l[\"mS\u0016tG\u000f\u0003\u0004S\u0001\u0001\u0006IaR\u0001\bG2LWM\u001c;!\u0011\u001d!\u0006A1A\u0005\u0002U\u000b1b[1gW\u0006\u001cuN\u001c4jOV\ta\u000b\u0005\u0002X96\t\u0001L\u0003\u0002Z5\u000611/\u001a:wKJT\u0011aW\u0001\u0006W\u000647.Y\u0005\u0003;b\u00131bS1gW\u0006\u001cuN\u001c4jO\"1q\f\u0001Q\u0001\nY\u000bAb[1gW\u0006\u001cuN\u001c4jO\u0002Bq!\u0017\u0001C\u0002\u0013\u0005\u0011-F\u0001c!\t96-\u0003\u0002e1\nY1*\u00194lCN+'O^3s\u0011\u00191\u0007\u0001)A\u0005E\u000691/\u001a:wKJ\u0004\u0003\"\u00025\u0001\t\u0003I\u0017aC2sK\u0006$X\rV8qS\u000e$\"A[7\u0011\u0005=Y\u0017B\u00017\u0011\u0005\u0011)f.\u001b;\t\u000b9<\u0007\u0019A8\u0002\u000bQ|\u0007/[2\u0011\u0005A\u001chBA\br\u0013\t\u0011\b#\u0001\u0004Qe\u0016$WMZ\u0005\u0003]QT!A\u001d\t\t\u000bY\u0004A\u0011A<\u0002+A\u0014x\u000eZ;dK\u0006sGmU3oI6+7o]1hKR\u0019!\u000e_=\t\u000b9,\b\u0019A8\t\u000bi,\b\u0019A>\u0002\tM,g\u000e\u001e\t\u0005ar|W0\u0003\u0002'iB\u0011qB`\u0005\u0003\u007fB\u00111!\u00138u\u0011\u001d\t\u0019\u0001\u0001C\u0005\u0003\u000b\t\u0011c\u0019:fCR,G+Z:u\u001b\u0016\u001c8/Y4f)\u0019\t9!a\u000b\u0002.A1\u0011\u0011BA\r\u0003?qA!a\u0003\u0002\u00169!\u0011QBA\n\u001b\t\tyAC\u0002\u0002\u00121\ta\u0001\u0010:p_Rt\u0014\"A\t\n\u0007\u0005]\u0001#A\u0004qC\u000e\\\u0017mZ3\n\t\u0005m\u0011Q\u0004\u0002\u0004'\u0016\f(bAA\f!A1\u0011\u0011EA\u0014_>l!!a\t\u000b\u0007\u0005\u0015\",\u0001\u0005qe>$WoY3s\u0013\u0011\tI#a\t\u0003\u0019-+\u00170\u001a3NKN\u001c\u0018mZ3\t\r9\f\t\u00011\u0001p\u0011\u001d\ty#!\u0001A\u0002m\fAa]3oI\"9\u00111\u0007\u0001\u0005\u0002\u0005U\u0012\u0001E1xC&$\bK]8qC\u001e\fG/[8o)%Q\u0017qGA\u001f\u0003\u007f\t\u0019\u0005\u0003\u0005\u0002:\u0005E\u0002\u0019AA\u001e\u0003\u001d\u0019XM\u001d<feN\u0004R!!\u0003\u0002\u001a\tDaA\\A\u0019\u0001\u0004y\u0007bBA!\u0003c\u0001\r!`\u0001\na\u0006\u0014H/\u001b;j_:D\u0001\"!\u0012\u00022\u0001\u0007\u0011qI\u0001\bi&lWm\\;u!\u0011\tI%a\u0015\u000e\u0005\u0005-#\u0002BA'\u0003\u001f\n\u0001\u0002Z;sCRLwN\u001c\u0006\u0004\u0003#\u0002\u0012AC2p]\u000e,(O]3oi&!\u0011QKA&\u0005!!UO]1uS>t\u0007bBA-\u0001\u0011\u0005\u00111L\u0001\tg\",H\u000fZ8x]R\t!\u000e")
/* loaded from: input_file:com/datastax/spark/connector/embedded/EmbeddedKafka.class */
public final class EmbeddedKafka implements Embedded {
    private final Map<String, String> kafkaParams;
    private final Properties brokerConf;
    private final EmbeddedZookeeper com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper;
    private final ZkClient client;
    private final KafkaConfig kafkaConfig;
    private final KafkaServer server;
    private Duration com$datastax$spark$connector$embedded$Assertions$$end;
    private final HashSet<String> shutdownDeletePaths;

    @Override // com.datastax.spark.connector.embedded.Assertions
    public Duration com$datastax$spark$connector$embedded$Assertions$$end() {
        return this.com$datastax$spark$connector$embedded$Assertions$$end;
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    @TraitSetter
    public void com$datastax$spark$connector$embedded$Assertions$$end_$eq(Duration duration) {
        this.com$datastax$spark$connector$embedded$Assertions$$end = duration;
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    public FiniteDuration now() {
        return Assertions.Cclass.now(this);
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    public FiniteDuration remainingOrDefault() {
        return Assertions.Cclass.remainingOrDefault(this);
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    public FiniteDuration remainingOr(FiniteDuration finiteDuration) {
        return Assertions.Cclass.remainingOr(this, finiteDuration);
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    public void awaitCond(Function0<Object> function0, Duration duration, Duration duration2, String str) {
        Assertions.Cclass.awaitCond(this, function0, duration, duration2, str);
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    public Duration awaitCond$default$2() {
        return Assertions.Cclass.awaitCond$default$2(this);
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    public Duration awaitCond$default$3() {
        return Assertions.Cclass.awaitCond$default$3(this);
    }

    @Override // com.datastax.spark.connector.embedded.Assertions
    public String awaitCond$default$4() {
        return Assertions.Cclass.awaitCond$default$4(this);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public HashSet<String> shutdownDeletePaths() {
        return this.shutdownDeletePaths;
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void com$datastax$spark$connector$embedded$EmbeddedIO$_setter_$shutdownDeletePaths_$eq(HashSet hashSet) {
        this.shutdownDeletePaths = hashSet;
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public <T, C> T closeAfterUse(C c, Function1<C, T> function1) {
        return (T) EmbeddedIO.Cclass.closeAfterUse(this, c, function1);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void copyTextFileWithVariableSubstitution(InputStream inputStream, OutputStream outputStream, Function1<String, String> function1) {
        EmbeddedIO.Cclass.copyTextFileWithVariableSubstitution(this, inputStream, outputStream, function1);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public File createTempDir() {
        return EmbeddedIO.Cclass.createTempDir(this);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public File mkdir(File file) {
        return EmbeddedIO.Cclass.mkdir(this, file);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public boolean waitForPortOpen(InetAddress inetAddress, int i, long j) {
        return EmbeddedIO.Cclass.waitForPortOpen(this, inetAddress, i, j);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void registerShutdownDeleteDir(File file) {
        EmbeddedIO.Cclass.registerShutdownDeleteDir(this, file);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public boolean hasRootAsShutdownDeleteDir(File file) {
        return EmbeddedIO.Cclass.hasRootAsShutdownDeleteDir(this, file);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void deleteRecursively(File file) {
        EmbeddedIO.Cclass.deleteRecursively(this, file);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public boolean isSymlink(File file) {
        return EmbeddedIO.Cclass.isSymlink(this, file);
    }

    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public Seq<File> listFilesSafely(File file) {
        return EmbeddedIO.Cclass.listFilesSafely(this, file);
    }

    public Map<String, String> kafkaParams() {
        return this.kafkaParams;
    }

    private Properties brokerConf() {
        return this.brokerConf;
    }

    public EmbeddedZookeeper com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper() {
        return this.com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper;
    }

    public ZkClient client() {
        return this.client;
    }

    public KafkaConfig kafkaConfig() {
        return this.kafkaConfig;
    }

    public KafkaServer server() {
        return this.server;
    }

    public void createTopic(String str) {
        CreateTopicCommand$.MODULE$.createTopic(client(), str, 1, 1, "0");
        awaitPropagation((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{server()})), str, 0, new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1000)).millis());
    }

    public void produceAndSendMessage(String str, Map<String, Object> map) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", new StringBuilder().append(kafkaConfig().hostName()).append(":").append(BoxesRunTime.boxToInteger(kafkaConfig().port())).toString());
        properties.put("serializer.class", StringEncoder.class.getName());
        Producer producer = new Producer(new ProducerConfig(properties));
        producer.send(createTestMessage(str, map));
        producer.close();
    }

    private Seq<KeyedMessage<String, String>> createTestMessage(String str, Map<String, Object> map) {
        return ((TraversableOnce) map.withFilter(new EmbeddedKafka$$anonfun$createTestMessage$1(this)).flatMap(new EmbeddedKafka$$anonfun$createTestMessage$2(this, str), Iterable$.MODULE$.canBuildFrom())).toSeq();
    }

    public void awaitPropagation(Seq<KafkaServer> seq, String str, int i, Duration duration) {
        awaitCond(new EmbeddedKafka$$anonfun$awaitPropagation$1(this, seq, str, i), duration, awaitCond$default$3(), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Partition [", ", ", "] metadata not propagated after timeout"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, BoxesRunTime.boxToInteger(i)})));
    }

    public void shutdown() {
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Shutting down Kafka server."})).s(Nil$.MODULE$));
        server().shutdown();
        server().config().logDirs().foreach(new EmbeddedKafka$$anonfun$shutdown$2(this));
        client().close();
        com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper().shutdown();
        awaitCond(new EmbeddedKafka$$anonfun$shutdown$1(this), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2000)).millis(), awaitCond$default$3(), awaitCond$default$4());
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"ZooKeeper server shut down."})).s(Nil$.MODULE$));
    }

    public EmbeddedKafka() {
        EmbeddedIO.Cclass.$init$(this);
        Assertions.Cclass.$init$(this);
        this.kafkaParams = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("zookeeper.connect"), package$.MODULE$.ZookeeperConnectionString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("group.id"), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"test-consumer-", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(Random$.MODULE$.nextInt(10000))}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("auto.offset.reset"), "smallest")}));
        this.brokerConf = new Properties();
        brokerConf().put("broker.id", "0");
        brokerConf().put("host.name", "localhost");
        brokerConf().put("port", "9092");
        brokerConf().put("log.dir", createTempDir().getAbsolutePath());
        brokerConf().put("zookeeper.connect", package$.MODULE$.ZookeeperConnectionString());
        brokerConf().put("log.flush.interval.messages", "1");
        brokerConf().put("replica.socket.timeout.ms", "1500");
        this.com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper = new EmbeddedZookeeper(EmbeddedZookeeper$.MODULE$.$lessinit$greater$default$1());
        awaitCond(new EmbeddedKafka$$anonfun$1(this), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2000)).millis(), awaitCond$default$3(), awaitCond$default$4());
        this.client = new ZkClient(package$.MODULE$.ZookeeperConnectionString(), 6000, 6000, ZKStringSerializer$.MODULE$);
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"ZooKeeper Client connected."})).s(Nil$.MODULE$));
        this.kafkaConfig = new KafkaConfig(brokerConf());
        this.server = new KafkaServer(kafkaConfig(), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        Thread.sleep(2000L);
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Starting the Kafka server at ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{package$.MODULE$.ZookeeperConnectionString()})));
        server().startup();
        Thread.sleep(2000L);
    }
}
