package kafka.integration;

import java.nio.ByteBuffer;
import java.util.Properties;
import junit.framework.Assert;
import kafka.admin.CreateTopicCommand$;
import kafka.api.FetchRequest;
import kafka.api.FetchRequest$;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.cluster.Replica;
import kafka.common.OffsetOutOfRangeException;
import kafka.common.UnknownTopicOrPartitionException;
import kafka.consumer.SimpleConsumer;
import kafka.integration.KafkaServerTestHarness;
import kafka.integration.ProducerConsumerTestHarness;
import kafka.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.server.ReplicaManager;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.scalatest.junit.JUnit3Suite;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.TraitSetter;

@ScalaSignature(bytes = "\u0006\u0001\u0005-b\u0001B\u0001\u0003\u0001\u001d\u0011\u0001\u0003\u0015:j[&$\u0018N^3Ba&$Vm\u001d;\u000b\u0005\r!\u0011aC5oi\u0016<'/\u0019;j_:T\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\u0011\u0001\u0001B\u0005\f\u0011\u0005%\u0001R\"\u0001\u0006\u000b\u0005-a\u0011!\u00026v]&$(BA\u0007\u000f\u0003%\u00198-\u00197bi\u0016\u001cHOC\u0001\u0010\u0003\ry'oZ\u0005\u0003#)\u00111BS+oSR\u001c4+^5uKB\u00111\u0003F\u0007\u0002\u0005%\u0011QC\u0001\u0002\u001c!J|G-^2fe\u000e{gn];nKJ$Vm\u001d;ICJtWm]:\u0011\u0005]QR\"\u0001\r\u000b\u0005e!\u0011A\u0001>l\u0013\tY\u0002D\u0001\u000b[_>\\U-\u001a9feR+7\u000f\u001e%be:,7o\u001d\u0005\u0006;\u0001!\tAH\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003}\u0001\"a\u0005\u0001\t\u000f\u0005\u0002!\u0019!C\u0001E\u0005!\u0001o\u001c:u+\u0005\u0019\u0003C\u0001\u0013(\u001b\u0005)#\"\u0001\u0014\u0002\u000bM\u001c\u0017\r\\1\n\u0005!*#aA%oi\"1!\u0006\u0001Q\u0001\n\r\nQ\u0001]8si\u0002Bq\u0001\f\u0001C\u0002\u0013\u0005Q&A\u0003qe>\u00048/F\u0001/!\tyC'D\u00011\u0015\t\t$'\u0001\u0003vi&d'\"A\u001a\u0002\t)\fg/Y\u0005\u0003kA\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u00199\u0004\u0001)A\u0005]\u00051\u0001O]8qg\u0002Bq!\u000f\u0001C\u0002\u0013\u0005!(\u0001\u0004d_:4\u0017nZ\u000b\u0002wA\u0011AhP\u0007\u0002{)\u0011a\bB\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005\u0001k$aC&bM.\f7i\u001c8gS\u001eDaA\u0011\u0001!\u0002\u0013Y\u0014aB2p]\u001aLw\r\t\u0005\b\t\u0002\u0011\r\u0011\"\u0001F\u0003\u001d\u0019wN\u001c4jON,\u0012A\u0012\t\u0004\u000f2[T\"\u0001%\u000b\u0005%S\u0015!C5n[V$\u0018M\u00197f\u0015\tYU%\u0001\u0006d_2dWm\u0019;j_:L!!\u0014%\u0003\t1K7\u000f\u001e\u0005\u0007\u001f\u0002\u0001\u000b\u0011\u0002$\u0002\u0011\r|gNZ5hg\u0002Bq!\u0015\u0001C\u0002\u0013\u0005!+\u0001\u000bsKF,Xm\u001d;IC:$G.\u001a:M_\u001e<WM]\u000b\u0002'B\u0011A+W\u0007\u0002+*\u0011akV\u0001\u0006Y><GG\u001b\u0006\u00031:\ta!\u00199bG\",\u0017B\u0001.V\u0005\u0019aunZ4fe\"1A\f\u0001Q\u0001\nM\u000bQC]3rk\u0016\u001cH\u000fS1oI2,'\u000fT8hO\u0016\u0014\b\u0005C\u0003_\u0001\u0011\u0005s,A\u0003tKR,\u0006\u000fF\u0001a!\t!\u0013-\u0003\u0002cK\t!QK\\5u\u0011\u0015!\u0007\u0001\"\u0011`\u0003!!X-\u0019:E_^t\u0007\"\u00024\u0001\t\u0003y\u0016\u0001\n;fgR4U\r^2i%\u0016\fX/Z:u\u0007\u0006t\u0007K]8qKJd\u0017pU3sS\u0006d\u0017N_3\t\u000b!\u0004A\u0011A0\u0002+Q,7\u000f^#naRLh)\u001a;dQJ+\u0017/^3ti\")!\u000e\u0001C\u0001?\u0006\u0011C/Z:u\t\u00164\u0017-\u001e7u\u000b:\u001cw\u000eZ3s!J|G-^2fe\u0006sGMR3uG\"DQ\u0001\u001c\u0001\u0005\u0002}\u000b\u0011\u0007^3ti\u0012+g-Y;mi\u0016s7m\u001c3feB\u0013x\u000eZ;dKJ\fe\u000e\u001a$fi\u000eDw+\u001b;i\u0007>l\u0007O]3tg&|g\u000eC\u0003o\u0001\u0011\u0005q,\u0001\ruKN$\bK]8ek\u000e,\u0017I\u001c3Nk2$\u0018NR3uG\"DQ\u0001\u001d\u0001\u0005\u0002}\u000bq\u0005^3tiB\u0013x\u000eZ;dK\u0006sG-T;mi&4U\r^2i/&$\bnQ8naJ,7o]5p]\")!\u000f\u0001C\u0001?\u0006\u0001B/Z:u\u001bVdG/\u001b)s_\u0012,8-\u001a\u0005\u0006i\u0002!\taX\u0001 i\u0016\u001cH/T;mi&\u0004&o\u001c3vG\u0016<\u0016\u000e\u001e5D_6\u0004(/Z:tS>t\u0007\"\u0002<\u0001\t\u0003y\u0016A\u0006;fgR\u001cuN\\:v[\u0016\u0014X)\u001c9usR{\u0007/[2\t\u000ba\u0004A\u0011A0\u00029Q,7\u000f\u001e)ja\u0016d\u0017N\\3e!J|G-^2f%\u0016\fX/Z:ug\")!\u0010\u0001C\u0001w\u0006\u00013M]3bi\u0016\u001c\u0016.\u001c9mKR{\u0007/[2t\u0003:$\u0017i^1ji2+\u0017\rZ3s)\u0019\u0001G0!\u0004\u0002(!)Q0\u001fa\u0001}\u0006A!p[\"mS\u0016tG\u000fE\u0002��\u0003\u0013i!!!\u0001\u000b\t\u0005\r\u0011QA\u0001\tu.\u001cG.[3oi*\u0019\u0011q\u0001\b\u0002\r%\u0003\u0014\n^3d\u0013\u0011\tY!!\u0001\u0003\u0011i[7\t\\5f]RDq!a\u0004z\u0001\u0004\t\t\"\u0001\u0004u_BL7m\u001d\t\u0007\u0003'\t)\"!\u0007\u000e\u0003)K1!a\u0006K\u0005\r\u0019V-\u001d\t\u0005\u00037\t\tCD\u0002%\u0003;I1!a\b&\u0003\u0019\u0001&/\u001a3fM&!\u00111EA\u0013\u0005\u0019\u0019FO]5oO*\u0019\u0011qD\u0013\t\r\u0005%\u0012\u00101\u0001$\u0003!\u0011'o\\6fe&#\u0007")
/* loaded from: input_file:kafka/integration/PrimitiveApiTest.class */
public class PrimitiveApiTest extends JUnit3Suite implements ProducerConsumerTestHarness {
    private final int port;
    private final Properties props;
    private final KafkaConfig config;
    private final List<KafkaConfig> configs;
    private final Logger requestHandlerLogger;
    private final String host;
    private Producer<String, String> producer;
    private SimpleConsumer consumer;
    private List<KafkaServer> servers;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    @Override // kafka.integration.ProducerConsumerTestHarness
    public String host() {
        return this.host;
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public Producer<String, String> producer() {
        return this.producer;
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public void producer_$eq(Producer<String, String> producer) {
        this.producer = producer;
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public SimpleConsumer consumer() {
        return this.consumer;
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public void consumer_$eq(SimpleConsumer simpleConsumer) {
        this.consumer = simpleConsumer;
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public void kafka$integration$ProducerConsumerTestHarness$$super$setUp() {
        KafkaServerTestHarness.Cclass.setUp(this);
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public void kafka$integration$ProducerConsumerTestHarness$$super$tearDown() {
        KafkaServerTestHarness.Cclass.tearDown(this);
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public void kafka$integration$ProducerConsumerTestHarness$_setter_$host_$eq(String str) {
        this.host = str;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaServer> servers() {
        return this.servers;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    @TraitSetter
    public void servers_$eq(List<KafkaServer> list) {
        this.servers = list;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    @Override // kafka.integration.ProducerConsumerTestHarness
    public int port() {
        return this.port;
    }

    public Properties props() {
        return this.props;
    }

    public KafkaConfig config() {
        return this.config;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaConfig> configs() {
        return this.configs;
    }

    public Logger requestHandlerLogger() {
        return this.requestHandlerLogger;
    }

    @Override // kafka.integration.ProducerConsumerTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public void setUp() {
        ProducerConsumerTestHarness.Cclass.setUp(this);
        requestHandlerLogger().setLevel(Level.FATAL);
    }

    @Override // kafka.integration.ProducerConsumerTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public void tearDown() {
        requestHandlerLogger().setLevel(Level.ERROR);
        ProducerConsumerTestHarness.Cclass.tearDown(this);
    }

    public void testFetchRequestCanProperlySerialize() {
        FetchRequest build = new FetchRequestBuilder().clientId("test-client").maxWait(10001).minBytes(4444).addFetch("topic1", 0, 0L, 10000).addFetch("topic2", 1, 1024L, 9999).addFetch("topic1", 1, 256L, 444).build();
        ByteBuffer allocate = ByteBuffer.allocate(build.sizeInBytes());
        build.writeTo(allocate);
        allocate.rewind();
        Assert.assertEquals(build, FetchRequest$.MODULE$.readFrom(allocate));
    }

    public void testEmptyFetchRequest() {
        FetchResponse fetch = consumer().fetch(new FetchRequest(FetchRequest$.MODULE$.$lessinit$greater$default$1(), FetchRequest$.MODULE$.$lessinit$greater$default$2(), FetchRequest$.MODULE$.$lessinit$greater$default$3(), FetchRequest$.MODULE$.$lessinit$greater$default$4(), FetchRequest$.MODULE$.$lessinit$greater$default$5(), FetchRequest$.MODULE$.$lessinit$greater$default$6(), Map$.MODULE$.apply(Nil$.MODULE$)));
        Assert.assertTrue(!fetch.hasError() && fetch.data().size() == 0);
    }

    public void testDefaultEncoderProducerAndFetch() {
        new Producer(new ProducerConfig(producer().config().props().props())).send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("test-topic", "test-message")}));
        ReplicaManager replicaManager = ((KafkaServer) servers().head()).replicaManager();
        Replica replica = (Replica) replicaManager.getReplica("test-topic", 0, replicaManager.getReplica$default$3()).get();
        Assert.assertTrue("HighWatermark should equal logEndOffset with just 1 replica", replica.logEndOffset() > 0 && replica.logEndOffset() == replica.highWatermark());
        FetchResponse fetch = consumer().fetch(new FetchRequestBuilder().clientId("test-client").addFetch("test-topic", 0, 0L, 10000).build());
        Assert.assertEquals("Returned correlationId doesn't match that in request.", 0, fetch.correlationId());
        ByteBufferMessageSet messageSet = fetch.messageSet("test-topic", 0);
        Assert.assertTrue(messageSet.iterator().hasNext());
        Assert.assertEquals("test-message", Utils$.MODULE$.readString(((MessageAndOffset) messageSet.head()).message().payload(), "UTF-8"));
    }

    public void testDefaultEncoderProducerAndFetchWithCompression() {
        Properties props = producer().config().props().props();
        props.put("compression", "true");
        new Producer(new ProducerConfig(props)).send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("test-topic", "test-message")}));
        ByteBufferMessageSet messageSet = consumer().fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 10000).build()).messageSet("test-topic", 0);
        Assert.assertTrue(messageSet.iterator().hasNext());
        Assert.assertEquals("test-message", Utils$.MODULE$.readString(((MessageAndOffset) messageSet.head()).message().payload(), "UTF-8"));
    }

    public void testProduceAndMultiFetch() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$1(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$2(this, hashMap, fetchRequestBuilder));
        apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$3(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
        requestHandlerLogger().setLevel(Level.FATAL);
        FetchRequestBuilder fetchRequestBuilder2 = new FetchRequestBuilder();
        apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$5(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$6(this, fetchRequestBuilder2));
        try {
            consumer().fetch(fetchRequestBuilder2.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$7(this));
            throw fail("Expected exception when fetching message with invalid offset");
        } catch (OffsetOutOfRangeException e) {
            FetchRequestBuilder fetchRequestBuilder3 = new FetchRequestBuilder();
            apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$8(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$9(this, fetchRequestBuilder3));
            try {
                consumer().fetch(fetchRequestBuilder3.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$10(this));
                throw fail("Expected exception when fetching message with invalid partition");
            } catch (UnknownTopicOrPartitionException e2) {
                requestHandlerLogger().setLevel(Level.ERROR);
            }
        }
    }

    public void testProduceAndMultiFetchWithCompression() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$1(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$2(this, hashMap, fetchRequestBuilder));
        apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$3(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
        requestHandlerLogger().setLevel(Level.FATAL);
        FetchRequestBuilder fetchRequestBuilder2 = new FetchRequestBuilder();
        apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$5(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$6(this, fetchRequestBuilder2));
        try {
            consumer().fetch(fetchRequestBuilder2.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$7(this));
            throw fail("Expected exception when fetching message with invalid offset");
        } catch (OffsetOutOfRangeException e) {
            FetchRequestBuilder fetchRequestBuilder3 = new FetchRequestBuilder();
            apply.withFilter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$8(this)).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$9(this, fetchRequestBuilder3));
            try {
                consumer().fetch(fetchRequestBuilder3.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$10(this));
                throw fail("Expected exception when fetching message with invalid partition");
            } catch (UnknownTopicOrPartitionException e2) {
                requestHandlerLogger().setLevel(Level.ERROR);
            }
        }
    }

    public void testMultiProduce() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        Nil$ nil$ = Nil$.MODULE$;
        apply.withFilter(new PrimitiveApiTest$$anonfun$testMultiProduce$1(this)).foreach(new PrimitiveApiTest$$anonfun$testMultiProduce$2(this, hashMap, fetchRequestBuilder));
        producer().send(nil$);
        apply.withFilter(new PrimitiveApiTest$$anonfun$testMultiProduce$3(this)).foreach(new PrimitiveApiTest$$anonfun$testMultiProduce$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
    }

    public void testMultiProduceWithCompression() {
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        Nil$ nil$ = Nil$.MODULE$;
        apply.withFilter(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$1(this)).foreach(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$2(this, hashMap, fetchRequestBuilder));
        producer().send(nil$);
        apply.withFilter(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$3(this)).foreach(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
    }

    public void testConsumerEmptyTopic() {
        CreateTopicCommand$.MODULE$.createTopic(zkClient(), "new-topic", 1, 1, BoxesRunTime.boxToInteger(config().brokerId()).toString());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), "new-topic", 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        Assert.assertFalse(consumer().fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0L, 10000).build()).messageSet("new-topic", 0).iterator().hasNext());
    }

    public void testPipelinedProduceRequests() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        Properties props = producer().config().props().props();
        props.put("request.required.acks", "0");
        Producer producer = new Producer(new ProducerConfig(props));
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        Nil$ nil$ = Nil$.MODULE$;
        apply.withFilter(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$9(this)).foreach(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$10(this, producer, hashMap, fetchRequestBuilder));
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$1(this), 1000L);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$2(this), 1000L);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$3(this), 1000L);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$4(this), 1000L);
        int brokerId = ((KafkaServer) servers().head()).config().brokerId();
        long replicaHighWatermarkCheckpointIntervalMs = config().replicaHighWatermarkCheckpointIntervalMs();
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$5(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$6(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$7(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$8(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        apply.withFilter(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$11(this)).foreach(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$12(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
    }

    public void createSimpleTopicsAndAwaitLeader(ZkClient zkClient, Seq<String> seq, int i) {
        seq.foreach(new PrimitiveApiTest$$anonfun$createSimpleTopicsAndAwaitLeader$1(this, zkClient, i));
    }

    public PrimitiveApiTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        servers_$eq(null);
        ProducerConsumerTestHarness.Cclass.$init$(this);
        this.port = TestUtils$.MODULE$.choosePort();
        this.props = TestUtils$.MODULE$.createBrokerConfig(0, port());
        this.config = new KafkaConfig(props());
        this.configs = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config()}));
        this.requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
    }
}
