package kafka.consumer;

import java.util.Collections;
import java.util.Properties;
import junit.framework.Assert;
import kafka.common.MessageStreamsExistException;
import kafka.integration.KafkaServerTestHarness;
import kafka.message.CompressionCodec;
import kafka.message.DefaultCompressionCodec$;
import kafka.message.GZIPCompressionCodec$;
import kafka.message.NoCompressionCodec$;
import kafka.producer.Producer;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.serializer.StringEncoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.utils.FixedValuePartitioner;
import kafka.utils.IntEncoder;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Pool;
import kafka.utils.TestUtils$;
import kafka.utils.TestZKUtils$;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.scalatest.junit.JUnit3Suite;
import scala.Function0;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.HashMap;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: ZookeeperConsumerConnectorTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-g\u0001B\u0001\u0003\u0001\u001d\u0011aDW8pW\u0016,\u0007/\u001a:D_:\u001cX/\\3s\u0007>tg.Z2u_J$Vm\u001d;\u000b\u0005\r!\u0011\u0001C2p]N,X.\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001a\u0005\u0003\u0001\u0011IA\u0002CA\u0005\u0011\u001b\u0005Q!BA\u0006\r\u0003\u0015QWO\\5u\u0015\tia\"A\u0005tG\u0006d\u0017\r^3ti*\tq\"A\u0002pe\u001eL!!\u0005\u0006\u0003\u0017)+f.\u001b;4'VLG/\u001a\t\u0003'Yi\u0011\u0001\u0006\u0006\u0003+\u0011\t1\"\u001b8uK\u001e\u0014\u0018\r^5p]&\u0011q\u0003\u0006\u0002\u0017\u0017\u000647.Y*feZ,'\u000fV3ti\"\u000b'O\\3tgB\u0011\u0011\u0004H\u0007\u00025)\u00111\u0004B\u0001\u0006kRLGn]\u0005\u0003;i\u0011q\u0001T8hO&tw\rC\u0003 \u0001\u0011\u0005\u0001%\u0001\u0004=S:LGO\u0010\u000b\u0002CA\u0011!\u0005A\u0007\u0002\u0005!9A\u0005\u0001b\u0001\n\u0003)\u0013A\u0005*fE\u0006d\u0017M\\2f\u0005\u0006\u001c7n\u001c4g\u001bN,\u0012A\n\t\u0003O)j\u0011\u0001\u000b\u0006\u0002S\u0005)1oY1mC&\u00111\u0006\u000b\u0002\u0004\u0013:$\bBB\u0017\u0001A\u0003%a%A\nSK\n\fG.\u00198dK\n\u000b7m[8gM6\u001b\b\u0005C\u00040\u0001\u0001\u0007I\u0011\u0001\u0019\u0002\t\u0011L'o]\u000b\u0002cA\u0011\u0011DM\u0005\u0003gi\u0011\u0001CW&He>,\b\u000fV8qS\u000e$\u0015N]:\t\u000fU\u0002\u0001\u0019!C\u0001m\u0005AA-\u001b:t?\u0012*\u0017\u000f\u0006\u00028uA\u0011q\u0005O\u0005\u0003s!\u0012A!\u00168ji\"91\bNA\u0001\u0002\u0004\t\u0014a\u0001=%c!1Q\b\u0001Q!\nE\nQ\u0001Z5sg\u0002Bqa\u0010\u0001C\u0002\u0013\u0005\u0001)\u0001\t{_>\\W-\u001a9fe\u000e{gN\\3diV\t\u0011\t\u0005\u0002C\u000f6\t1I\u0003\u0002E\u000b\u0006!A.\u00198h\u0015\u00051\u0015\u0001\u00026bm\u0006L!\u0001S\"\u0003\rM#(/\u001b8h\u0011\u0019Q\u0005\u0001)A\u0005\u0003\u0006\t\"p\\8lK\u0016\u0004XM]\"p]:,7\r\u001e\u0011\t\u000f1\u0003!\u0019!C\u0001K\u0005Aa.^7O_\u0012,7\u000f\u0003\u0004O\u0001\u0001\u0006IAJ\u0001\n]Vlgj\u001c3fg\u0002Bq\u0001\u0015\u0001C\u0002\u0013\u0005Q%\u0001\u0005ok6\u0004\u0016M\u001d;t\u0011\u0019\u0011\u0006\u0001)A\u0005M\u
0005Ia.^7QCJ$8\u000f\t\u0005\b)\u0002\u0011\r\u0011\"\u0001A\u0003\u0015!x\u000e]5d\u0011\u00191\u0006\u0001)A\u0005\u0003\u00061Ao\u001c9jG\u0002Bq\u0001\u0017\u0001C\u0002\u0013\u0005\u0011,A\u0004d_:4\u0017nZ:\u0016\u0003i\u00032a\u00171c\u001b\u0005a&BA/_\u0003%IW.\\;uC\ndWM\u0003\u0002`Q\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005\u0005d&\u0001\u0002'jgR\u0004\"a\u00194\u000e\u0003\u0011T!!\u001a\u0003\u0002\rM,'O^3s\u0013\t9GMA\u0006LC\u001a\\\u0017mQ8oM&<\u0007BB5\u0001A\u0003%!,\u0001\u0005d_:4\u0017nZ:!\u0011\u001dY\u0007A1A\u0005\u0002\u0001\u000bQa\u001a:pkBDa!\u001c\u0001!\u0002\u0013\t\u0015AB4s_V\u0004\b\u0005C\u0004p\u0001\t\u0007I\u0011\u0001!\u0002\u0013\r|gn];nKJ\u0004\u0004BB9\u0001A\u0003%\u0011)\u0001\u0006d_:\u001cX/\\3sa\u0001Bqa\u001d\u0001C\u0002\u0013\u0005\u0001)A\u0005d_:\u001cX/\\3sc!1Q\u000f\u0001Q\u0001\n\u0005\u000b!bY8ogVlWM]\u0019!\u0011\u001d9\bA1A\u0005\u0002\u0001\u000b\u0011bY8ogVlWM\u001d\u001a\t\re\u0004\u0001\u0015!\u0003B\u0003)\u0019wN\\:v[\u0016\u0014(\u0007\t\u0005\bw\u0002\u0011\r\u0011\"\u0001A\u0003%\u0019wN\\:v[\u0016\u00148\u0007\u0003\u0004~\u0001\u0001\u0006I!Q\u0001\u000bG>t7/^7feN\u0002\u0003bB@\u0001\u0005\u0004%\t!J\u0001\n]6+7o]1hKNDq!a\u0001\u0001A\u0003%a%\u0001\u0006o\u001b\u0016\u001c8/Y4fg\u0002Bq!a\u0002\u0001\t\u0003\nI!A\u0003tKR,\u0006\u000fF\u00018\u0011\u001d\ti\u0001\u0001C!\u0003\u0013\t\u0001\u0002^3be\u0012{wO\u001c\u0005\b\u0003#\u0001A\u0011AA\u0005\u0003%!Xm\u001d;CCNL7\rC\u0004\u0002\u0016\u0001!\t!!\u0003\u0002\u001fQ,7\u000f^\"p[B\u0014Xm]:j_:Dq!!\u0007\u0001\t\u0003\tI!A\u000fuKN$8i\\7qe\u0016\u001c8/[8o'\u0016$8i\u001c8tk6\u0004H/[8o\u0011\u001d\ti\u0002\u0001C\u0001\u0003\u0013\t1\u0003^3ti\u000e{gn];nKJ$UmY8eKJDq!!\t\u0001\t\u0003\tI!A\u0010uKN$H*Z1eKJ\u001cV\r\\3di&|gNR8s!\u0006\u0014H/\u001b;j_:Dq!!\n\u0001\t\u0003\t9#A\u000ftK:$W*Z:tC\u001e,7\u000fV8Ce>\\WM\u001d)beRLG/[8o)1\tI#a\u0013\u0002P\u0005E\u0013QKA-!\u0019\tY#a\u000f\u0002@9!\u0011QFA\u001c\u001d\u0011\ty#!\u000e\u000e\
u0005\u0005E\"bAA\u001a\r\u00051AH]8pizJ\u0011!K\u0005\u0004\u0003sA\u0013a\u00029bG.\fw-Z\u0005\u0004C\u0006u\"bAA\u001dQA!\u0011\u0011IA$\u001d\r9\u00131I\u0005\u0004\u0003\u000bB\u0013A\u0002)sK\u0012,g-C\u0002I\u0003\u0013R1!!\u0012)\u0011\u001d\ti%a\tA\u0002\t\faaY8oM&<\u0007b\u0002+\u0002$\u0001\u0007\u0011q\b\u0005\b\u0003'\n\u0019\u00031\u0001'\u0003%\u0001\u0018M\u001d;ji&|g\u000eC\u0004\u0002X\u0005\r\u0002\u0019\u0001\u0014\u0002\u00179,X.T3tg\u0006<Wm\u001d\u0005\u000b\u00037\n\u0019\u0003%AA\u0002\u0005u\u0013aC2p[B\u0014Xm]:j_:\u0004B!a\u0018\u0002f5\u0011\u0011\u0011\r\u0006\u0004\u0003G\"\u0011aB7fgN\fw-Z\u0005\u0005\u0003O\n\tG\u0001\tD_6\u0004(/Z:tS>t7i\u001c3fG\"9\u00111\u000e\u0001\u0005\u0002\u00055\u0014\u0001D:f]\u0012lUm]:bO\u0016\u001cH\u0003DA\u0015\u0003_\n\t(!\u001e\u0002z\u0005m\u0004bBA'\u0003S\u0002\rA\u0019\u0005\b\u0003g\nI\u00071\u0001'\u0003=iWm]:bO\u0016\u001c\b+\u001a:O_\u0012,\u0007\u0002CA<\u0003S\u0002\r!a\u0010\u0002\r!,\u0017\rZ3s\u0011!\tY&!\u001bA\u0002\u0005u\u0003B\u0002)\u0002j\u0001\u0007a\u0005C\u0004\u0002��\u0001!\t!!!\u0002\u0017\u001d,G/T3tg\u0006<Wm\u001d\u000b\u0007\u0003S\t\u0019)a\"\t\u000f\u0005\u0015\u0015Q\u0010a\u0001M\u0005\u0011b.T3tg\u0006<Wm\u001d)feRC'/Z1e\u0011!\tI)! 
A\u0002\u0005-\u0015a\u0005;pa&\u001cW*Z:tC\u001e,7\u000b\u001e:fC6\u001c\b\u0003CAG\u0003\u001f\u000by$a%\u000e\u0003yK1!!%_\u0005\ri\u0015\r\u001d\t\u0007\u0003W\tY$!&\u0011\u000f\t\n9*a\u0010\u0002@%\u0019\u0011\u0011\u0014\u0002\u0003\u0017-\u000bgm[1TiJ,\u0017-\u001c\u0005\b\u0003;\u0003A\u0011AAP\u0003M9W\r\u001e.L\u0007\"LG\u000e\u001a:f]Z\u000bG.^3t)\u0011\t\t+!,\u0011\r\u00055\u00151UAT\u0013\r\t)K\u0018\u0002\u0004'\u0016\f\bcB\u0014\u0002*\u0006}\u0012qH\u0005\u0004\u0003WC#A\u0002+va2,'\u0007\u0003\u0005\u00020\u0006m\u0005\u0019AA \u0003\u0011\u0001\u0018\r\u001e5\t\u0013\u0005M\u0006!%A\u0005\u0002\u0005U\u0016aJ:f]\u0012lUm]:bO\u0016\u001cHk\u001c\"s_.,'\u000fU1si&$\u0018n\u001c8%I\u00164\u0017-\u001e7uIU*\"!a.+\t\u0005u\u0013\u0011X\u0016\u0003\u0003w\u0003B!!0\u0002H6\u0011\u0011q\u0018\u0006\u0005\u0003\u0003\f\u0019-A\u0005v]\u000eDWmY6fI*\u0019\u0011Q\u0019\u0015\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002J\u0006}&!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0002")
/* loaded from: input_file:kafka/consumer/ZookeeperConsumerConnectorTest.class */
public class ZookeeperConsumerConnectorTest extends JUnit3Suite implements KafkaServerTestHarness, Logging {
    // Test fixture state. Fields that remain final are never assigned by any code visible in this
    // part of the file (presumably they are set in the constructor -- TODO confirm).
    private final int RebalanceBackoffMs;
    private ZKGroupTopicDirs dirs;
    private final String zookeeperConnect;
    private final int numNodes;
    private final int numParts;
    private final String topic;
    private final List<KafkaConfig> configs;
    private final String group;
    private final String consumer0;
    private final String consumer1;
    private final String consumer2;
    private final String consumer3;
    private final int nMessages;
    // Not final: assigned by kafka$utils$Logging$_setter_$loggerName_$eq, which is not a constructor.
    private String loggerName;
    // Not final: assigned lazily inside logger$lzycompute(), guarded by bitmap$0.
    private Logger logger;
    private String logIdent;
    // Not final: assigned by kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq.
    private Log4jController$ kafka$utils$Logging$$log4jController;
    private List<KafkaServer> servers;
    private String brokerList;
    // Not final: assigned by kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq.
    private String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    // Not final: assigned by kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq.
    private int zkConnectionTimeout;
    // Not final: assigned by kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq.
    private int zkSessionTimeout;
    // Set to true once the logger field has been initialised; read without locking by logger().
    private volatile boolean bitmap$0;

    /** Returns the value held in the {@code loggerName} field. */
    public String loggerName() {
        return loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.logger = Logging.class.logger(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.logger;
        }
    }

    /**
     * Returns the logger, initialising it through {@link #logger$lzycompute()} when the
     * {@code bitmap$0} flag has not been set yet.
     */
    public Logger logger() {
        if (!this.bitmap$0) {
            return logger$lzycompute();
        }
        return this.logger;
    }

    /** Returns the value held in the {@code logIdent} field. */
    public String logIdent() {
        return logIdent;
    }

    /**
     * Replaces the value of the {@code logIdent} field.
     *
     * @param ident the new value to store
     */
    public void logIdent_$eq(String ident) {
        logIdent = ident;
    }

    /** Returns the value held in the {@code kafka$utils$Logging$$log4jController} field. */
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return kafka$utils$Logging$$log4jController;
    }

    /**
     * Stores the given value in the {@code loggerName} field.
     *
     * @param name the value to store
     */
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String name) {
        loggerName = name;
    }

    /**
     * Stores the given value in the {@code kafka$utils$Logging$$log4jController} field.
     *
     * @param controller the value to store
     */
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ controller) {
        kafka$utils$Logging$$log4jController = controller;
    }

    /**
     * Forwards to {@code Logging.class.trace} with this instance as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     */
    public void trace(Function0<String> message) {
        Logging.class.trace(this, message);
    }

    /**
     * Forwards to {@code Logging.class.trace} with this instance as the first argument and
     * returns whatever that call returns.
     *
     * <p>Renamed from {@code trace} by the decompiler because it collides with another method
     * in this class.
     *
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     * @return the result of the delegated call
     */
    public Object m87trace(Function0<Throwable> throwable) {
        Object result = Logging.class.trace(this, throwable);
        return result;
    }

    /**
     * Forwards to the two-function overload of {@code Logging.class.trace} with this instance
     * as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     */
    public void trace(Function0<String> message, Function0<Throwable> throwable) {
        Logging.class.trace(this, message, throwable);
    }

    /**
     * Forwards to {@code Logging.class.swallowTrace} with this instance as the first argument.
     *
     * @param action zero-argument function returning {@code BoxedUnit}; handed on unchanged
     */
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging.class.swallowTrace(this, action);
    }

    /**
     * Forwards to {@code Logging.class.debug} with this instance as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     */
    public void debug(Function0<String> message) {
        Logging.class.debug(this, message);
    }

    /**
     * Forwards to {@code Logging.class.debug} with this instance as the first argument and
     * returns whatever that call returns.
     *
     * <p>Renamed from {@code debug} by the decompiler because it collides with another method
     * in this class.
     *
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     * @return the result of the delegated call
     */
    public Object m88debug(Function0<Throwable> throwable) {
        Object result = Logging.class.debug(this, throwable);
        return result;
    }

    /**
     * Forwards to the two-function overload of {@code Logging.class.debug} with this instance
     * as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     */
    public void debug(Function0<String> message, Function0<Throwable> throwable) {
        Logging.class.debug(this, message, throwable);
    }

    /**
     * Forwards to {@code Logging.class.swallowDebug} with this instance as the first argument.
     *
     * @param action zero-argument function returning {@code BoxedUnit}; handed on unchanged
     */
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging.class.swallowDebug(this, action);
    }

    /**
     * Forwards to {@code Logging.class.info} with this instance as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     */
    public void info(Function0<String> message) {
        Logging.class.info(this, message);
    }

    /**
     * Forwards to {@code Logging.class.info} with this instance as the first argument and
     * returns whatever that call returns.
     *
     * <p>Renamed from {@code info} by the decompiler because it collides with another method
     * in this class.
     *
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     * @return the result of the delegated call
     */
    public Object m89info(Function0<Throwable> throwable) {
        Object result = Logging.class.info(this, throwable);
        return result;
    }

    /**
     * Forwards to the two-function overload of {@code Logging.class.info} with this instance
     * as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     */
    public void info(Function0<String> message, Function0<Throwable> throwable) {
        Logging.class.info(this, message, throwable);
    }

    /**
     * Forwards to {@code Logging.class.swallowInfo} with this instance as the first argument.
     *
     * @param action zero-argument function returning {@code BoxedUnit}; handed on unchanged
     */
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging.class.swallowInfo(this, action);
    }

    /**
     * Forwards to {@code Logging.class.warn} with this instance as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     */
    public void warn(Function0<String> message) {
        Logging.class.warn(this, message);
    }

    /**
     * Forwards to {@code Logging.class.warn} with this instance as the first argument and
     * returns whatever that call returns.
     *
     * <p>Renamed from {@code warn} by the decompiler because it collides with another method
     * in this class.
     *
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     * @return the result of the delegated call
     */
    public Object m90warn(Function0<Throwable> throwable) {
        Object result = Logging.class.warn(this, throwable);
        return result;
    }

    /**
     * Forwards to the two-function overload of {@code Logging.class.warn} with this instance
     * as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     */
    public void warn(Function0<String> message, Function0<Throwable> throwable) {
        Logging.class.warn(this, message, throwable);
    }

    /**
     * Forwards to {@code Logging.class.swallowWarn} with this instance as the first argument.
     *
     * @param action zero-argument function returning {@code BoxedUnit}; handed on unchanged
     */
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging.class.swallowWarn(this, action);
    }

    /**
     * Forwards to {@code Logging.class.swallow} with this instance as the first argument.
     *
     * @param action zero-argument function returning {@code BoxedUnit}; handed on unchanged
     */
    public void swallow(Function0<BoxedUnit> action) {
        Logging.class.swallow(this, action);
    }

    public void error(Function0<String> function0) {
        Logging.class.error(this, function0);
    }

    /**
     * Forwards to {@code Logging.class.error} with this instance as the first argument and
     * returns whatever that call returns.
     *
     * <p>Renamed from {@code error} by the decompiler because it collides with another method
     * in this class.
     *
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     * @return the result of the delegated call
     */
    public Object m91error(Function0<Throwable> throwable) {
        Object result = Logging.class.error(this, throwable);
        return result;
    }

    public void error(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.error(this, function0, function02);
    }

    /**
     * Forwards to {@code Logging.class.swallowError} with this instance as the first argument.
     *
     * @param action zero-argument function returning {@code BoxedUnit}; handed on unchanged
     */
    public void swallowError(Function0<BoxedUnit> action) {
        Logging.class.swallowError(this, action);
    }

    /**
     * Forwards to {@code Logging.class.fatal} with this instance as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     */
    public void fatal(Function0<String> message) {
        Logging.class.fatal(this, message);
    }

    /**
     * Forwards to {@code Logging.class.fatal} with this instance as the first argument and
     * returns whatever that call returns.
     *
     * <p>Renamed from {@code fatal} by the decompiler because it collides with another method
     * in this class.
     *
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     * @return the result of the delegated call
     */
    public Object m92fatal(Function0<Throwable> throwable) {
        Object result = Logging.class.fatal(this, throwable);
        return result;
    }

    /**
     * Forwards to the two-function overload of {@code Logging.class.fatal} with this instance
     * as the first argument.
     *
     * @param message zero-argument function producing a {@code String}; handed on unchanged
     * @param throwable zero-argument function producing a {@code Throwable}; handed on unchanged
     */
    public void fatal(Function0<String> message, Function0<Throwable> throwable) {
        Logging.class.fatal(this, message, throwable);
    }

    /** Returns the value held in the {@code servers} field. */
    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaServer> servers() {
        return servers;
    }

    /**
     * Replaces the value of the {@code servers} field.
     *
     * @param newServers the list to store
     */
    @Override // kafka.integration.KafkaServerTestHarness
    public void servers_$eq(List<KafkaServer> newServers) {
        servers = newServers;
    }

    /** Returns the value held in the {@code brokerList} field. */
    @Override // kafka.integration.KafkaServerTestHarness
    public String brokerList() {
        return brokerList;
    }

    /**
     * Replaces the value of the {@code brokerList} field.
     *
     * @param newBrokerList the value to store
     */
    @Override // kafka.integration.KafkaServerTestHarness
    public void brokerList_$eq(String newBrokerList) {
        brokerList = newBrokerList;
    }

    /**
     * Bridge method declared by {@code KafkaServerTestHarness}: runs the
     * {@code ZooKeeperTestHarness} set-up implementation against this instance.
     */
    @Override // kafka.integration.KafkaServerTestHarness
    public void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    /**
     * Bridge method declared by {@code KafkaServerTestHarness}: runs the
     * {@code ZooKeeperTestHarness} tear-down implementation against this instance.
     */
    @Override // kafka.integration.KafkaServerTestHarness
    public void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    /** Returns the value held in the {@code zkConnect} field. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return zkConnect;
    }

    /** Returns the value held in the {@code zookeeper} field. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return zookeeper;
    }

    /**
     * Replaces the value of the {@code zookeeper} field.
     *
     * @param newZookeeper the instance to store
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper newZookeeper) {
        zookeeper = newZookeeper;
    }

    /** Returns the value held in the {@code zkClient} field. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkClient zkClient() {
        return zkClient;
    }

    /**
     * Replaces the value of the {@code zkClient} field.
     *
     * @param newClient the client to store
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkClient_$eq(ZkClient newClient) {
        zkClient = newClient;
    }

    /** Returns the value held in the {@code zkConnectionTimeout} field. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return zkConnectionTimeout;
    }

    /** Returns the value held in the {@code zkSessionTimeout} field. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return zkSessionTimeout;
    }

    /**
     * Bridge method declared by {@code ZooKeeperTestHarness}: invokes the superclass
     * {@code setUp()} (annotated by the decompiler as {@code junit.framework.TestCase}).
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    /**
     * Bridge method declared by {@code ZooKeeperTestHarness}: invokes the superclass
     * {@code tearDown()} (annotated by the decompiler as {@code junit.framework.TestCase}).
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    /**
     * Stores the given value in the {@code zkConnect} field.
     *
     * @param connect the value to store
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String connect) {
        zkConnect = connect;
    }

    /**
     * Stores the given value in the {@code zkConnectionTimeout} field.
     *
     * @param timeout the value to store
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int timeout) {
        zkConnectionTimeout = timeout;
    }

    /**
     * Stores the given value in the {@code zkSessionTimeout} field.
     *
     * @param timeout the value to store
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int timeout) {
        zkSessionTimeout = timeout;
    }

    /** Returns the value held in the {@code RebalanceBackoffMs} field. */
    public int RebalanceBackoffMs() {
        return RebalanceBackoffMs;
    }

    /** Returns the value held in the {@code dirs} field. */
    public ZKGroupTopicDirs dirs() {
        return dirs;
    }

    /**
     * Replaces the value of the {@code dirs} field.
     *
     * @param newDirs the instance to store
     */
    public void dirs_$eq(ZKGroupTopicDirs newDirs) {
        dirs = newDirs;
    }

    /** Returns the value held in the {@code zookeeperConnect} field. */
    public String zookeeperConnect() {
        return zookeeperConnect;
    }

    /** Returns the value held in the {@code numNodes} field. */
    public int numNodes() {
        return numNodes;
    }

    /** Returns the value held in the {@code numParts} field. */
    public int numParts() {
        return numParts;
    }

    /** Returns the value held in the {@code topic} field. */
    public String topic() {
        return topic;
    }

    /** Returns the value held in the {@code configs} field. */
    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaConfig> configs() {
        return configs;
    }

    /** Returns the value held in the {@code group} field. */
    public String group() {
        return group;
    }

    /** Returns the value held in the {@code consumer0} field. */
    public String consumer0() {
        return consumer0;
    }

    /** Returns the value held in the {@code consumer1} field. */
    public String consumer1() {
        return consumer1;
    }

    /** Returns the value held in the {@code consumer2} field. */
    public String consumer2() {
        return consumer2;
    }

    /** Returns the value held in the {@code consumer3} field. */
    public String consumer3() {
        return consumer3;
    }

    /** Returns the value held in the {@code nMessages} field. */
    public int nMessages() {
        return nMessages;
    }

    /**
     * Runs the {@code KafkaServerTestHarness} set-up against this instance, then stores a fresh
     * {@code ZKGroupTopicDirs} built from {@link #group()} and {@link #topic()} via
     * {@link #dirs_$eq}.
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public void setUp() {
        KafkaServerTestHarness.Cclass.setUp(this);
        ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(group(), topic());
        dirs_$eq(groupTopicDirs);
    }

    /** Runs the {@code KafkaServerTestHarness} tear-down implementation against this instance. */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public void tearDown() {
        KafkaServerTestHarness.Cclass.tearDown(this);
    }

    /**
     * Exercises consumption through several {@code ZookeeperConsumerConnector} instances that are
     * all configured with {@link #group()}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Raise the {@code KafkaRequestHandler} log4j level to FATAL for the duration of the test.</li>
     *   <li>Create a connector for {@link #consumer0()} with {@code consumerTimeoutMs} = 200, open
     *       one stream on {@link #topic()}, run the {@code testBasic$1} closure for indices 0 and 1,
     *       then shut that connector down.</li>
     *   <li>Send {@link #nMessages()} messages to partition 0 (first broker config) and partition 1
     *       (last broker config), then wait for leaders and metadata.</li>
     *   <li>Create a connector for {@link #consumer1()}, check that it receives all sent messages and
     *       that both partitions are registered to {@code group1_consumer1-0}; commit offsets.</li>
     *   <li>Create a connector for {@link #consumer2()} (with {@code rebalanceBackoffMs} taken from
     *       {@link #RebalanceBackoffMs()}), send again, and check that the two streams together
     *       receive the messages and that partition 1 is now registered to
     *       {@code group1_consumer2-0}.</li>
     *   <li>Create a connector for {@link #consumer3()} with an empty topic map, send again, and
     *       check that messages and partition ownership are unchanged.</li>
     *   <li>Check that a second {@code createMessageStreams} call on that last connector throws
     *       {@code MessageStreamsExistException}.</li>
     * </ol>
     *
     * <p>Clean-up (connector shutdown and restoring the log level to ERROR) runs in
     * {@code finally} blocks so that a failed assertion does not leave connectors running or the
     * logger silenced. On the success path the order of operations is the same as before:
     * connectors are shut down, the completion message is logged, and then the level is restored.
     */
    public void testBasic() {
        Logger logger = Logger.getLogger(KafkaRequestHandler.class);
        logger.setLevel(Level.FATAL);
        // Declared up front so the finally block can shut down whichever connectors were created.
        ZookeeperConsumerConnector zookeeperConsumerConnector2 = null;
        ZookeeperConsumerConnector zookeeperConsumerConnector3 = null;
        ZookeeperConsumerConnector zookeeperConsumerConnector4 = null;
        try {
            try {
                ZookeeperConsumerConnector zookeeperConsumerConnector = new ZookeeperConsumerConnector(new ConsumerConfig(this) { // from class: kafka.consumer.ZookeeperConsumerConnectorTest$$anon$2
                    private final int consumerTimeoutMs;

                    public int consumerTimeoutMs() {
                        return this.consumerTimeoutMs;
                    }

                    {
                        super(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer0(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
                        this.consumerTimeoutMs = 200;
                    }
                }, true);
                try {
                    RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp(new ZookeeperConsumerConnectorTest$$anonfun$testBasic$1(this, zookeeperConsumerConnector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()))));
                } finally {
                    // The first connector is only needed for the closure above.
                    zookeeperConsumerConnector.shutdown();
                }

                // Round 1: a single consumer owns both partitions.
                List list = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, nMessages(), sendMessagesToBrokerPartition$default$5()).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, nMessages(), sendMessagesToBrokerPartition$default$5()), List$.MODULE$.canBuildFrom());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
                TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
                zookeeperConsumerConnector2 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
                Map<String, List<KafkaStream<String, String>>> createMessageStreams = zookeeperConsumerConnector2.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                Assert.assertEquals(list.sorted(Ordering$String$.MODULE$), getMessages(nMessages() * 2, createMessageStreams).sorted(Ordering$String$.MODULE$));
                Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer1-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
                zookeeperConsumerConnector2.commitOffsets(zookeeperConsumerConnector2.commitOffsets$default$1());

                // Round 2: a second consumer joins the group.
                zookeeperConsumerConnector3 = new ZookeeperConsumerConnector(new ConsumerConfig(this) { // from class: kafka.consumer.ZookeeperConsumerConnectorTest$$anon$3
                    private final int rebalanceBackoffMs;

                    public int rebalanceBackoffMs() {
                        return this.rebalanceBackoffMs;
                    }

                    {
                        super(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer2(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
                        this.rebalanceBackoffMs = this.RebalanceBackoffMs();
                    }
                }, true);
                Map<String, List<KafkaStream<String, String>>> createMessageStreams2 = zookeeperConsumerConnector3.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                List list2 = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, nMessages(), sendMessagesToBrokerPartition$default$5()).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, nMessages(), sendMessagesToBrokerPartition$default$5()), List$.MODULE$.canBuildFrom());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                Assert.assertEquals(list2.sorted(Ordering$String$.MODULE$), ((List) getMessages(nMessages(), createMessageStreams).$plus$plus(getMessages(nMessages(), createMessageStreams2), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
                Seq<Tuple2<String, String>> zKChildrenValues = getZKChildrenValues(dirs().consumerOwnerDir());
                List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer2-0")}));
                Assert.assertEquals(apply, zKChildrenValues);

                // Round 3: a third consumer with an empty topic map must not change ownership.
                zookeeperConsumerConnector4 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer3(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
                zookeeperConsumerConnector4.createMessageStreams(new HashMap());
                List list3 = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, nMessages(), sendMessagesToBrokerPartition$default$5()).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, nMessages(), sendMessagesToBrokerPartition$default$5()), List$.MODULE$.canBuildFrom());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                Assert.assertEquals(list3.sorted(Ordering$String$.MODULE$), ((List) getMessages(nMessages(), createMessageStreams).$plus$plus(getMessages(nMessages(), createMessageStreams2), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
                Assert.assertEquals(apply, getZKChildrenValues(dirs().consumerOwnerDir()));

                // A second createMessageStreams call on the same connector must be rejected.
                try {
                    zookeeperConsumerConnector4.createMessageStreams(new HashMap());
                    throw fail("Should fail with MessageStreamsExistException");
                } catch (MessageStreamsExistException expected) {
                    // Expected: this is the behaviour under test.
                }
            } finally {
                // Shut down whichever connectors were created, even when an assertion failed.
                if (zookeeperConsumerConnector2 != null) {
                    zookeeperConsumerConnector2.shutdown();
                }
                if (zookeeperConsumerConnector3 != null) {
                    zookeeperConsumerConnector3.shutdown();
                }
                if (zookeeperConsumerConnector4 != null) {
                    zookeeperConsumerConnector4.shutdown();
                }
            }
            info(new ZookeeperConsumerConnectorTest$$anonfun$testBasic$2(this));
        } finally {
            // Always undo the FATAL level set at the top of the test.
            logger.setLevel(Level.ERROR);
        }
    }

    /**
     * Same scenario as the multi-consumer part of {@link #testBasic()}, but every batch is sent
     * with {@code GZIPCompressionCodec$.MODULE$}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Raise the {@code KafkaRequestHandler} log4j level to FATAL for the duration of the test.</li>
     *   <li>Send {@link #nMessages()} messages to partition 0 (first broker config) and partition 1
     *       (last broker config), then wait for leaders and metadata.</li>
     *   <li>Create a connector for {@link #consumer1()}, check that it receives all sent messages and
     *       that both partitions are registered to {@code group1_consumer1-0}; commit offsets.</li>
     *   <li>Create a connector for {@link #consumer2()} (with {@code rebalanceBackoffMs} taken from
     *       {@link #RebalanceBackoffMs()}), send again, and check that the two streams together
     *       receive the messages and that partition 1 is now registered to
     *       {@code group1_consumer2-0}.</li>
     *   <li>Create a connector for {@link #consumer3()} with an empty topic map, send again, and
     *       check that messages and partition ownership are unchanged.</li>
     * </ol>
     *
     * <p>Clean-up (connector shutdown and restoring the log level to ERROR) runs in
     * {@code finally} blocks so that a failed assertion does not leave connectors running or the
     * logger silenced. On the success path the order of operations is the same as before:
     * connectors are shut down, the completion message is logged, and then the level is restored.
     */
    public void testCompression() {
        Logger logger = Logger.getLogger(KafkaRequestHandler.class);
        logger.setLevel(Level.FATAL);
        // Declared up front so the finally block can shut down whichever connectors were created.
        ZookeeperConsumerConnector zookeeperConsumerConnector = null;
        ZookeeperConsumerConnector zookeeperConsumerConnector2 = null;
        ZookeeperConsumerConnector zookeeperConsumerConnector3 = null;
        try {
            try {
                // Round 1: a single consumer owns both partitions.
                List list = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, nMessages(), GZIPCompressionCodec$.MODULE$).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, nMessages(), GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
                TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
                zookeeperConsumerConnector = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
                Map<String, List<KafkaStream<String, String>>> createMessageStreams = zookeeperConsumerConnector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                Assert.assertEquals(list.sorted(Ordering$String$.MODULE$), getMessages(nMessages() * 2, createMessageStreams).sorted(Ordering$String$.MODULE$));
                Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer1-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
                zookeeperConsumerConnector.commitOffsets(zookeeperConsumerConnector.commitOffsets$default$1());

                // Round 2: a second consumer joins the group.
                zookeeperConsumerConnector2 = new ZookeeperConsumerConnector(new ConsumerConfig(this) { // from class: kafka.consumer.ZookeeperConsumerConnectorTest$$anon$4
                    private final int rebalanceBackoffMs;

                    public int rebalanceBackoffMs() {
                        return this.rebalanceBackoffMs;
                    }

                    {
                        super(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer2(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
                        this.rebalanceBackoffMs = this.RebalanceBackoffMs();
                    }
                }, true);
                Map<String, List<KafkaStream<String, String>>> createMessageStreams2 = zookeeperConsumerConnector2.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                List list2 = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, nMessages(), GZIPCompressionCodec$.MODULE$).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, nMessages(), GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                Assert.assertEquals(list2.sorted(Ordering$String$.MODULE$), ((List) getMessages(nMessages(), createMessageStreams).$plus$plus(getMessages(nMessages(), createMessageStreams2), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
                Seq<Tuple2<String, String>> zKChildrenValues = getZKChildrenValues(dirs().consumerOwnerDir());
                List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer2-0")}));
                Assert.assertEquals(apply, zKChildrenValues);

                // Round 3: a third consumer with an empty topic map must not change ownership.
                zookeeperConsumerConnector3 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer3(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
                zookeeperConsumerConnector3.createMessageStreams(new HashMap(), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                List list3 = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, nMessages(), GZIPCompressionCodec$.MODULE$).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, nMessages(), GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                Assert.assertEquals(list3.sorted(Ordering$String$.MODULE$), ((List) getMessages(nMessages(), createMessageStreams).$plus$plus(getMessages(nMessages(), createMessageStreams2), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
                Assert.assertEquals(apply, getZKChildrenValues(dirs().consumerOwnerDir()));
            } finally {
                // Shut down whichever connectors were created, even when an assertion failed.
                if (zookeeperConsumerConnector != null) {
                    zookeeperConsumerConnector.shutdown();
                }
                if (zookeeperConsumerConnector2 != null) {
                    zookeeperConsumerConnector2.shutdown();
                }
                if (zookeeperConsumerConnector3 != null) {
                    zookeeperConsumerConnector3.shutdown();
                }
            }
            info(new ZookeeperConsumerConnectorTest$$anonfun$testCompression$1(this));
        } finally {
            // Always undo the FATAL level set at the top of the test.
            logger.setLevel(Level.ERROR);
        }
    }

    /**
     * Sends 200 messages to each of partitions 0 and 1 of {@link #topic()} using the
     * default compression codec, then checks that a single consumer receives all 400
     * of them and that the consumer-owner directory in ZooKeeper lists that consumer
     * for both partitions.
     *
     * <p>The connector is shut down in a {@code finally} block so that a failing
     * assertion does not leave a live consumer behind for subsequent tests.
     */
    public void testCompressionSetConsumption() {
        List sentMessages = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, 200, DefaultCompressionCodec$.MODULE$).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, 200, DefaultCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        ZookeeperConsumerConnector connector = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer0(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
        try {
            // Order of delivery across partitions is not fixed, so compare sorted lists.
            Assert.assertEquals(sentMessages.sorted(Ordering$String$.MODULE$), getMessages(400, connector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()))).sorted(Ordering$String$.MODULE$));
            // Both partitions are expected to be owned by the single consumer's one stream.
            Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer0-0"), new Tuple2("1", "group1_consumer0-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
        } finally {
            connector.shutdown();
        }
    }

    /**
     * Sends {@link #nMessages()} uncompressed messages to each of partitions 0 and 1,
     * consumes them through streams created with {@code StringDecoder} key and value
     * decoders, and checks that the decoded messages equal the sent ones (compared
     * after sorting).
     *
     * <p>The {@code KafkaRequestHandler} logger is raised to FATAL for the duration of
     * the test and set to ERROR afterwards. Both that reset and the connector shutdown
     * run in {@code finally} blocks so a failing assertion neither leaks the connector
     * nor leaves the logger at FATAL for later tests.
     */
    public void testConsumerDecoder() {
        Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
        requestHandlerLogger.setLevel(Level.FATAL);
        try {
            List sentMessages = (List) sendMessagesToBrokerPartition((KafkaConfig) configs().head(), topic(), 0, nMessages(), NoCompressionCodec$.MODULE$).$plus$plus(sendMessagesToBrokerPartition((KafkaConfig) configs().last(), topic(), 1, nMessages(), NoCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            ConsumerConfig consumerConfig = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            ZookeeperConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfig, true);
            try {
                Map streamsByTopic = connector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                // Mutable cell that the foreach closure fills with the received messages.
                ObjectRef receivedMessages = new ObjectRef(Nil$.MODULE$);
                streamsByTopic.withFilter(new ZookeeperConsumerConnectorTest$$anonfun$testConsumerDecoder$1(this)).foreach(new ZookeeperConsumerConnectorTest$$anonfun$testConsumerDecoder$2(this, receivedMessages));
                Assert.assertEquals(sentMessages.sorted(Ordering$String$.MODULE$), ((List) receivedMessages.elem).sorted(Ordering$String$.MODULE$));
            } finally {
                connector.shutdown();
            }
        } finally {
            requestHandlerLogger.setLevel(Level.ERROR);
        }
    }

    /**
     * Creates {@link #topic()} with one partition and one replica through a dedicated
     * {@code ZkClient}, sends one batch of messages, and checks that a single consumer:
     * <ul>
     *   <li>has exactly one entry in its topic registry, for {@link #topic()},</li>
     *   <li>whose first partition info has partition id 0,</li>
     *   <li>is recorded as owner of partition "0" in the consumer-owner directory, and</li>
     *   <li>receives exactly the messages that were sent.</li>
     * </ul>
     *
     * <p>The connector and the dedicated {@code ZkClient} are released in
     * {@code finally} blocks (connector first, then client, as in the success path) so
     * that a failing assertion does not leak either of them.
     */
    public void testLeaderSelectionForPartition() {
        ZkClient topicZkClient = new ZkClient(zookeeperConnect(), 6000, 30000, ZKStringSerializer$.MODULE$);
        try {
            TestUtils$.MODULE$.createTopic(topicZkClient, topic(), 1, 1, servers(), TestUtils$.MODULE$.createTopic$default$6());
            List<String> sentMessages = sendMessages((KafkaConfig) configs().head(), nMessages(), "batch1", NoCompressionCodec$.MODULE$, 1);
            ZookeeperConsumerConnector connector = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
            try {
                Map<String, List<KafkaStream<String, String>>> streamsByTopic = connector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                Pool topicRegistry = connector.getTopicRegistry();
                Assert.assertEquals(1, ((TraversableOnce) topicRegistry.map(new ZookeeperConsumerConnectorTest$$anonfun$testLeaderSelectionForPartition$1(this), Iterable$.MODULE$.canBuildFrom())).size());
                Assert.assertEquals(topic(), (String) ((IterableLike) topicRegistry.map(new ZookeeperConsumerConnectorTest$$anonfun$testLeaderSelectionForPartition$2(this), Iterable$.MODULE$.canBuildFrom())).head());
                Assert.assertEquals(0, ((PartitionTopicInfo) ((IterableLike) ((Tuple2) ((Iterable) topicRegistry.map(new ZookeeperConsumerConnectorTest$$anonfun$2(this), Iterable$.MODULE$.canBuildFrom())).head())._2()).head()).partitionId());
                Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
                Assert.assertEquals(sentMessages, getMessages(nMessages(), streamsByTopic));
            } finally {
                connector.shutdown();
            }
        } finally {
            topicZkClient.close();
        }
    }

    /**
     * Produces a batch of messages for one partition of a topic and returns the
     * message payloads that were generated.
     *
     * <p>A fresh producer is created for the call and is always closed before the
     * method returns, including when building or sending the batch throws.
     *
     * @param kafkaConfig      broker configuration; its broker id is embedded in the
     *                         {@code "test-%d-%d"} header used for the payloads
     * @param str              topic name (callers in this class pass {@code topic()})
     * @param i                partition index; also embedded in the payload header
     * @param i2               number of messages to generate
     * @param compressionCodec codec whose numeric id is set as the producer's
     *                         {@code compression.codec} property
     * @return the generated message payloads, in generation order
     */
    public List<String> sendMessagesToBrokerPartition(KafkaConfig kafkaConfig, String str, int i, int i2, CompressionCodec compressionCodec) {
        String header = new StringOps(Predef$.MODULE$.augmentString("test-%d-%d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(kafkaConfig.brokerId()), BoxesRunTime.boxToInteger(i)}));
        Properties producerProps = new Properties();
        producerProps.put("compression.codec", BoxesRunTime.boxToInteger(compressionCodec.codec()).toString());
        Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromConfigs(configs()), StringEncoder.class.getName(), IntEncoder.class.getName(), FixedValuePartitioner.class.getName(), producerProps);
        try {
            // One payload per index in [0, i2); the closure builds each payload from the header.
            IndexedSeq payloads = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), i2).map(new ZookeeperConsumerConnectorTest$$anonfun$3(this, kafkaConfig, i, header), IndexedSeq$.MODULE$.canBuildFrom());
            // The second closure receives the topic and partition; presumably it wraps
            // each payload in a keyed message for that partition.
            producer.send((Seq) payloads.map(new ZookeeperConsumerConnectorTest$$anonfun$sendMessagesToBrokerPartition$1(this, str, i), IndexedSeq$.MODULE$.canBuildFrom()));
            debug(new ZookeeperConsumerConnectorTest$$anonfun$sendMessagesToBrokerPartition$2(this, kafkaConfig, str, i, payloads));
            return payloads.toList();
        } finally {
            producer.close();
        }
    }

    /**
     * Produces messages through a freshly created producer and returns the list that
     * the per-iteration closure accumulated.
     *
     * <p>The producer is always closed before the method returns, including when the
     * send loop throws.
     *
     * @param kafkaConfig      broker configuration handed to the send closure
     * @param i                forwarded to the send closure; the caller in this class
     *                         passes {@code nMessages()}, so presumably the number of
     *                         messages per iteration
     * @param str              forwarded to the send closure; the caller in this class
     *                         passes {@code "batch1"}, so presumably a payload header
     * @param compressionCodec codec whose numeric id is set as the producer's
     *                         {@code compression.codec} property
     * @param i2               number of loop iterations (indices {@code 0 .. i2-1});
     *                         presumably one per partition
     * @return the messages collected by the send closure
     */
    public List<String> sendMessages(KafkaConfig kafkaConfig, int i, String str, CompressionCodec compressionCodec, int i2) {
        // Mutable cell that the loop closure updates with the messages it sends.
        ObjectRef sentMessages = new ObjectRef(Nil$.MODULE$);
        Properties producerProps = new Properties();
        producerProps.put("compression.codec", BoxesRunTime.boxToInteger(compressionCodec.codec()).toString());
        Producer producer = TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.getBrokerListStrFromConfigs(configs()), StringEncoder.class.getName(), IntEncoder.class.getName(), FixedValuePartitioner.class.getName(), producerProps);
        try {
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), i2).foreach$mVc$sp(new ZookeeperConsumerConnectorTest$$anonfun$sendMessages$1(this, kafkaConfig, i, str, sentMessages, producer));
        } finally {
            producer.close();
        }
        return (List) sentMessages.elem;
    }

    /**
     * Supplies the default value for the fifth parameter ({@code compressionCodec}) of
     * {@code sendMessagesToBrokerPartition}.
     *
     * @return the no-compression codec singleton
     */
    public CompressionCodec sendMessagesToBrokerPartition$default$5() {
        final CompressionCodec defaultCodec = NoCompressionCodec$.MODULE$;
        return defaultCodec;
    }

    /**
     * Collects messages from the given topic-to-streams map.
     *
     * <p>The entries are filtered and then visited by a closure that accumulates
     * messages into a mutable cell; the accumulated list is returned reversed
     * (presumably because the closure prepends, so reversing restores arrival order).
     *
     * @param i   forwarded to the collecting closure; callers in this class pass the
     *            number of messages they expect to read
     * @param map message streams keyed by topic name
     * @return the collected messages
     */
    public List<String> getMessages(int i, Map<String, List<KafkaStream<String, String>>> map) {
        final ObjectRef collected = new ObjectRef(Nil$.MODULE$);
        final ZookeeperConsumerConnectorTest$$anonfun$getMessages$1 entryFilter =
                new ZookeeperConsumerConnectorTest$$anonfun$getMessages$1(this);
        final ZookeeperConsumerConnectorTest$$anonfun$getMessages$2 collector =
                new ZookeeperConsumerConnectorTest$$anonfun$getMessages$2(this, i, collected);
        map.withFilter(entryFilter).foreach(collector);
        List accumulated = (List) collected.elem;
        return accumulated.reverse();
    }

    /**
     * Lists the children of a ZooKeeper path in sorted name order and maps each child
     * name to a pair.
     *
     * <p>The pair is built by a closure that receives the parent path; presumably it
     * reads the data stored at each child node, giving (child name, child value).
     *
     * @param str the parent ZooKeeper path
     * @return one pair per child, ordered by child name
     */
    public Seq<Tuple2<String, String>> getZKChildrenValues(String str) {
        java.util.List childNames = zkClient().getChildren(str);
        // Sort in place so the result order does not depend on ZooKeeper's listing order.
        Collections.sort(childNames);
        ZookeeperConsumerConnectorTest$$anonfun$getZKChildrenValues$1 toNameValuePair =
                new ZookeeperConsumerConnectorTest$$anonfun$getZKChildrenValues$1(this, str);
        return (Seq) JavaConversions$.MODULE$.asScalaBuffer(childNames).toSeq().map(toNameValuePair, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Initializes the test fixture: runs the mixed-in initializers and then assigns
     * the fixed values (topic, group, consumer ids, broker/partition/message counts)
     * that the test methods read through their accessor methods.
     *
     * NOTE(review): the three {@code $init$} calls look like Scala-compiler-generated
     * trait initializers as rendered by a decompiler; statement order is kept as is.
     */
    public ZookeeperConsumerConnectorTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        KafkaServerTestHarness.Cclass.$init$(this);
        Logging.class.$init$(this);
        // Presumably the consumer rebalance backoff in milliseconds - confirm against usages.
        this.RebalanceBackoffMs = 5000;
        // Left null here; it is not assigned anywhere in this constructor.
        this.dirs = null;
        this.zookeeperConnect = TestZKUtils$.MODULE$.zookeeperConnect();
        this.numNodes = 2;
        this.numParts = 2;
        this.topic = "topic1";
        // One broker config per node; each raw config is transformed by the closure.
        this.configs = (List) TestUtils$.MODULE$.createBrokerConfigs(numNodes(), TestUtils$.MODULE$.createBrokerConfigs$default$2()).map(new ZookeeperConsumerConnectorTest$$anonfun$1(this), List$.MODULE$.canBuildFrom());
        // Group and consumer ids; the tests' expected owner strings such as
        // "group1_consumer1-0" contain these values.
        this.group = "group1";
        this.consumer0 = "consumer0";
        this.consumer1 = "consumer1";
        this.consumer2 = "consumer2";
        this.consumer3 = "consumer3";
        // Message count passed by the tests to the send/get helpers.
        this.nMessages = 2;
    }
}
