package com.datastax.spark.connector.embedded;

import com.datastax.spark.connector.embedded.Assertions;
import com.datastax.spark.connector.embedded.EmbeddedIO;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashSet;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.TraitSetter;
import scala.util.Random$;
import scala.util.Try$;

/* compiled from: EmbeddedKafka.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mc\u0001B\u0001\u0003\u00055\u0011Q\"R7cK\u0012$W\rZ&bM.\f'BA\u0002\u0005\u0003!)WNY3eI\u0016$'BA\u0003\u0007\u0003%\u0019wN\u001c8fGR|'O\u0003\u0002\b\u0011\u0005)1\u000f]1sW*\u0011\u0011BC\u0001\tI\u0006$\u0018m\u001d;bq*\t1\"A\u0002d_6\u001c\u0001aE\u0002\u0001\u001dQ\u0001\"a\u0004\n\u000e\u0003AQ\u0011!E\u0001\u0006g\u000e\fG.Y\u0005\u0003'A\u0011a!\u00118z%\u00164\u0007CA\u000b\u0017\u001b\u0005\u0011\u0011BA\f\u0003\u0005!)UNY3eI\u0016$\u0007\u0002C\r\u0001\u0005\u000b\u0007I\u0011\u0001\u000e\u0002\u0017-\fgm[1QCJ\fWn]\u000b\u00027A!Ad\b\u0012#\u001d\tyQ$\u0003\u0002\u001f!\u00051\u0001K]3eK\u001aL!\u0001I\u0011\u0003\u00075\u000b\u0007O\u0003\u0002\u001f!A\u0011AdI\u0005\u0003I\u0005\u0012aa\u0015;sS:<\u0007\u0002\u0003\u0014\u0001\u0005\u0003\u0005\u000b\u0011B\u000e\u0002\u0019-\fgm[1QCJ\fWn\u001d\u0011\t\u000b!\u0002A\u0011A\u0015\u0002\rqJg.\u001b;?)\tQ3\u0006\u0005\u0002\u0016\u0001!)\u0011d\na\u00017!)\u0001\u0006\u0001C\u0001[Q\u0011!F\f\u0005\u0006_1\u0002\rAI\u0001\bOJ|W\u000f]%e\u0011\u0015A\u0003\u0001\"\u00012)\u0005Q\u0003bB\u001a\u0001\u0005\u0004%I\u0001N\u0001\nu>|7.Z3qKJ,\u0012!\u000e\t\u0003+YJ!a\u000e\u0002\u0003#\u0015k'-\u001a3eK\u0012Tvn\\6fKB,'\u000f\u0003\u0004:\u0001\u0001\u0006I!N\u0001\u000bu>|7.Z3qKJ\u0004\u0003bB\u001e\u0001\u0005\u0004%\t\u0001P\u0001\fW\u000647.Y\"p]\u001aLw-F\u0001>!\tq4)D\u0001@\u0015\t\u0001\u0015)\u0001\u0004tKJ4XM\u001d\u0006\u0002\u0005\u0006)1.\u00194lC&\u0011Ai\u0010\u0002\f\u0017\u000647.Y\"p]\u001aLw\r\u0003\u0004G\u0001\u0001\u0006I!P\u0001\rW\u000647.Y\"p]\u001aLw\r\t\u0005\b\u0001\u0002\u0011\r\u0011\"\u0001I+\u0005I\u0005C\u0001 
K\u0013\tYuHA\u0006LC\u001a\\\u0017mU3sm\u0016\u0014\bBB'\u0001A\u0003%\u0011*A\u0004tKJ4XM\u001d\u0011\t\u000f=\u0003!\u0019!C\u0001!\u0006q\u0001O]8ek\u000e,'oQ8oM&<W#A)\u0011\u0005I+V\"A*\u000b\u0005Q\u000b\u0015\u0001\u00039s_\u0012,8-\u001a:\n\u0005Y\u001b&A\u0004)s_\u0012,8-\u001a:D_:4\u0017n\u001a\u0005\u00071\u0002\u0001\u000b\u0011B)\u0002\u001fA\u0014x\u000eZ;dKJ\u001cuN\u001c4jO\u0002Bq\u0001\u0016\u0001C\u0002\u0013\u0005!,F\u0001\\!\u0011\u0011FL\t\u0012\n\u0005u\u001b&\u0001\u0003)s_\u0012,8-\u001a:\t\r}\u0003\u0001\u0015!\u0003\\\u0003%\u0001(o\u001c3vG\u0016\u0014\b\u0005C\u0003b\u0001\u0011\u0005!-A\u0006de\u0016\fG/\u001a+pa&\u001cG\u0003B2gQ6\u0004\"a\u00043\n\u0005\u0015\u0004\"\u0001B+oSRDQa\u001a1A\u0002\t\nQ\u0001^8qS\u000eDq!\u001b1\u0011\u0002\u0003\u0007!.A\u0007ok6\u0004\u0016M\u001d;ji&|gn\u001d\t\u0003\u001f-L!\u0001\u001c\t\u0003\u0007%sG\u000fC\u0004oAB\u0005\t\u0019\u00016\u0002#I,\u0007\u000f\\5dCRLwN\u001c$bGR|'\u000fC\u0003q\u0001\u0011\u0005\u0011/A\u000bqe>$WoY3B]\u0012\u001cVM\u001c3NKN\u001c\u0018mZ3\u0015\u0007\r\u00148\u000fC\u0003h_\u0002\u0007!\u0005C\u0003u_\u0002\u0007Q/\u0001\u0003tK:$\b\u0003\u0002\u000f E)DQa\u001e\u0001\u0005\na\f\u0011c\u0019:fCR,G+Z:u\u001b\u0016\u001c8/Y4f)\u0015I\u0018\u0011CA\n!\u0015Q\u0018QAA\u0006\u001d\rY\u0018\u0011\u0001\b\u0003y~l\u0011! 
\u0006\u0003}2\ta\u0001\u0010:p_Rt\u0014\"A\t\n\u0007\u0005\r\u0001#A\u0004qC\u000e\\\u0017mZ3\n\t\u0005\u001d\u0011\u0011\u0002\u0002\u0004'\u0016\f(bAA\u0002!A)!+!\u0004#E%\u0019\u0011qB*\u0003\u0019-+\u00170\u001a3NKN\u001c\u0018mZ3\t\u000b\u001d4\b\u0019\u0001\u0012\t\r\u0005Ua\u000f1\u0001v\u0003\u0011\u0019XM\u001c3\t\u000f\u0005e\u0001\u0001\"\u0001\u0002\u001c\u0005\u0001\u0012m^1jiB\u0013x\u000e]1hCRLwN\u001c\u000b\bG\u0006u\u0011qDA\u0012\u0011\u00199\u0017q\u0003a\u0001E!9\u0011\u0011EA\f\u0001\u0004Q\u0017!\u00039beRLG/[8o\u0011!\t)#a\u0006A\u0002\u0005\u001d\u0012a\u0002;j[\u0016|W\u000f\u001e\t\u0005\u0003S\t\u0019$\u0004\u0002\u0002,)!\u0011QFA\u0018\u0003!!WO]1uS>t'bAA\u0019!\u0005Q1m\u001c8dkJ\u0014XM\u001c;\n\t\u0005U\u00121\u0006\u0002\t\tV\u0014\u0018\r^5p]\"9\u0011\u0011\b\u0001\u0005\u0002\u0005m\u0012\u0001C:ikR$wn\u001e8\u0015\u0003\rD\u0011\"a\u0010\u0001#\u0003%\t!!\u0011\u0002+\r\u0014X-\u0019;f)>\u0004\u0018n\u0019\u0013eK\u001a\fW\u000f\u001c;%eU\u0011\u00111\t\u0016\u0004U\u0006\u00153FAA$!\u0011\tI%a\u0015\u000e\u0005\u0005-#\u0002BA'\u0003\u001f\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005E\u0003#\u0001\u0006b]:|G/\u0019;j_:LA!!\u0016\u0002L\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\t\u0013\u0005e\u0003!%A\u0005\u0002\u0005\u0005\u0013!F2sK\u0006$X\rV8qS\u000e$C-\u001a4bk2$He\r")
/* loaded from: input_file:com/datastax/spark/connector/embedded/EmbeddedKafka.class */
/**
 * Embedded, single-broker Kafka server intended for tests.
 *
 * <p>Construction starts an {@link EmbeddedZookeeper}, then a {@link KafkaServer} configured for
 * {@code 127.0.0.1:9092} with a freshly created temporary log directory, and finally a
 * String/String {@link Producer} pointed at that broker. {@link #shutdown()} tears all of this
 * down again.
 *
 * <p>This source is decompiled from Scala. The {@code Assertions} / {@code EmbeddedIO} methods are
 * trait forwarders that delegate to the corresponding {@code Cclass} implementation, and the
 * {@code name$default$N} methods supply Scala default argument values.
 */
public final class EmbeddedKafka implements Embedded {
    /** Broker port, used for both {@code port} and {@code advertised.port}. */
    private static final String BROKER_PORT = "9092";
    /** Broker host, used for both {@code host.name} and {@code advertised.host.name}. */
    private static final String BROKER_HOST = "127.0.0.1";

    private final Map<String, String> kafkaParams;
    private final EmbeddedZookeeper com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper;
    private final KafkaConfig kafkaConfig;
    private final KafkaServer server;
    private final ProducerConfig producerConfig;
    private final Producer<String, String> producer;
    private Duration com$datastax$spark$connector$embedded$Assertions$$end;
    // Not final: this field is written through the EmbeddedIO trait setter below rather than
    // directly in a constructor (presumably from EmbeddedIO.Cclass.$init$ - confirm), which Java
    // does not permit for a final field.
    private HashSet<String> shutdownDeletePaths;

    // ------------------------------------------------------------------------------------------
    // Assertions trait forwarders
    // ------------------------------------------------------------------------------------------

    /** Returns the stored {@code Assertions} "end" duration. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public Duration com$datastax$spark$connector$embedded$Assertions$$end() {
        return this.com$datastax$spark$connector$embedded$Assertions$$end;
    }

    /** Replaces the stored {@code Assertions} "end" duration. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    @TraitSetter
    public void com$datastax$spark$connector$embedded$Assertions$$end_$eq(Duration duration) {
        this.com$datastax$spark$connector$embedded$Assertions$$end = duration;
    }

    /** Delegates to {@code Assertions.Cclass.now}. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public FiniteDuration now() {
        return Assertions.Cclass.now(this);
    }

    /** Delegates to {@code Assertions.Cclass.remainingOrDefault}. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public FiniteDuration remainingOrDefault() {
        return Assertions.Cclass.remainingOrDefault(this);
    }

    /** Delegates to {@code Assertions.Cclass.remainingOr}. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public FiniteDuration remainingOr(FiniteDuration finiteDuration) {
        return Assertions.Cclass.remainingOr(this, finiteDuration);
    }

    /**
     * Delegates to {@code Assertions.Cclass.awaitCond}.
     *
     * @param function0 condition to evaluate
     * @param duration  second argument of {@code awaitCond}; callers in this class pass the
     *                  overall timeout here
     * @param duration2 third argument of {@code awaitCond} (defaults to 100 ms, see
     *                  {@link #awaitCond$default$3()}); presumably the polling interval - confirm
     * @param str       fourth argument of {@code awaitCond}; callers in this class pass a failure
     *                  message here
     */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public void awaitCond(Function0<Object> function0, Duration duration, Duration duration2, String str) {
        Assertions.Cclass.awaitCond(this, function0, duration, duration2, str);
    }

    /** Default for the second {@code awaitCond} argument: 3 seconds. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public Duration awaitCond$default$2() {
        return new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(3)).seconds();
    }

    /** Default for the third {@code awaitCond} argument: 100 milliseconds. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public Duration awaitCond$default$3() {
        return new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(100)).millis();
    }

    /** Default for the fourth {@code awaitCond} argument, as defined by {@code Assertions.Cclass}. */
    @Override // com.datastax.spark.connector.embedded.Assertions
    public String awaitCond$default$4() {
        return Assertions.Cclass.awaitCond$default$4(this);
    }

    // ------------------------------------------------------------------------------------------
    // EmbeddedIO trait forwarders
    // ------------------------------------------------------------------------------------------

    /** Returns the set held in the {@code shutdownDeletePaths} field. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public HashSet<String> shutdownDeletePaths() {
        return this.shutdownDeletePaths;
    }

    /**
     * Trait setter that stores the {@code shutdownDeletePaths} set.
     *
     * <p>The parameter is deliberately left as a raw {@code HashSet} so that the signature stays
     * identical to the decompiled original and keeps overriding the trait's declaration.
     */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void com$datastax$spark$connector$embedded$EmbeddedIO$_setter_$shutdownDeletePaths_$eq(HashSet hashSet) {
        this.shutdownDeletePaths = hashSet;
    }

    /** Delegates to {@code EmbeddedIO.Cclass.closeAfterUse}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public <T, C> T closeAfterUse(C c, Function1<C, T> function1) {
        return (T) EmbeddedIO.Cclass.closeAfterUse(this, c, function1);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.copyTextFileWithVariableSubstitution}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void copyTextFileWithVariableSubstitution(InputStream inputStream, OutputStream outputStream, Function1<String, String> function1) {
        EmbeddedIO.Cclass.copyTextFileWithVariableSubstitution(this, inputStream, outputStream, function1);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.createTempDir}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public File createTempDir() {
        return EmbeddedIO.Cclass.createTempDir(this);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.mkdir}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public File mkdir(File file) {
        return EmbeddedIO.Cclass.mkdir(this, file);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.waitForPortOpen}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public boolean waitForPortOpen(InetAddress inetAddress, int i, long j) {
        return EmbeddedIO.Cclass.waitForPortOpen(this, inetAddress, i, j);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.registerShutdownDeleteDir}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void registerShutdownDeleteDir(File file) {
        EmbeddedIO.Cclass.registerShutdownDeleteDir(this, file);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.hasRootAsShutdownDeleteDir}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public boolean hasRootAsShutdownDeleteDir(File file) {
        return EmbeddedIO.Cclass.hasRootAsShutdownDeleteDir(this, file);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.deleteRecursively}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public void deleteRecursively(File file) {
        EmbeddedIO.Cclass.deleteRecursively(this, file);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.isSymlink}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public boolean isSymlink(File file) {
        return EmbeddedIO.Cclass.isSymlink(this, file);
    }

    /** Delegates to {@code EmbeddedIO.Cclass.listFilesSafely}. */
    @Override // com.datastax.spark.connector.embedded.EmbeddedIO
    public Seq<File> listFilesSafely(File file) {
        return EmbeddedIO.Cclass.listFilesSafely(this, file);
    }

    // ------------------------------------------------------------------------------------------
    // Accessors
    // ------------------------------------------------------------------------------------------

    /** Returns the parameter map this instance was constructed with. */
    public Map<String, String> kafkaParams() {
        return this.kafkaParams;
    }

    /** Returns the embedded ZooKeeper instance started by the constructor. */
    public EmbeddedZookeeper com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper() {
        return this.com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper;
    }

    /** Returns the broker configuration built by the constructor. */
    public KafkaConfig kafkaConfig() {
        return this.kafkaConfig;
    }

    /** Returns the embedded broker. */
    public KafkaServer server() {
        return this.server;
    }

    /** Returns the configuration used to build {@link #producer()}. */
    public ProducerConfig producerConfig() {
        return this.producerConfig;
    }

    /** Returns the String/String producer connected to the embedded broker. */
    public Producer<String, String> producer() {
        return this.producer;
    }

    // ------------------------------------------------------------------------------------------
    // Topic and message helpers
    // ------------------------------------------------------------------------------------------

    /**
     * Creates a topic through {@code AdminUtils} using the broker's ZooKeeper client, then waits
     * up to 2000 ms via {@link #awaitPropagation(String, int, Duration)}.
     *
     * <p>Only partition {@code 0} is awaited, regardless of {@code i}.
     *
     * @param str topic name
     * @param i   number of partitions (default {@code 1})
     * @param i2  replication factor (default {@code 1})
     */
    public void createTopic(String str, int i, int i2) {
        AdminUtils$.MODULE$.createTopic(server().zkClient(), str, i, i2, AdminUtils$.MODULE$.createTopic$default$5());
        Duration propagationTimeout = new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2000)).millis();
        awaitPropagation(str, 0, propagationTimeout);
    }

    /** Default partition count for {@link #createTopic(String, int, int)}. */
    public int createTopic$default$2() {
        return 1;
    }

    /** Default replication factor for {@link #createTopic(String, int, int)}. */
    public int createTopic$default$3() {
        return 1;
    }

    /**
     * Builds keyed messages for the topic from {@code map} and sends them with
     * {@link #producer()}.
     *
     * @param str topic name
     * @param map source entries for the messages; how each entry is expanded is defined by the
     *            anonymous functions used in {@code createTestMessage}
     */
    public void produceAndSendMessage(String str, Map<String, Object> map) {
        Seq<KeyedMessage<String, String>> messages = createTestMessage(str, map);
        producer().send(messages);
    }

    /**
     * Filters {@code map} and flat-maps the remaining entries into keyed messages for topic
     * {@code str}; the filter and mapping logic live in the compiled anonymous function classes.
     */
    private Seq<KeyedMessage<String, String>> createTestMessage(String str, Map<String, Object> map) {
        return ((TraversableOnce) map.withFilter(new EmbeddedKafka$$anonfun$createTestMessage$1(this)).flatMap(new EmbeddedKafka$$anonfun$createTestMessage$2(this, str), Iterable$.MODULE$.canBuildFrom())).toSeq();
    }

    /**
     * Waits, using {@code awaitCond} with its default third argument, until the condition
     * implemented by {@code EmbeddedKafka$$anonfun$awaitPropagation$1} holds for the given topic
     * and partition.
     *
     * @param str      topic name
     * @param i        partition number
     * @param duration timeout passed to {@code awaitCond}
     */
    public void awaitPropagation(String str, int i, Duration duration) {
        String failureMessage = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Partition [", ", ", "] metadata not propagated after timeout"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, BoxesRunTime.boxToInteger(i)}));
        awaitCond(new EmbeddedKafka$$anonfun$awaitPropagation$1(this, str, i), duration, awaitCond$default$3(), failureMessage);
    }

    // ------------------------------------------------------------------------------------------
    // Lifecycle
    // ------------------------------------------------------------------------------------------

    /**
     * Stops the producer, the broker and the embedded ZooKeeper, and runs a cleanup function over
     * each of the broker's log directories.
     *
     * <p>Shutdown is best-effort: an {@link IOException} raised during the sequence is reported on
     * standard error and otherwise ignored, so it never propagates to the caller.
     */
    public void shutdown() {
        try {
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Shutting down Kafka server."})).s(Nil$.MODULE$));
            // Option.apply guards against a null producer before the close function is applied.
            Option$.MODULE$.apply(producer()).map(new EmbeddedKafka$$anonfun$shutdown$4(this));
            // The two server-side shutdown steps are wrapped in Try so a failure in either one
            // does not stop the rest of the teardown.
            Try$.MODULE$.apply(new EmbeddedKafka$$anonfun$shutdown$1(this));
            Try$.MODULE$.apply(new EmbeddedKafka$$anonfun$shutdown$2(this));
            server().awaitShutdown();
            server().config().logDirs().foreach(new EmbeddedKafka$$anonfun$shutdown$5(this));
            com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper().shutdown();
            awaitCond(new EmbeddedKafka$$anonfun$shutdown$3(this), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2000)).millis(), awaitCond$default$3(), awaitCond$default$4());
            Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"ZooKeeper server shut down."})).s(Nil$.MODULE$));
            // NOTE(review): InterruptedException from Thread.sleep is not handled here; in the
            // Scala original it simply propagates unchecked.
            Thread.sleep(2000L);
        } catch (IOException e) {
            // Keep shutdown best-effort, but leave a trace instead of discarding the failure.
            System.err.println("Ignoring I/O error while shutting down embedded Kafka: " + e);
        }
    }

    /**
     * Starts ZooKeeper, the Kafka broker and a producer.
     *
     * <p>Sequence: initialise the {@code EmbeddedIO} and {@code Assertions} trait state, start an
     * {@link EmbeddedZookeeper} and wait up to 2000 ms on a start-up condition, build the broker
     * configuration, create the server, pause 2 s, start it, pause another 2 s, then create the
     * producer against {@code host:port} of the broker configuration.
     *
     * @param map parameters exposed through {@link #kafkaParams()}; they are stored but not used
     *            for the broker configuration built here
     */
    public EmbeddedKafka(Map<String, String> map) {
        this.kafkaParams = map;
        EmbeddedIO.Cclass.$init$(this);
        com$datastax$spark$connector$embedded$Assertions$$end_$eq(Duration$.MODULE$.Undefined());
        this.com$datastax$spark$connector$embedded$EmbeddedKafka$$zookeeper = new EmbeddedZookeeper(EmbeddedZookeeper$.MODULE$.$lessinit$greater$default$1());
        awaitCond(new EmbeddedKafka$$anonfun$1(this), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2000)).millis(), awaitCond$default$3(), awaitCond$default$4());

        Properties brokerProperties = new Properties();
        brokerProperties.putAll(JavaConversions$.MODULE$.mapAsJavaMap(brokerSettings()));
        this.kafkaConfig = new KafkaConfig(brokerProperties);
        this.server = new KafkaServer(kafkaConfig(), KafkaServer$.MODULE$.$lessinit$greater$default$2());
        Thread.sleep(2000L);
        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Starting the Kafka server at ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{package$.MODULE$.ZookeeperConnectionString()})));
        server().startup();
        Thread.sleep(2000L);

        Properties producerProperties = new Properties();
        producerProperties.put("metadata.broker.list", new StringBuilder().append(kafkaConfig().hostName()).append(":").append(BoxesRunTime.boxToInteger(kafkaConfig().port())).toString());
        producerProperties.put("serializer.class", StringEncoder.class.getName());
        this.producerConfig = new ProducerConfig(producerProperties);
        this.producer = new Producer<>(producerConfig());
    }

    /**
     * Creates an instance whose {@link #kafkaParams()} contain {@code zookeeper.connect},
     * {@code group.id} (set to {@code str}) and {@code auto.offset.reset=smallest}.
     *
     * @param str consumer group id
     */
    public EmbeddedKafka(String str) {
        this((Map<String, String>) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                entry("zookeeper.connect", package$.MODULE$.ZookeeperConnectionString()),
                entry("group.id", str),
                entry("auto.offset.reset", "smallest")})));
    }

    /** Creates an instance with a group id of the form {@code consumer-N}, N random in [0, 10000). */
    public EmbeddedKafka() {
        this(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"consumer-", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(Random$.MODULE$.nextInt(10000))})));
    }

    /** Builds the broker settings map; the log directory is a newly created temporary directory. */
    private Map brokerSettings() {
        return Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                entry("broker.id", "0"),
                entry("host.name", BROKER_HOST),
                entry("port", BROKER_PORT),
                entry("advertised.host.name", BROKER_HOST),
                entry("advertised.port", BROKER_PORT),
                entry("log.dir", createTempDir().getAbsolutePath()),
                entry("zookeeper.connect", package$.MODULE$.ZookeeperConnectionString()),
                entry("replica.high.watermark.checkpoint.interval.ms", "5000"),
                entry("log.flush.interval.messages", "1"),
                entry("replica.socket.timeout.ms", "500"),
                entry("controlled.shutdown.enable", "false"),
                entry("auto.leader.rebalance.enable", "false")}));
    }

    /** Builds one Scala {@code key -> value} pair using the same calls the compiler emitted. */
    private static Tuple2<?, ?> entry(String key, String value) {
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(key), value);
    }
}
