/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka010;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.zk.AdminZkClient;
import kafka.zk.AdminZkClient$;
import kafka.zk.KafkaZkClient;
import kafka.zk.KafkaZkClient$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.Logging;
import org.apache.spark.streaming.Time;
import org.apache.spark.util.ShutdownHookManager$;
import org.apache.spark.util.Utils$;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnceOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0005\t}h!\u0002,X\u0001]\u000b\u0007\"\u00028\u0001\t\u0003\u0001\bbB:\u0001\u0005\u0004%I\u0001\u001e\u0005\b\u0003\u0003\u0001\u0001\u0015!\u0003v\u0011!\t\u0019\u0001\u0001b\u0001\n\u0013!\bbBA\u0003\u0001\u0001\u0006I!\u001e\u0005\n\u0003\u000f\u0001\u0001\u0019!C\u0005\u0003\u0013A\u0011\"!\u0005\u0001\u0001\u0004%I!a\u0005\t\u0011\u0005}\u0001\u0001)Q\u0005\u0003\u0017A\u0011\"!\t\u0001\u0005\u0004%I!!\u0003\t\u0011\u0005\r\u0002\u0001)A\u0005\u0003\u0017A\u0011\"!\n\u0001\u0005\u0004%I!!\u0003\t\u0011\u0005\u001d\u0002\u0001)A\u0005\u0003\u0017A1\"!\u000b\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002,!Y\u0011\u0011\u0014\u0001A\u0002\u0003\u0007I\u0011BAN\u0011-\t9\u0007\u0001a\u0001\u0002\u0003\u0006K!!\f\t\u0017\u0005}\u0005\u00011AA\u0002\u0013%\u0011\u0011\u0015\u0005\f\u0003g\u0003\u0001\u0019!a\u0001\n\u0013\t)\fC\u0006\u0002:\u0002\u0001\r\u0011!Q!\n\u0005\r\u0006bCA^\u0001\u0001\u0007\t\u0019!C\u0005\u0003{C1\"!2\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002H\"Y\u00111\u001a\u0001A\u0002\u0003\u0005\u000b\u0015BA`\u0011!\ti\r\u0001b\u0001\n\u0013!\bbBAh\u0001\u0001\u0006I!\u001e\u0005\n\u0003#\u0004\u0001\u0019!C\u0005\u0003\u0013A\u0011\"a5\u0001\u0001\u0004%I!!6\t\u0011\u0005e\u0007\u0001)Q\u0005\u0003\u0017A1\"a7\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002^\"Y\u0011\u0011\u001e\u0001A\u0002\u0003\u0007I\u0011BAv\u0011-\ty\u000f\u0001a\u0001\u0002\u0003\u0006K!a8\t\u0017\u0005}\u0003\u00011AA\u0002\u0013%\u0011\u0011\u001f\u0005\f\u0003s\u0004\u0001\u0019!a\u0001\n\u0013\tY\u0010C\u0006\u0002\u0000\u0002\u0001\r\u0011!Q!\n\u0005M\bb\u0003B\u0001\u0001\u0001\u0007\t\u0019!C\u0005\u0005\u0007A1B!\u0006\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003\u0018!Y!1\u0004\u0001A\u0002\u0003\u0005\u000b\u0015\u0002B\u0003\u0011%\u0011i\u0002\u0001a\u0001\n\u0013\u0011y\u0002C\u0005\u0003(\u0001\u0001\r\u0011\"\u0003\u0003*!A!Q\u0006\u0001!B\u0013\u0011\t\u0003C\u0005\u00030\u0001\u0001\r\u0011\"\u0003\u0003 
!I!\u0011\u0007\u0001A\u0002\u0013%!1\u0007\u0005\t\u0005o\u0001\u0001\u0015)\u0003\u0003\"!I!\u0011\b\u0001A\u0002\u0013%!1\b\u0005\n\u0005{\u0001\u0001\u0019!C\u0005\u0005\u007fAqAa\u0011\u0001A\u0003&!\r\u0003\u0004\u0003F\u0001!\t\u0001\u001e\u0005\u0007\u0005\u000f\u0002A\u0011\u0001;\t\u000f\t%\u0003\u0001\"\u0001\u0002\"\"9!1\n\u0001\u0005\u0002\u0005u\u0006b\u0002B'\u0001\u0011%\u0011q\u0013\u0005\b\u0005\u001f\u0002A\u0011BAL\u0011\u001d\u0011\t\u0006\u0001C\u0001\u0003/CqAa\u0015\u0001\t\u0003\t9\nC\u0004\u0003V\u0001!\tAa\u0016\t\u000f\tU\u0003\u0001\"\u0001\u0003r!9!Q\u000b\u0001\u0005\u0002\t]\u0004b\u0002B>\u0001\u0011\u0005!Q\u0010\u0005\b\u0005w\u0002A\u0011\u0001BI\u0011\u001d\u0011Y\b\u0001C\u0001\u00057CqAa\u001f\u0001\t\u0003\u0011I\u000bC\u0005\u00034\u0002\u0011\r\u0011\"\u0001\u0002~!A!Q\u0017\u0001!\u0002\u0013\t\t\bC\u0004\u00038\u0002!IA!/\t\u000f\tm\u0006\u0001\"\u0003\u0003:\"9!Q\u0018\u0001\u0005\u0002\t}\u0006b\u0002B{\u0001\u0011%!q\u001f\u0004\u0007\u0003c\u0001A!a\r\t\u0013\u0005U\"I!b\u0001\n\u0003!\b\"CA\u001c\u0005\n\u0005\t\u0015!\u0003v\u0011\u0019q'\t\"\u0001\u0002:!I\u0011Q\b\"C\u0002\u0013\u0005\u0011q\b\u0005\t\u0003#\u0012\u0005\u0015!\u0003\u0002B!I\u00111\u000b\"C\u0002\u0013\u0005\u0011q\b\u0005\t\u0003+\u0012\u0005\u0015!\u0003\u0002B!I\u0011\u0011\u0006\"C\u0002\u0013\u0005\u0011q\u000b\u0005\t\u0003O\u0012\u0005\u0015!\u0003\u0002Z!a\u0011\u0011\u000e\"\u0011\u0002\u0003\r\t\u0015!\u0003\u0002l!I\u00111\u0010\"C\u0002\u0013\u0005\u0011Q\u0010\u0005\t\u0003\u007f\u0012\u0005\u0015!\u0003\u0002r!I\u0011\u0011\u0011\"C\u0002\u0013\u0005\u0011\u0011\u0002\u0005\t\u0003\u0007\u0013\u0005\u0015!\u0003\u0002\f!I\u0011Q\u0011\"C\u0002\u0013\u0005\u0011q\u0011\u0005\t\u0003\u001f\u0013\u0005\u0015!\u0003\u0002\n\"I\u0011\u0011\u0013\"C\u0002\u0013\u0005\u0011\u0011\u0002\u0005\t\u0003'\u0013\u0005\u0015!\u0003\u0002\f!9\u0011Q\u0013\"\u0005\u0002\u0005]%AD&bM.\fG+Z:u+RLGn\u001d\u0006\u00031f\u000b\u0001b[1gW\u0006\u0004\u0014\
u0007\r\u0006\u00035n\u000b\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005qk\u0016!B:qCJ\\'B\u00010`\u0003\u0019\t\u0007/Y2iK*\t\u0001-A\u0002pe\u001e\u001c2\u0001\u00012i!\t\u0019g-D\u0001e\u0015\u0005)\u0017!B:dC2\f\u0017BA4e\u0005\u0019\te.\u001f*fMB\u0011\u0011\u000e\\\u0007\u0002U*\u00111nW\u0001\tS:$XM\u001d8bY&\u0011QN\u001b\u0002\b\u0019><w-\u001b8h\u0003\u0019a\u0014N\\5u}\r\u0001A#A9\u0011\u0005I\u0004Q\"A,\u0002'1|7-\u00197I_N$h*Y7f\r>\u0014XKU%\u0016\u0003U\u0004\"A^?\u000f\u0005]\\\bC\u0001=e\u001b\u0005I(B\u0001>p\u0003\u0019a$o\\8u}%\u0011A\u0010Z\u0001\u0007!J,G-\u001a4\n\u0005y|(AB*ue&twM\u0003\u0002}I\u0006!Bn\\2bY\"{7\u000f\u001e(b[\u00164uN]+S\u0013\u0002\naA_6I_N$\u0018a\u0002>l\u0011>\u001cH\u000fI\u0001\u0007u.\u0004vN\u001d;\u0016\u0005\u0005-\u0001cA2\u0002\u000e%\u0019\u0011q\u00023\u0003\u0007%sG/\u0001\u0006{WB{'\u000f^0%KF$B!!\u0006\u0002\u001cA\u00191-a\u0006\n\u0007\u0005eAM\u0001\u0003V]&$\b\"CA\u000f\u000f\u0005\u0005\t\u0019AA\u0006\u0003\rAH%M\u0001\bu.\u0004vN\u001d;!\u0003MQ8nQ8o]\u0016\u001cG/[8o)&lWm\\;u\u0003QQ8nQ8o]\u0016\u001cG/[8o)&lWm\\;uA\u0005\u0001\"p[*fgNLwN\u001c+j[\u0016|W\u000f^\u0001\u0012u.\u001cVm]:j_:$\u0016.\\3pkR\u0004\u0013!\u0003>p_.,W\r]3s+\t\ti\u0003E\u0002\u00020\tk\u0011\u0001\u0001\u0002\u0012\u000b6\u0014W\r\u001a3fIj{wn[3fa\u0016\u00148C\u0001\"c\u0003%Q8nQ8o]\u0016\u001cG/\u0001\u0006{W\u000e{gN\\3di\u0002\"B!!\f\u0002<!1\u0011QG#A\u0002U\f1b\u001d8baNDw\u000e\u001e#jeV\u0011\u0011\u0011\t\t\u0005\u0003\u0007\ni%\u0004\u0002\u0002F)!\u0011qIA%\u0003\tIwN\u0003\u0002\u0002L\u0005!!.\u0019<b\u0013\u0011\ty%!\u0012\u0003\t\u0019KG.Z\u0001\rg:\f\u0007o\u001d5pi\u0012K'\u000fI\u0001\u0007Y><G)\u001b:\u0002\u000f1|w\rR5sAU\u0011\u0011\u0011\f\t\u0005\u00037\n\u0019'\u0004\u0002\u0002^)!\u0011qLA1\u0003\u0019\u0019XM\u001d<fe*\u0019\u0011\u0011F/\n\t\u0005\u0015\u0014Q\f\u0002\u00105>|7*Z3qKJ\u001cVM\u001d<fe\u0006Q!p\\8lK\u0016\u0004XM\u001d\u0011\u0002\u0007a$\u0003\bE\u0004d\u0003[\n\t(a\u0003\n\u0007\u000
5=DM\u0001\u0004UkBdWM\r\t\u0005\u0003g\nI(\u0004\u0002\u0002v)!\u0011qOA%\u0003\u0011a\u0017M\\4\n\u0007y\f)(\u0001\u0002jaV\u0011\u0011\u0011O\u0001\u0004SB\u0004\u0013\u0001\u00029peR\fQ\u0001]8si\u0002\nqAZ1di>\u0014\u00180\u0006\u0002\u0002\nB!\u00111LAF\u0013\u0011\ti)!\u0018\u0003)9KujU3sm\u0016\u00148I\u001c=o\r\u0006\u001cGo\u001c:z\u0003!1\u0017m\u0019;pef\u0004\u0013AC1diV\fG\u000eU8si\u0006Y\u0011m\u0019;vC2\u0004vN\u001d;!\u0003!\u0019\b.\u001e;e_^tGCAA\u000b\u00035Qxn\\6fKB,'o\u0018\u0013fcR!\u0011QCAO\u0011%\tiBDA\u0001\u0002\u0004\ti#\u0001\u0005{W\u000ec\u0017.\u001a8u+\t\t\u0019\u000b\u0005\u0003\u0002&\u0006=VBAAT\u0015\u0011\tI+a+\u0002\u0005i\\'BAAW\u0003\u0015Y\u0017MZ6b\u0013\u0011\t\t,a*\u0003\u001b-\u000bgm[1[W\u000ec\u0017.\u001a8u\u00031Q8n\u00117jK:$x\fJ3r)\u0011\t)\"a.\t\u0013\u0005u\u0011#!AA\u0002\u0005\r\u0016!\u0003>l\u00072LWM\u001c;!\u0003%\tG-\\\"mS\u0016tG/\u0006\u0002\u0002@B!\u0011QUAa\u0013\u0011\t\u0019-a*\u0003\u001b\u0005#W.\u001b8[W\u000ec\u0017.\u001a8u\u00035\tG-\\\"mS\u0016tGo\u0018\u0013fcR!\u0011QCAe\u0011%\ti\u0002FA\u0001\u0002\u0004\ty,\u0001\u0006bI6\u001cE.[3oi\u0002\n!B\u0019:pW\u0016\u0014\bj\\:u\u0003-\u0011'o\\6fe\"{7\u000f\u001e\u0011\u0002\u0015\t\u0014xn[3s!>\u0014H/\u0001\bce>\\WM\u001d)peR|F%Z9\u0015\t\u0005U\u0011q\u001b\u0005\n\u0003;I\u0012\u0011!a\u0001\u0003\u0017\t1B\u0019:pW\u0016\u0014\bk\u001c:uA\u0005Q!M]8lKJ\u001cuN\u001c4\u0016\u0005\u0005}\u0007\u0003BAq\u0003Kl!!a9\u000b\t\u0005}\u00131V\u0005\u0005\u0003O\f\u0019OA\u0006LC\u001a\\\u0017mQ8oM&<\u0017A\u00042s_.,'oQ8oM~#S-\u001d\u000b\u0005\u0003+\ti\u000fC\u0005\u0002\u001eq\t\t\u00111\u0001\u0002`\u0006Y!M]8lKJ\u001cuN\u001c4!+\t\t\u0019\u0010\u0005\u0003\u0002b\u0006U\u0018\u0002BA|\u0003G\u00141bS1gW\u0006\u001cVM\u001d<fe\u0006Q1/\u001a:wKJ|F%Z9\u0015\t\u0005U\u0011Q 
\u0005\n\u0003;y\u0012\u0011!a\u0001\u0003g\fqa]3sm\u0016\u0014\b%\u0001\u0005qe>$WoY3s+\t\u0011)\u0001\u0005\u0004\u0003\b\tEQ/^\u0007\u0003\u0005\u0013QAA!\u0001\u0003\f)!!Q\u0002B\b\u0003\u001d\u0019G.[3oiNT1!!,^\u0013\u0011\u0011\u0019B!\u0003\u0003\u001b-\u000bgm[1Qe>$WoY3s\u00031\u0001(o\u001c3vG\u0016\u0014x\fJ3r)\u0011\t)B!\u0007\t\u0013\u0005u!%!AA\u0002\t\u0015\u0011!\u00039s_\u0012,8-\u001a:!\u0003\u001dQ8NU3bIf,\"A!\t\u0011\u0007\r\u0014\u0019#C\u0002\u0003&\u0011\u0014qAQ8pY\u0016\fg.A\u0006{WJ+\u0017\rZ=`I\u0015\fH\u0003BA\u000b\u0005WA\u0011\"!\b&\u0003\u0003\u0005\rA!\t\u0002\u0011i\\'+Z1es\u0002\n1B\u0019:pW\u0016\u0014(+Z1es\u0006y!M]8lKJ\u0014V-\u00193z?\u0012*\u0017\u000f\u0006\u0003\u0002\u0016\tU\u0002\"CA\u000fQ\u0005\u0005\t\u0019\u0001B\u0011\u00031\u0011'o\\6feJ+\u0017\rZ=!\u00031aW-Y6EKR,7\r^8s+\u0005\u0011\u0017\u0001\u00057fC.$U\r^3di>\u0014x\fJ3r)\u0011\t)B!\u0011\t\u0011\u0005u1&!AA\u0002\t\fQ\u0002\\3bW\u0012+G/Z2u_J\u0004\u0013!\u0003>l\u0003\u0012$'/Z:t\u00035\u0011'o\\6fe\u0006#GM]3tg\u0006y!p\\8lK\u0016\u0004XM]\"mS\u0016tG/A\u0006bI6Lgn\u00117jK:$\u0018AF:fiV\u0004X)\u001c2fI\u0012,GMW8pW\u0016,\u0007/\u001a:\u00021M,G/\u001e9F[\n,G\rZ3e\u0017\u000647.Y*feZ,'/A\u0003tKR,\b/\u0001\u0005uK\u0006\u0014Hm\\<o\u0003-\u0019'/Z1uKR{\u0007/[2\u0015\u0011\u0005U!\u0011\fB/\u0005CBaAa\u00176\u0001\u0004)\u0018!\u0002;pa&\u001c\u0007b\u0002B0k\u0001\u0007\u00111B\u0001\u000ba\u0006\u0014H/\u001b;j_:\u001c\bb\u0002B2k\u0001\u0007!QM\u0001\u0007G>tg-[4\u0011\t\t\u001d$QN\u0007\u0003\u0005SRAAa\u001b\u0002J\u0005!Q\u000f^5m\u0013\u0011\u0011yG!\u001b\u0003\u0015A\u0013x\u000e]3si&,7\u000f\u0006\u0004\u0002\u0016\tM$Q\u000f\u0005\u0007\u000572\u0004\u0019A;\t\u000f\t}c\u00071\u0001\u0002\fQ!\u0011Q\u0003B=\u0011\u0019\u0011Yf\u000ea\u0001k\u0006a1/\u001a8e\u001b\u0016\u001c8/Y4fgR1\u0011Q\u0003B@\u0005\u0003CaAa\u00179\u0001\u0004)\bb\u0002BBq\u0001\u0007!QQ\u0001\u000e[\u0016\u001c8/Y4f)>4%/Z9\u0011\u000f\t\u001d$qQ;\u0003\f&!!\u0011\u0012B5\u
0005\ri\u0015\r\u001d\t\u0005\u0003g\u0012i)\u0003\u0003\u0003\u0010\u0006U$aB%oi\u0016<WM\u001d\u000b\u0007\u0003+\u0011\u0019J!&\t\r\tm\u0013\b1\u0001v\u0011\u001d\u0011\u0019)\u000fa\u0001\u0005/\u0003bA\u001eBMk\u0006-\u0011b\u0001BE\u007fR1\u0011Q\u0003BO\u0005?CaAa\u0017;\u0001\u0004)\bb\u0002BQu\u0001\u0007!1U\u0001\t[\u0016\u001c8/Y4fgB!1M!*v\u0013\r\u00119\u000b\u001a\u0002\u0006\u0003J\u0014\u0018-\u001f\u000b\u0007\u0003+\u0011YK!,\t\r\tm3\b1\u0001v\u0011\u001d\u0011\tk\u000fa\u0001\u0005_\u0003Ra\u0019BS\u0005c\u0003RaYA7kV\fAB\u0019:pW\u0016\u0014Hj\\4ESJ\fQB\u0019:pW\u0016\u0014Hj\\4ESJ\u0004\u0013a\u00052s_.,'oQ8oM&<WO]1uS>tWC\u0001B3\u0003U\u0001(o\u001c3vG\u0016\u00148i\u001c8gS\u001e,(/\u0019;j_:\f!\"\u001a<f]R,\u0018\r\u001c7z+\u0011\u0011\tM!3\u0015\r\t\r'Q\u001dBy)\u0011\u0011)Ma7\u0011\t\t\u001d'\u0011\u001a\u0007\u0001\t\u001d\u0011Y\r\u0011b\u0001\u0005\u001b\u0014\u0011\u0001V\t\u0005\u0005\u001f\u0014)\u000eE\u0002d\u0005#L1Aa5e\u0005\u001dqu\u000e\u001e5j]\u001e\u00042a\u0019Bl\u0013\r\u0011I\u000e\u001a\u0002\u0004\u0003:L\b\u0002\u0003Bo\u0001\u0012\u0005\rAa8\u0002\t\u0019,hn\u0019\t\u0006G\n\u0005(QY\u0005\u0004\u0005G$'\u0001\u0003\u001fcs:\fW.\u001a \t\u000f\t\u001d\b\t1\u0001\u0003j\u00069A/[7f_V$\b\u0003\u0002Bv\u0005[l\u0011!W\u0005\u0004\u0005_L&\u0001\u0002+j[\u0016DqAa=A\u0001\u0004\u0011I/\u0001\u0005j]R,'O^1m\u0003u9\u0018-\u001b;V]RLG.T3uC\u0012\fG/Y%t!J|\u0007/Y4bi\u0016$GCBA\u000b\u0005s\u0014Y\u0010\u0003\u0004\u0003\\\u0005\u0003\r!\u001e\u0005\b\u0005{\f\u0005\u0019AA\u0006\u0003%\u0001\u0018M\u001d;ji&|g\u000e")
public class KafkaTestUtils
implements Logging {
    // Host name obtained from Utils.localHostNameForURI(); the constructor also uses it for zkHost and brokerHost.
    private final String localHostNameForURI;
    private final String zkHost;
    // Starts at 0; replaced with the embedded ZooKeeper's actual port in setupEmbeddedZookeeper().
    private int zkPort;
    // Timeouts handed to KafkaZkClient (set to 60000 and 10000 in the constructor).
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;
    // Embedded ZooKeeper and its clients; assigned in setupEmbeddedZookeeper().
    private EmbeddedZookeeper zookeeper;
    private KafkaZkClient zkClient;
    private AdminZkClient admClient;
    private final String brokerHost;
    // Starts at 0; replaced with the port the broker actually bound once it has started.
    private int brokerPort;
    private KafkaConfig brokerConf;
    private KafkaServer server;
    // Only non-null while one of the sendMessages overloads is running.
    private KafkaProducer<String, String> producer;
    // Readiness flags checked by the public address/client getters.
    private boolean zkReady;
    private boolean brokerReady;
    // Handle returned by ShutdownHookManager.addShutdownHook in setup(); removed again in teardown().
    private Object leakDetector;
    // Absolute path of a temp directory created in the constructor; used as the broker's "log.dir".
    private final String brokerLogDir;
    // Backing field for the Logging trait's logger accessor pair.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /** Delegates to the {@code Logging} trait's static forwarder {@code logName$}. */
    public String logName() {
        return Logging.logName$((Logging)this);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code log$}. */
    public Logger log() {
        return Logging.log$((Logging)this);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code LogStringContext$}. */
    public Logging.LogStringContext LogStringContext(StringContext sc) {
        return Logging.LogStringContext$((Logging)this, (StringContext)sc);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code withLogContext$}. */
    public void withLogContext(HashMap<String, String> context, Function0<BoxedUnit> body) {
        Logging.withLogContext$((Logging)this, context, body);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logInfo$} (lazy message overload). */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logInfo$} ({@code LogEntry} overload). */
    public void logInfo(LogEntry entry) {
        Logging.logInfo$((Logging)this, (LogEntry)entry);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logInfo$} ({@code LogEntry} plus throwable). */
    public void logInfo(LogEntry entry, Throwable throwable) {
        Logging.logInfo$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logDebug$} (lazy message overload). */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logDebug$} ({@code LogEntry} overload). */
    public void logDebug(LogEntry entry) {
        Logging.logDebug$((Logging)this, (LogEntry)entry);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logDebug$} ({@code LogEntry} plus throwable). */
    public void logDebug(LogEntry entry, Throwable throwable) {
        Logging.logDebug$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logTrace$} (lazy message overload). */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logTrace$} ({@code LogEntry} overload). */
    public void logTrace(LogEntry entry) {
        Logging.logTrace$((Logging)this, (LogEntry)entry);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logTrace$} ({@code LogEntry} plus throwable). */
    public void logTrace(LogEntry entry, Throwable throwable) {
        Logging.logTrace$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logWarning$} (lazy message overload). */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logWarning$} ({@code LogEntry} overload). */
    public void logWarning(LogEntry entry) {
        Logging.logWarning$((Logging)this, (LogEntry)entry);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logWarning$} ({@code LogEntry} plus throwable). */
    public void logWarning(LogEntry entry, Throwable throwable) {
        Logging.logWarning$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logError$} (lazy message overload). */
    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logError$} ({@code LogEntry} overload). */
    public void logError(LogEntry entry) {
        Logging.logError$((Logging)this, (LogEntry)entry);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logError$} ({@code LogEntry} plus throwable). */
    public void logError(LogEntry entry, Throwable throwable) {
        Logging.logError$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logInfo$} (lazy message plus throwable). */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logDebug$} (lazy message plus throwable). */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logTrace$} (lazy message plus throwable). */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logWarning$} (lazy message plus throwable). */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code logError$} (lazy message plus throwable). */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code isTraceEnabled$}. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code initializeLogIfNecessary$} (one-argument overload). */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code initializeLogIfNecessary$} (two-argument overload) and returns its result. */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /** Compiler-generated accessor for the default value of the second {@code initializeLogIfNecessary} argument; delegates to the trait. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    /** Delegates to the {@code Logging} trait's static forwarder {@code initializeForcefully$}. */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /** Getter for the field that backs the {@code Logging} trait's logger reference. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the field that backs the {@code Logging} trait's logger reference. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Returns the {@code localHostNameForURI} field. */
    private String localHostNameForURI() {
        return this.localHostNameForURI;
    }

    /** Returns the ZooKeeper host name field. */
    private String zkHost() {
        return this.zkHost;
    }

    /** Returns the current value of the ZooKeeper port field. */
    private int zkPort() {
        return this.zkPort;
    }

    /** Scala-style setter for the ZooKeeper port field. */
    private void zkPort_$eq(int x$1) {
        this.zkPort = x$1;
    }

    /** Returns the ZooKeeper connection timeout field. */
    private int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    /** Returns the ZooKeeper session timeout field. */
    private int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    /** Returns the embedded ZooKeeper wrapper field (may be {@code null}). */
    private EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    /** Scala-style setter for the embedded ZooKeeper wrapper field. */
    private void zookeeper_$eq(EmbeddedZookeeper x$1) {
        this.zookeeper = x$1;
    }

    /** Returns the raw {@code KafkaZkClient} field without any readiness check (may be {@code null}). */
    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    /** Scala-style setter for the {@code KafkaZkClient} field. */
    private void zkClient_$eq(KafkaZkClient x$1) {
        this.zkClient = x$1;
    }

    /** Returns the raw {@code AdminZkClient} field without any readiness check (may be {@code null}). */
    private AdminZkClient admClient() {
        return this.admClient;
    }

    /** Scala-style setter for the {@code AdminZkClient} field. */
    private void admClient_$eq(AdminZkClient x$1) {
        this.admClient = x$1;
    }

    /** Returns the broker host name field. */
    private String brokerHost() {
        return this.brokerHost;
    }

    /** Returns the current value of the broker port field. */
    private int brokerPort() {
        return this.brokerPort;
    }

    /** Scala-style setter for the broker port field. */
    private void brokerPort_$eq(int x$1) {
        this.brokerPort = x$1;
    }

    /** Returns the broker {@code KafkaConfig} field (may be {@code null}). */
    private KafkaConfig brokerConf() {
        return this.brokerConf;
    }

    /** Scala-style setter for the broker {@code KafkaConfig} field. */
    private void brokerConf_$eq(KafkaConfig x$1) {
        this.brokerConf = x$1;
    }

    /** Returns the embedded {@code KafkaServer} field (may be {@code null}). */
    private KafkaServer server() {
        return this.server;
    }

    /** Scala-style setter for the embedded {@code KafkaServer} field. */
    private void server_$eq(KafkaServer x$1) {
        this.server = x$1;
    }

    /** Returns the current producer field (may be {@code null}). */
    private KafkaProducer<String, String> producer() {
        return this.producer;
    }

    /** Scala-style setter for the producer field. */
    private void producer_$eq(KafkaProducer<String, String> x$1) {
        this.producer = x$1;
    }

    /** Returns the ZooKeeper readiness flag. */
    private boolean zkReady() {
        return this.zkReady;
    }

    /** Scala-style setter for the ZooKeeper readiness flag. */
    private void zkReady_$eq(boolean x$1) {
        this.zkReady = x$1;
    }

    /** Returns the broker readiness flag. */
    private boolean brokerReady() {
        return this.brokerReady;
    }

    /** Scala-style setter for the broker readiness flag. */
    private void brokerReady_$eq(boolean x$1) {
        this.brokerReady = x$1;
    }

    /** Returns the shutdown-hook handle field (may be {@code null}). */
    private Object leakDetector() {
        return this.leakDetector;
    }

    /** Scala-style setter for the shutdown-hook handle field. */
    private void leakDetector_$eq(Object x$1) {
        this.leakDetector = x$1;
    }

    /**
     * Returns the embedded ZooKeeper address formatted as {@code host:port}.
     *
     * <p>Asserts that ZooKeeper is currently marked ready before building the address.
     *
     * @return the ZooKeeper host and port joined by a colon
     */
    public String zkAddress() {
        Predef$.MODULE$.assert(this.zkReady(), (Function0 & Serializable)() -> "Zookeeper not setup yet or already torn down, cannot get zookeeper address");
        String host = this.zkHost();
        int port = this.zkPort();
        return host + ":" + port;
    }

    /**
     * Returns the embedded Kafka broker address formatted as {@code host:port}.
     *
     * <p>Asserts that the broker is currently marked ready before building the address.
     *
     * @return the broker host and port joined by a colon
     */
    public String brokerAddress() {
        Predef$.MODULE$.assert(this.brokerReady(), (Function0 & Serializable)() -> "Kafka not setup yet or already torn down, cannot get broker address");
        String host = this.brokerHost();
        int port = this.brokerPort();
        return host + ":" + port;
    }

    /**
     * Returns the {@code KafkaZkClient} connected to the embedded ZooKeeper.
     *
     * <p>Asserts that ZooKeeper is marked ready, then wraps the client field in an
     * {@code Option} so that a {@code null} field results in an exception instead of a
     * {@code null} return.
     *
     * @return the non-null ZooKeeper client
     * @throws IllegalStateException if the client field is {@code null}
     */
    public KafkaZkClient zookeeperClient() {
        Predef$.MODULE$.assert(this.zkReady(), (Function0 & Serializable)() -> "Zookeeper not setup yet or already torn down, cannot get zookeeper client");
        return (KafkaZkClient)Option$.MODULE$.apply((Object)this.zkClient()).getOrElse((Function0 & Serializable)() -> {
            throw new IllegalStateException("Zookeeper client is not yet initialized");
        });
    }

    /**
     * Returns the {@code AdminZkClient} built on top of the ZooKeeper client.
     *
     * <p>Asserts that ZooKeeper is marked ready, then wraps the admin client field in an
     * {@code Option} so that a {@code null} field results in an exception instead of a
     * {@code null} return.
     *
     * @return the non-null admin client
     * @throws IllegalStateException if the admin client field is {@code null}
     */
    public AdminZkClient adminClient() {
        Predef$.MODULE$.assert(this.zkReady(), (Function0 & Serializable)() -> "Zookeeper not setup yet or already torn down, cannot get zookeeper client");
        return (AdminZkClient)Option$.MODULE$.apply((Object)this.admClient()).getOrElse((Function0 & Serializable)() -> {
            throw new IllegalStateException("Admin client is not yet initialized");
        });
    }

    /**
     * Starts the embedded ZooKeeper and creates the clients that talk to it.
     *
     * <p>The statement order matters: the port field is refreshed from the started server
     * before the client is built, and the ready flag is only set once both clients exist.
     */
    private void setupEmbeddedZookeeper() {
        this.zookeeper_$eq(new EmbeddedZookeeper(this, this.zkHost() + ":" + this.zkPort()));
        // Replace the requested port with the one the embedded server reports as its actual port.
        this.zkPort_$eq(this.zookeeper().actualPort());
        this.zkClient_$eq(KafkaZkClient$.MODULE$.apply(this.zkHost() + ":" + this.zkPort(), false, this.zkSessionTimeout(), this.zkConnectionTimeout(), 1, org.apache.kafka.common.utils.Time.SYSTEM, "test", new ZKClientConfig(), KafkaZkClient$.MODULE$.apply$default$9(), KafkaZkClient$.MODULE$.apply$default$10(), KafkaZkClient$.MODULE$.apply$default$11()));
        this.admClient_$eq(new AdminZkClient(this.zkClient(), AdminZkClient$.MODULE$.$lessinit$greater$default$2()));
        this.zkReady_$eq(true);
    }

    /**
     * Starts the embedded Kafka broker; ZooKeeper must already be marked ready.
     *
     * <p>Broker start-up is handed to {@code Utils.startServiceOnPort} with the current broker
     * port as the starting port and {@code "KafkaBroker"} as the service name; the broker is
     * marked ready once that call returns.
     */
    private void setupEmbeddedKafkaServer() {
        Predef$.MODULE$.assert(this.zkReady(), (Function0 & Serializable)() -> "Zookeeper should be set up beforehand");
        Utils$.MODULE$.startServiceOnPort(this.brokerPort(), (Function1 & Serializable)port -> KafkaTestUtils.$anonfun$setupEmbeddedKafkaServer$2(this, BoxesRunTime.unboxToInt((Object)port)), new SparkConf(), "KafkaBroker");
        this.brokerReady_$eq(true);
    }

    /**
     * Starts the embedded ZooKeeper and then the embedded Kafka broker.
     *
     * <p>Before starting anything, a shutdown hook is registered that logs
     * "Found a leak KafkaTestUtils." together with an exception created here, so the log
     * shows where this instance was set up. The hook handle is stored in the
     * {@code leakDetector} field.
     */
    public void setup() {
        // Created eagerly so its stack trace points at the setup() call site.
        SparkException exception = new SparkException("It was created at: ");
        this.leakDetector_$eq(ShutdownHookManager$.MODULE$.addShutdownHook((Function0)(JFunction0.mcV.sp & Serializable)() -> this.logError((Function0<String>)(Function0 & Serializable)() -> "Found a leak KafkaTestUtils.", (Throwable)exception)));
        this.setupEmbeddedZookeeper();
        this.setupEmbeddedKafkaServer();
    }

    /**
     * Stops the embedded broker and ZooKeeper and releases everything {@code setup()} created.
     *
     * <p>Order of operations: remove the leak-detector shutdown hook, clear both readiness
     * flags, close any open producer, shut the broker down and wait for it, delete the
     * broker log directories, close the ZooKeeper client, and finally shut down ZooKeeper.
     *
     * <p>Every step is guarded against a {@code null} field, so this method can be called
     * after a partially failed setup. In particular the log-directory cleanup is skipped
     * when no broker configuration was ever created; without that guard a
     * {@code NullPointerException} would prevent the ZooKeeper client and server from
     * being shut down.
     */
    public void teardown() {
        if (this.leakDetector() != null) {
            ShutdownHookManager$.MODULE$.removeShutdownHook(this.leakDetector());
        }
        this.brokerReady_$eq(false);
        this.zkReady_$eq(false);
        if (this.producer() != null) {
            this.producer().close();
            this.producer_$eq(null);
        }
        if (this.server() != null) {
            this.server().shutdown();
            this.server().awaitShutdown();
            this.server_$eq(null);
        }
        // The broker config only exists once broker start-up has begun; skip cleanup otherwise.
        if (this.brokerConf() != null) {
            this.brokerConf().logDirs().foreach((Function1 & Serializable)f -> {
                KafkaTestUtils.$anonfun$teardown$1(this, f);
                return BoxedUnit.UNIT;
            });
        }
        if (this.zkClient() != null) {
            this.zkClient().close();
            this.zkClient_$eq(null);
        }
        if (this.zookeeper() != null) {
            this.zookeeper().shutdown();
            this.zookeeper_$eq(null);
        }
    }

    /**
     * Creates a topic and blocks until its metadata has propagated for every partition.
     *
     * <p>The topic is created through the admin client with a replication factor of 1 and
     * the admin client's own defaults for the two trailing arguments.
     *
     * @param topic name of the topic to create
     * @param partitions number of partitions; each partition index in {@code [0, partitions)} is awaited
     * @param config topic-level configuration passed to the admin client
     */
    public void createTopic(String topic, int partitions, Properties config) {
        AdminZkClient admin = this.adminClient();
        RackAwareMode rackAwareMode = admin.createTopic$default$5();
        boolean sixthArgDefault = admin.createTopic$default$6();
        admin.createTopic(topic, partitions, 1, config, rackAwareMode, sixthArgDefault);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), partitions).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)p -> this.waitUntilMetadataIsPropagated(topic, p));
    }

    /**
     * Creates a topic with the given partition count and an empty topic configuration.
     *
     * @param topic name of the topic to create
     * @param partitions number of partitions
     */
    public void createTopic(String topic, int partitions) {
        this.createTopic(topic, partitions, new Properties());
    }

    /**
     * Creates a single-partition topic with an empty topic configuration.
     *
     * @param topic name of the topic to create
     */
    public void createTopic(String topic) {
        this.createTopic(topic, 1, new Properties());
    }

    /**
     * Java-friendly overload: converts the message-to-count map into an immutable Scala map
     * of int values and delegates to the Scala-map overload.
     *
     * @param topic topic to send to
     * @param messageToFreq each key is a message, each value the number of copies to send
     */
    public void sendMessages(String topic, Map<String, Integer> messageToFreq) {
        this.sendMessages(topic, (scala.collection.immutable.Map<String, Object>)((scala.collection.immutable.Map)CollectionConverters$.MODULE$.MapHasAsScala(messageToFreq).asScala().toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()).transform((Function2 & Serializable)(x$7, v) -> BoxesRunTime.boxToInteger((int)v))));
    }

    /**
     * Expands a message-to-count map into a flat array of messages and sends them.
     *
     * <p>Each entry {@code (message, freq)} contributes {@code freq} copies of
     * {@code message}; the resulting array is passed to the {@code String[]} overload.
     *
     * @param topic topic to send to
     * @param messageToFreq each key is a message, each value the number of copies to send
     */
    public void sendMessages(String topic, scala.collection.immutable.Map<String, Object> messageToFreq) {
        String[] messages = (String[])((IterableOnceOps)messageToFreq.flatMap((Function1 & Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                String s = (String)tuple2._1();
                int freq = tuple2._2$mcI$sp();
                // Repeat the message `freq` times.
                return (Seq)package$.MODULE$.Seq().fill(freq, (Function0 & Serializable)() -> s);
            }
            throw new MatchError((Object)tuple2);
        })).toArray(ClassTag$.MODULE$.apply(String.class));
        this.sendMessages(topic, messages);
    }

    /**
     * Sends each message, without a key, to the topic using a freshly created producer.
     *
     * <p>The producer is created from {@code producerConfiguration()}, stored in the
     * {@code producer} field for the duration of the call, and always closed and cleared
     * afterwards, even when a send throws, so a failed send does not leak it.
     *
     * @param topic topic to send to
     * @param messages record values to send, in order
     */
    public void sendMessages(String topic, String[] messages) {
        this.producer_$eq((KafkaProducer<String, String>)new KafkaProducer(this.producerConfiguration()));
        try {
            ArrayOps$.MODULE$.foreach$extension(Predef$.MODULE$.refArrayOps((Object[])messages), (Function1 & Serializable)message -> this.producer().send(new ProducerRecord(topic, message)));
        }
        finally {
            // Release the producer on both the success and the failure path.
            this.producer().close();
            this.producer_$eq(null);
        }
    }

    /**
     * Sends each (key, value) pair to the topic using a freshly created producer.
     *
     * <p>The producer is created from {@code producerConfiguration()}, stored in the
     * {@code producer} field for the duration of the call, and always closed and cleared
     * afterwards, even when a send throws, so a failed send does not leak it.
     *
     * @param topic topic to send to
     * @param messages pairs whose first element is the record key and second the record value
     */
    public void sendMessages(String topic, Tuple2<String, String>[] messages) {
        this.producer_$eq((KafkaProducer<String, String>)new KafkaProducer(this.producerConfiguration()));
        try {
            ArrayOps$.MODULE$.foreach$extension(Predef$.MODULE$.refArrayOps((Object[])messages), (Function1 & Serializable)message -> this.producer().send(new ProducerRecord(topic, message._1(), message._2())));
        }
        finally {
            // Release the producer on both the success and the failure path.
            this.producer().close();
            this.producer_$eq(null);
        }
    }

    /** Returns the absolute path of the temp directory used as the broker's {@code log.dir}. */
    public String brokerLogDir() {
        return this.brokerLogDir;
    }

    /**
     * Builds the configuration for the embedded single-node Kafka broker.
     *
     * <p>The listener is a PLAINTEXT endpoint on the local host name and the current broker
     * port; the ZooKeeper address comes from {@code zkAddress()}, which asserts that
     * ZooKeeper is ready.
     *
     * @return a new {@code Properties} holding the broker settings
     */
    private Properties brokerConfiguration() {
        Properties settings = new Properties();
        settings.setProperty("broker.id", "0");
        String listener = "PLAINTEXT://" + this.localHostNameForURI() + ":" + this.brokerPort();
        settings.setProperty("listeners", listener);
        settings.setProperty("log.dir", this.brokerLogDir());
        settings.setProperty("zookeeper.connect", this.zkAddress());
        settings.setProperty("zookeeper.connection.timeout.ms", "60000");
        settings.setProperty("log.flush.interval.messages", "1");
        settings.setProperty("replica.socket.timeout.ms", "1500");
        settings.setProperty("delete.topic.enable", "true");
        settings.setProperty("offsets.topic.num.partitions", "1");
        settings.setProperty("offsets.topic.replication.factor", "1");
        settings.setProperty("group.initial.rebalance.delay.ms", "10");
        return settings;
    }

    /**
     * Builds the configuration for the string producer used by the {@code sendMessages} overloads.
     *
     * <p>The bootstrap address comes from {@code brokerAddress()}, which asserts that the
     * broker is ready. Keys and values are both serialized with {@code StringSerializer}.
     *
     * @return a new {@code Properties} holding the producer settings
     */
    private Properties producerConfiguration() {
        Properties settings = new Properties();
        settings.setProperty("bootstrap.servers", this.brokerAddress());
        String stringSerializer = StringSerializer.class.getName();
        settings.setProperty("value.serializer", stringSerializer);
        settings.setProperty("key.serializer", stringSerializer);
        settings.setProperty("acks", "all");
        settings.setProperty("partitioner.class", DefaultPartitioner.class.getName());
        return settings;
    }

    /**
     * Repeatedly evaluates {@code func} until it succeeds or the timeout elapses.
     *
     * <p>The start time is captured here and the retry loop is delegated to
     * {@code tryAgain$1}, starting at attempt 1.
     *
     * @param timeout maximum total time to keep retrying
     * @param interval pause between attempts
     * @param func the computation to evaluate
     * @param <T> result type of {@code func}
     * @return the value produced by the first successful evaluation
     */
    public <T> T eventually(Time timeout, Time interval, Function0<T> func) {
        return (T)this.tryAgain$1(1, System.nanoTime(), timeout, interval, func);
    }

    /**
     * Blocks until the broker's metadata for the given topic partition is visible.
     *
     * <p>Retries via {@code eventually} with a timeout of {@code new Time(10000L)} and an
     * interval of {@code new Time(100L)}, asserting {@code isPropagated$1} on each attempt.
     *
     * @param topic topic name
     * @param partition partition index
     */
    private void waitUntilMetadataIsPropagated(String topic, int partition) {
        this.eventually(new Time(10000L), new Time(100L), (Function0)(JFunction0.mcV.sp & Serializable)() -> Predef$.MODULE$.assert(this.isPropagated$1(topic, partition), (Function0 & Serializable)() -> "Partition [" + topic + ", " + partition + "] metadata not propagated after timeout"));
    }

    /**
     * Synthetic lambda body used by {@code setupEmbeddedKafkaServer}: tries to start the
     * broker on the given port.
     *
     * <p>The port is stored first because {@code brokerConfiguration()} reads the broker
     * port field when building the listener string.
     *
     * @param $this the enclosing test-utils instance
     * @param port the port to try
     * @return a tuple of the started server and the port it is bound to
     */
    public static final /* synthetic */ Tuple2 $anonfun$setupEmbeddedKafkaServer$2(KafkaTestUtils $this, int port) {
        $this.brokerPort_$eq(port);
        $this.brokerConf_$eq(new KafkaConfig((Map)$this.brokerConfiguration(), false));
        $this.server_$eq(new KafkaServer($this.brokerConf(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4()));
        $this.server().startup();
        // Record the port actually bound for the PLAINTEXT listener.
        $this.brokerPort_$eq($this.server().boundPort(new ListenerName("PLAINTEXT")));
        return new Tuple2((Object)$this.server(), (Object)BoxesRunTime.boxToInteger((int)$this.brokerPort()));
    }

    /**
     * Synthetic lambda body used by {@code teardown}: recursively deletes one broker log
     * directory.
     *
     * <p>An {@link IOException} raised while running on Windows is logged as a warning and
     * swallowed, so a directory that cannot be removed there does not fail the teardown.
     * Every other failure propagates to the caller unchanged.
     *
     * @param $this the enclosing test-utils instance, used for logging
     * @param f path of the log directory to delete
     */
    public static final /* synthetic */ void $anonfun$teardown$1(KafkaTestUtils $this, String f) {
        try {
            Utils$.MODULE$.deleteRecursively(new File(f));
        }
        catch (Throwable throwable) {
            if (throwable instanceof IOException && Utils$.MODULE$.isWindows()) {
                IOException iOException = (IOException)throwable;
                $this.logWarning((Function0<String>)(Function0 & Serializable)() -> iOException.getMessage());
                // The guarded case is fully handled by the warning; do not fall through to the rethrow.
                return;
            }
            throw throwable;
        }
    }

    /**
     * Evaluates {@code func$1} once and captures the outcome as an {@code Either}.
     *
     * <p>Non-fatal failures (as classified by {@code NonFatal}) are returned as a
     * {@code Left} so that the caller can decide whether to retry; fatal throwables are
     * rethrown immediately.
     *
     * @param func$1 the computation to attempt
     * @return {@code Right(result)} on success, {@code Left(throwable)} on a non-fatal failure
     */
    private static final Either makeAttempt$1(Function0 func$1) {
        try {
            return new Right(func$1.apply());
        }
        catch (Throwable throwable) {
            if (NonFatal$.MODULE$.apply(throwable)) {
                // Hand the failure back to the retry loop instead of propagating it.
                return new Left((Object)throwable);
            }
            throw throwable;
        }
    }

    /**
     * Retry loop behind {@code eventually}: keeps calling {@code makeAttempt$1} until it
     * yields a result or the timeout has elapsed.
     *
     * <p>After each failed attempt the elapsed time since {@code startTimeNs$1} is compared
     * with the timeout. Once the timeout is reached a {@link TimeoutException} is thrown
     * that carries the last failure's message and has that failure attached as its cause,
     * so the original stack trace is not lost. Otherwise the thread sleeps for the interval
     * and tries again.
     *
     * @param attempt attempt counter; incremented per retry but not otherwise read here
     * @param startTimeNs$1 {@code System.nanoTime()} value captured when retrying began
     * @param timeout$1 maximum total time to keep retrying
     * @param interval$1 pause between attempts
     * @param func$1 the computation to evaluate
     * @return the value produced by the first successful attempt
     */
    private final Object tryAgain$1(int attempt, long startTimeNs$1, Time timeout$1, Time interval$1, Function0 func$1) {
        Either either;
        while (true) {
            if ((either = KafkaTestUtils.makeAttempt$1(func$1)) instanceof Right) {
                return ((Right)either).value();
            }
            if (!(either instanceof Left)) break;
            Throwable e = (Throwable)((Left)either).value();
            long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs$1);
            if (durationMs >= timeout$1.milliseconds()) {
                // TimeoutException has no cause-taking constructor, so attach the cause explicitly.
                TimeoutException timedOut = new TimeoutException(e.getMessage());
                timedOut.initCause(e);
                throw timedOut;
            }
            Thread.sleep(interval$1.milliseconds());
            ++attempt;
        }
        // Only reachable if the attempt produced neither a Right nor a Left.
        throw new MatchError((Object)either);
    }

    /**
     * Reports whether the broker's metadata cache holds usable state for the given
     * topic partition.
     *
     * @param topic$4     topic name
     * @param partition$1 partition index
     * @return {@code true} only when the cache has an entry for the partition, the
     *         ZooKeeper client reports a leader for it, the cached leader id passes
     *         {@code FetchRequest.isValidBrokerId}, and the cached ISR list is non-empty
     */
    private final boolean isPropagated$1(String topic$4, int partition$1) {
        Option cached = this.server().dataPlaneRequestProcessor().metadataCache().getPartitionInfo(topic$4, partition$1);
        if (!(cached instanceof Some)) {
            return false;
        }
        UpdateMetadataRequestData.UpdateMetadataPartitionState state = (UpdateMetadataRequestData.UpdateMetadataPartitionState)((Some)cached).value();
        int leaderId = state.leader();
        List inSyncReplicas = state.isr();
        // Checks run in the same order as a short-circuiting && chain.
        if (!this.zkClient().getLeaderForPartition(new TopicPartition(topic$4, partition$1)).isDefined()) {
            return false;
        }
        if (!FetchRequest.isValidBrokerId((int)leaderId)) {
            return false;
        }
        return !inSyncReplicas.isEmpty();
    }

    /**
     * Creates the test utility with its default field values.
     *
     * <p>Statement order matters: {@code zkHost} and {@code brokerHost} are read
     * from {@code localHostNameForURI()}, so that field is assigned first.
     */
    public KafkaTestUtils() {
        Logging.$init$((Logging)this);
        // Resolved once; reused below for both the ZooKeeper and broker host.
        this.localHostNameForURI = Utils$.MODULE$.localHostNameForURI();
        this.zkHost = this.localHostNameForURI();
        // NOTE(review): 0 presumably means "no port assigned yet" — confirm against setup code.
        this.zkPort = 0;
        // NOTE(review): timeouts look like milliseconds — confirm where they are consumed.
        this.zkConnectionTimeout = 60000;
        this.zkSessionTimeout = 10000;
        this.brokerHost = this.localHostNameForURI();
        this.brokerPort = 0;
        this.zkReady = false;
        this.brokerReady = false;
        this.leakDetector = null;
        // A fresh temp directory is created here; only its absolute path is stored.
        this.brokerLogDir = Utils$.MODULE$.createTempDir().getAbsolutePath();
    }

    /**
     * An in-process ZooKeeper server bound to the {@code host:port} given by
     * {@code zkConnect}.
     *
     * <p>Construction creates two temp directories (snapshot and log), starts a
     * {@link ZooKeeperServer} on them behind an {@link NIOServerCnxnFactory}, and
     * records the port the factory actually bound to in {@link #actualPort()}.
     */
    private class EmbeddedZookeeper {
        private final String zkConnect;
        private final File snapshotDir;
        private final File logDir;
        private final ZooKeeperServer zookeeper;
        private final /* synthetic */ Tuple2 x$8;
        private final String ip;
        private final int port;
        private final NIOServerCnxnFactory factory;
        private final int actualPort;
        public final /* synthetic */ KafkaTestUtils $outer;

        /** @return the connect string this instance was created with */
        public String zkConnect() {
            return this.zkConnect;
        }

        /** @return the temp directory passed to the server as its first directory argument */
        public File snapshotDir() {
            return this.snapshotDir;
        }

        /** @return the temp directory passed to the server as its second directory argument */
        public File logDir() {
            return this.logDir;
        }

        /** @return the embedded server instance */
        public ZooKeeperServer zookeeper() {
            return this.zookeeper;
        }

        /** @return the host part parsed from {@code zkConnect} */
        public String ip() {
            return this.ip;
        }

        /** @return the port parsed from {@code zkConnect} (the requested port) */
        public int port() {
            return this.port;
        }

        /** @return the connection factory serving the embedded server */
        public NIOServerCnxnFactory factory() {
            return this.factory;
        }

        /** @return the local port reported by the factory after startup */
        public int actualPort() {
            return this.actualPort;
        }

        /**
         * Shuts down the connection factory, then deletes the snapshot and log
         * directories.
         *
         * <p>On Windows an {@link IOException} from either deletion is logged as a
         * warning and does not prevent the remaining cleanup. Any other throwable
         * propagates immediately, in which case later deletions are not attempted.
         */
        public void shutdown() {
            this.factory().shutdown();
            this.deleteDirTolerantOnWindows(this.snapshotDir());
            this.deleteDirTolerantOnWindows(this.logDir());
        }

        /**
         * Recursively deletes {@code dir}; logs and suppresses an {@link IOException}
         * when running on Windows, rethrows everything else.
         */
        private void deleteDirTolerantOnWindows(File dir) {
            try {
                Utils$.MODULE$.deleteRecursively(dir);
            }
            catch (Throwable t) {
                if (t instanceof IOException && Utils$.MODULE$.isWindows()) {
                    IOException ioe = (IOException)t;
                    this.org$apache$spark$streaming$kafka010$KafkaTestUtils$EmbeddedZookeeper$$$outer().logWarning((Function0<String>)(Function0 & Serializable)() -> ioe.getMessage());
                    // Reported; carry on with the rest of the shutdown.
                    return;
                }
                throw t;
            }
        }

        public /* synthetic */ KafkaTestUtils org$apache$spark$streaming$kafka010$KafkaTestUtils$EmbeddedZookeeper$$$outer() {
            return this.$outer;
        }

        /**
         * Starts an embedded ZooKeeper server.
         *
         * @param $outer    enclosing instance; must not be {@code null}
         * @param zkConnect address in {@code host:port} form; the text after the last
         *                  {@code ':'} is parsed as the port, everything before it is the host
         */
        public EmbeddedZookeeper(KafkaTestUtils $outer, String zkConnect) {
            this.zkConnect = zkConnect;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
            this.snapshotDir = Utils$.MODULE$.createTempDir();
            this.logDir = Utils$.MODULE$.createTempDir();
            // NOTE(review): 500 is the third ZooKeeperServer constructor argument
            // (tick time per the ZooKeeper API) — confirm.
            this.zookeeper = new ZooKeeperServer(this.snapshotDir(), this.logDir(), 500);
            // Split on the last ':' so the host part may itself contain colons.
            String[] splits = zkConnect.split(":");
            String portText = splits[splits.length - 1];
            String host = zkConnect.substring(0, zkConnect.length() - portText.length() - 1);
            int parsedPort = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(portText));
            this.x$8 = new Tuple2((Object)host, (Object)BoxesRunTime.boxToInteger((int)parsedPort));
            this.ip = host;
            this.port = parsedPort;
            this.factory = new NIOServerCnxnFactory();
            // NOTE(review): 16 is the second configure() argument
            // (max client connections per the ZooKeeper API) — confirm.
            this.factory().configure(new InetSocketAddress(this.ip(), this.port()), 16);
            this.factory().startup(this.zookeeper());
            this.actualPort = this.factory().getLocalPort();
        }
    }
}

