package kafka.producer;

import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.api.FetchRequestBuilder;
import kafka.common.FailedToSendMessageException;
import kafka.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import kafka.utils.ZkUtils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.scalatest.exceptions.TestFailedException;
import org.scalatest.junit.JUnit3Suite;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ProducerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=f\u0001B\u0001\u0003\u0001\u001d\u0011A\u0002\u0015:pIV\u001cWM\u001d+fgRT!a\u0001\u0003\u0002\u0011A\u0014x\u000eZ;dKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\u0015\u0001\u0001B\u0005\r\u001f!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!a\u0003&V]&$8gU;ji\u0016\u0004\"a\u0005\f\u000e\u0003QQ!!\u0006\u0003\u0002\u0005i\\\u0017BA\f\u0015\u0005QQvn\\&fKB,'\u000fV3ti\"\u000b'O\\3tgB\u0011\u0011\u0004H\u0007\u00025)\u00111\u0004B\u0001\u0006kRLGn]\u0005\u0003;i\u0011q\u0001T8hO&tw\r\u0005\u0002 E5\t\u0001EC\u0001\"\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0003EA\u0006TG\u0006d\u0017m\u00142kK\u000e$\b\"B\u0013\u0001\t\u00031\u0013A\u0002\u001fj]&$h\bF\u0001(!\tA\u0003!D\u0001\u0003\u0011\u001dQ\u0003A1A\u0005\n-\n\u0011B\u0019:pW\u0016\u0014\u0018\nZ\u0019\u0016\u00031\u0002\"aH\u0017\n\u00059\u0002#aA%oi\"1\u0001\u0007\u0001Q\u0001\n1\n!B\u0019:pW\u0016\u0014\u0018\nZ\u0019!\u0011\u001d\u0011\u0004A1A\u0005\n-\n\u0011B\u0019:pW\u0016\u0014\u0018\n\u001a\u001a\t\rQ\u0002\u0001\u0015!\u0003-\u0003)\u0011'o\\6fe&#'\u0007\t\u0005\bm\u0001\u0011\r\u0011\"\u00038\u0003\u0015\u0001xN\u001d;t+\u0005A\u0004cA\u001dBY9\u0011!h\u0010\b\u0003wyj\u0011\u0001\u0010\u0006\u0003{\u0019\ta\u0001\u0010:p_Rt\u0014\"A\u0011\n\u0005\u0001\u0003\u0013a\u00029bG.\fw-Z\u0005\u0003\u0005\u000e\u0013A\u0001T5ti*\u0011\u0001\t\t\u0005\u0007\u000b\u0002\u0001\u000b\u0011\u0002\u001d\u0002\rA|'\u000f^:!\u0011\u001d9\u0005!!Q\u0001\n!\u000b1\u0001\u001f\u00132!\u0011y\u0012\n\f\u0017\n\u0005)\u0003#A\u0002+va2,'\u0007C\u0004M\u0001\t\u0007I\u0011B\u0016\u0002\u000bA|'\u000f^\u0019\t\r9\u0003\u0001\u0015!\u0003-\u0003\u0019\u0001xN\u001d;2A!9\u0001\u000b\u0001b\u0001\n\u0013Y\u0013!\u00029peR\u0014\u0004B\u0002*\u0001A\u0003%A&\u0001\u0004q_J$(\u0007\t\u0005\b)\u0002\u0001\r\u0011\"\u0003V\u0003\u001d\u0019XM\u001d<feF*\u0012A\u0016\t\u0003/jk\u0011\u0001\u0017\u0006\u00033\u0012\taa]3sm\u0016\u0014\u0018BA.Y\u0005-Y\u0015MZ6b'\u0016\u0014h/\u001a:\t\u000fu\u0003\u0001\u0019!C\u0005=\u0006Y1/\u001a:wKJ\ft\fJ3r)\ty&\r\u0005\u0002 A&\u0011\u0011\r\t\u0002\u0005+:LG\u000fC\u0004H9\u0006\u0005\t\u0019\u0001,\t\r\u0011\u0004\u0001\u0015)\u0003W\u0003!\u0019XM\u001d<feF\u0002\u0003b\u00024\u0001\u0001\u0004%I!V\u0001\bg\u0016\u0014h/\u001a:3\u0011\u001dA\u0007\u00011A\u0005\n%\f1b]3sm\u0016\u0014(g\u0018\u0013fcR\u0011qL\u001b\u0005\b\u000f\u001e\f\t\u00111\u0001W\u0011\u0019a\u0007\u0001)Q\u0005-\u0006A1/\u001a:wKJ\u0014\u0004\u0005C\u0004o\u0001\u0001\u0007I\u0011B8\u0002\u0013\r|gn];nKJ\fT#\u00019\u0011\u0005E$X\"\u0001:\u000b\u0005M$\u0011\u0001C2p]N,X.\u001a:\n\u0005U\u0014(AD*j[BdWmQ8ogVlWM\u001d\u0005\bo\u0002\u0001\r\u0011\"\u0003y\u00035\u0019wN\\:v[\u0016\u0014\u0018g\u0018\u0013fcR\u0011q,\u001f\u0005\b\u000fZ\f\t\u00111\u0001q\u0011\u0019Y\b\u0001)Q\u0005a\u0006Q1m\u001c8tk6,'/\r\u0011\t\u000fu\u0004\u0001\u0019!C\u0005_\u0006I1m\u001c8tk6,'O\r\u0005\t\u007f\u0002\u0001\r\u0011\"\u0003\u0002\u0002\u0005i1m\u001c8tk6,'OM0%KF$2aXA\u0002\u0011\u001d9e0!AA\u0002ADq!a\u0002\u0001A\u0003&\u0001/\u0001\u0006d_:\u001cX/\\3se\u0001B\u0011\"a\u0003\u0001\u0005\u0004%I!!\u0004\u0002)I,\u0017/^3ti\"\u000bg\u000e\u001a7fe2{wmZ3s+\t\ty\u0001\u0005\u0003\u0002\u0012\u0005mQBAA\n\u0015\u0011\t)\"a\u0006\u0002\u000b1|w\r\u000e6\u000b\u0007\u0005ea\"\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0005\u0003;\t\u0019B\u0001\u0004M_\u001e<WM\u001d\u0005\t\u0003C\u0001\u0001\u0015!\u0003\u0002\u0010\u0005)\"/Z9vKN$\b*\u00198eY\u0016\u0014Hj\\4hKJ\u0004\u0003\"CA\u0013\u0001\u0001\u0007I\u0011BA\u0014\u0003\u001d\u0019XM\u001d<feN,\"!!\u000b\u0011\u000b\u0005-\u0012Q\u0007,\u000e\u0005\u00055\"\u0002BA\u0018\u0003c\t\u0011\"[7nkR\f'\r\\3\u000b\u0007\u0005M\u0002%\u0001\u0006d_2dWm\u0019;j_:L1AQA\u0017\u0011%\tI\u0004\u0001a\u0001\n\u0013\tY$A\u0006tKJ4XM]:`I\u0015\fHcA0\u0002>!Iq)a\u000e\u0002\u0002\u0003\u0007\u0011\u0011\u0006\u0005\t\u0003\u0003\u0002\u0001\u0015)\u0003\u0002*\u0005A1/\u001a:wKJ\u001c\b\u0005C\u0005\u0002F\u0001\u0011\r\u0011\"\u0003\u0002H\u00051\u0001O]8qgF*\"!!\u0013\u0011\t\u0005-\u0013QK\u0007\u0003\u0003\u001bRA!a\u0014\u0002R\u0005!Q\u000f^5m\u0015\t\t\u0019&\u0001\u0003kCZ\f\u0017\u0002BA,\u0003\u001b\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0011!\tY\u0006\u0001Q\u0001\n\u0005%\u0013a\u00029s_B\u001c\u0018\u0007\t\u0005\n\u0003?\u0002!\u0019!C\u0005\u0003C\nqaY8oM&<\u0017'\u0006\u0002\u0002dA\u0019q+!\u001a\n\u0007\u0005\u001d\u0004LA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\u0002CA6\u0001\u0001\u0006I!a\u0019\u0002\u0011\r|gNZ5hc\u0001B\u0011\"a\u001c\u0001\u0005\u0004%I!a\u0012\u0002\rA\u0014x\u000e]:3\u0011!\t\u0019\b\u0001Q\u0001\n\u0005%\u0013a\u00029s_B\u001c(\u0007\t\u0005\n\u0003o\u0002!\u0019!C\u0005\u0003C\nqaY8oM&<'\u0007\u0003\u0005\u0002|\u0001\u0001\u000b\u0011BA2\u0003!\u0019wN\u001c4jOJ\u0002\u0003bBA@\u0001\u0011\u0005\u0013\u0011Q\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002?\"9\u0011Q\u0011\u0001\u0005B\u0005\u0005\u0015\u0001\u0003;fCJ$un\u001e8\t\u000f\u0005%\u0005\u0001\"\u0001\u0002\u0002\u0006iB/Z:u+B$\u0017\r^3Ce>\\WM\u001d)beRLG/[8o\u0013:4w\u000e\u000b\u0003\u0002\b\u00065\u0005\u0003BAH\u0003'k!!!%\u000b\u0005-q\u0011\u0002BAK\u0003#\u0013A\u0001V3ti\"9\u0011\u0011\u0014\u0001\u0005\u0002\u0005\u0005\u0015A\u0005;fgR\u001cVM\u001c3U_:+w\u000fV8qS\u000eDC!a&\u0002\u000e\"9\u0011q\u0014\u0001\u0005\u0002\u0005\u0005\u0015A\u0006;fgR\u001cVM\u001c3XSRDG)Z1e\u0005J|7.\u001a:)\t\u0005u\u0015Q\u0012\u0005\b\u0003K\u0003A\u0011AAA\u0003!\"Xm\u001d;Bgft7mU3oI\u000e\u000bgnQ8se\u0016\u001cG\u000f\\=GC&dw+\u001b;i)&lWm\\;uQ\u0011\t\u0019+!$\t\u000f\u0005-\u0006\u0001\"\u0001\u0002\u0002\u0006\u0019B/Z:u'\u0016tGMT;mY6+7o]1hK\"\"\u0011\u0011VAG\u0001")
/* loaded from: input_file:kafka/producer/ProducerTest.class */
public class ProducerTest extends JUnit3Suite implements ZooKeeperTestHarness, Logging {
    private final int brokerId1;
    private final int brokerId2;
    private final List<Object> ports;
    private final Tuple2 x$1;
    private final int port1;
    private final int port2;
    private KafkaServer server1;
    private KafkaServer server2;
    private SimpleConsumer consumer1;
    private SimpleConsumer consumer2;
    private final Logger requestHandlerLogger;
    private List<KafkaServer> servers;
    private final Properties props1;
    private final KafkaConfig config1;
    private final Properties props2;
    private final KafkaConfig config2;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;
    public volatile int bitmap$0;

    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v11 */
    /* JADX WARN: Type inference failed for: r0v5 */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Throwable] */
    public Logger logger() {
        if ((this.bitmap$0 & 1) == 0) {
            ?? r0 = this;
            synchronized (r0) {
                if ((this.bitmap$0 & 1) == 0) {
                    this.logger = Logging.class.logger(this);
                    this.bitmap$0 |= 1;
                }
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                r0 = r0;
            }
        }
        return this.logger;
    }

    public String logIdent() {
        return this.logIdent;
    }

    public void logIdent_$eq(String str) {
        this.logIdent = str;
    }

    public final Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    public void kafka$utils$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ log4jController$) {
        this.kafka$utils$Logging$$log4jController = log4jController$;
    }

    public void trace(Function0<String> function0) {
        Logging.class.trace(this, function0);
    }

    /* renamed from: trace, reason: collision with other method in class */
    public Object m661trace(Function0<Throwable> function0) {
        return Logging.class.trace(this, function0);
    }

    public void trace(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.trace(this, function0, function02);
    }

    public void swallowTrace(Function0<BoxedUnit> function0) {
        Logging.class.swallowTrace(this, function0);
    }

    public void debug(Function0<String> function0) {
        Logging.class.debug(this, function0);
    }

    /* renamed from: debug, reason: collision with other method in class */
    public Object m662debug(Function0<Throwable> function0) {
        return Logging.class.debug(this, function0);
    }

    public void debug(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.debug(this, function0, function02);
    }

    public void swallowDebug(Function0<BoxedUnit> function0) {
        Logging.class.swallowDebug(this, function0);
    }

    public void info(Function0<String> function0) {
        Logging.class.info(this, function0);
    }

    /* renamed from: info, reason: collision with other method in class */
    public Object m663info(Function0<Throwable> function0) {
        return Logging.class.info(this, function0);
    }

    public void info(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.info(this, function0, function02);
    }

    public void swallowInfo(Function0<BoxedUnit> function0) {
        Logging.class.swallowInfo(this, function0);
    }

    public void warn(Function0<String> function0) {
        Logging.class.warn(this, function0);
    }

    /* renamed from: warn, reason: collision with other method in class */
    public Object m664warn(Function0<Throwable> function0) {
        return Logging.class.warn(this, function0);
    }

    public void warn(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.warn(this, function0, function02);
    }

    public void swallowWarn(Function0<BoxedUnit> function0) {
        Logging.class.swallowWarn(this, function0);
    }

    public void swallow(Function0<BoxedUnit> function0) {
        Logging.class.swallow(this, function0);
    }

    public void error(Function0<String> function0) {
        Logging.class.error(this, function0);
    }

    /* renamed from: error, reason: collision with other method in class */
    public Object m665error(Function0<Throwable> function0) {
        return Logging.class.error(this, function0);
    }

    public void error(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.error(this, function0, function02);
    }

    public void swallowError(Function0<BoxedUnit> function0) {
        Logging.class.swallowError(this, function0);
    }

    public void fatal(Function0<String> function0) {
        Logging.class.fatal(this, function0);
    }

    /* renamed from: fatal, reason: collision with other method in class */
    public Object m666fatal(Function0<Throwable> function0) {
        return Logging.class.fatal(this, function0);
    }

    public void fatal(Function0<String> function0, Function0<Throwable> function02) {
        Logging.class.fatal(this, function0, function02);
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    private int brokerId1() {
        return this.brokerId1;
    }

    private int brokerId2() {
        return this.brokerId2;
    }

    private List<Object> ports() {
        return this.ports;
    }

    private int port1() {
        return this.port1;
    }

    private int port2() {
        return this.port2;
    }

    private KafkaServer server1() {
        return this.server1;
    }

    private void server1_$eq(KafkaServer kafkaServer) {
        this.server1 = kafkaServer;
    }

    private KafkaServer server2() {
        return this.server2;
    }

    private void server2_$eq(KafkaServer kafkaServer) {
        this.server2 = kafkaServer;
    }

    private SimpleConsumer consumer1() {
        return this.consumer1;
    }

    private void consumer1_$eq(SimpleConsumer simpleConsumer) {
        this.consumer1 = simpleConsumer;
    }

    private SimpleConsumer consumer2() {
        return this.consumer2;
    }

    private void consumer2_$eq(SimpleConsumer simpleConsumer) {
        this.consumer2 = simpleConsumer;
    }

    private Logger requestHandlerLogger() {
        return this.requestHandlerLogger;
    }

    private List<KafkaServer> servers() {
        return this.servers;
    }

    private void servers_$eq(List<KafkaServer> list) {
        this.servers = list;
    }

    private Properties props1() {
        return this.props1;
    }

    private KafkaConfig config1() {
        return this.config1;
    }

    private Properties props2() {
        return this.props2;
    }

    private KafkaConfig config2() {
        return this.config2;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
        server1_$eq(TestUtils$.MODULE$.createServer(config1(), TestUtils$.MODULE$.createServer$default$2()));
        server2_$eq(TestUtils$.MODULE$.createServer(config2(), TestUtils$.MODULE$.createServer$default$2()));
        servers_$eq(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{server1(), server2()})));
        Properties properties = new Properties();
        properties.put("host", "localhost");
        properties.put("port", BoxesRunTime.boxToInteger(port1()).toString());
        consumer1_$eq(new SimpleConsumer("localhost", port1(), 1000000, 65536, ""));
        consumer2_$eq(new SimpleConsumer("localhost", port2(), 100, 65536, ""));
        requestHandlerLogger().setLevel(Level.FATAL);
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void tearDown() {
        requestHandlerLogger().setLevel(Level.ERROR);
        server1().shutdown();
        server2().shutdown();
        Utils$.MODULE$.rm(server1().config().logDirs());
        Utils$.MODULE$.rm(server2().config().logDirs());
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    /*  JADX ERROR: NullPointerException in pass: AttachTryCatchVisitor
        java.lang.NullPointerException: Cannot invoke "String.charAt(int)" because "obj" is null
        	at jadx.core.utils.Utils.cleanObjectName(Utils.java:38)
        	at jadx.core.dex.instructions.args.ArgType.object(ArgType.java:86)
        	at jadx.core.dex.info.ClassInfo.fromName(ClassInfo.java:42)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.convertToHandlers(AttachTryCatchVisitor.java:113)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.initTryCatches(AttachTryCatchVisitor.java:54)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.visit(AttachTryCatchVisitor.java:42)
        */
    @org.junit.Test
    public void testUpdateBrokerPartitionInfo() {
        /*
            Method dump skipped, instructions count: 518
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.producer.ProducerTest.testUpdateBrokerPartitionInfo():void");
    }

    @Test
    public void testSendToNewTopic() {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("partitioner.class", "kafka.utils.StaticPartitioner");
        properties.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config1(), config2()}))));
        properties.put("request.required.acks", "2");
        properties.put("request.timeout.ms", "1000");
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("request.required.acks", "3");
        properties2.put("request.timeout.ms", "1000");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        ProducerConfig producerConfig2 = new ProducerConfig(properties2);
        AdminUtils$.MODULE$.createTopic(zkClient(), "new-topic", 1, 2, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), "new-topic", 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        Producer producer = new Producer(producerConfig);
        Producer producer2 = new Producer(producerConfig2);
        producer.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "test", "test1")}));
        producer.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "test", "test2")}));
        Option leaderForPartition = ZkUtils$.MODULE$.getLeaderForPartition(zkClient(), "new-topic", 0);
        Assert.assertTrue("Leader for topic new-topic partition 0 should exist", leaderForPartition.isDefined());
        Buffer buffer = BoxesRunTime.unboxToInt(leaderForPartition.get()) == server1().config().brokerId() ? consumer1().fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0L, 10000).build()).messageSet("new-topic", 0).iterator().toBuffer() : consumer2().fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0L, 10000).build()).messageSet("new-topic", 0).iterator().toBuffer();
        Assert.assertEquals("Should have fetched 2 messages", BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToInteger(buffer.size()));
        Assert.assertEquals(new Message("test1".getBytes(), "test".getBytes()), ((MessageAndOffset) buffer.apply(0)).message());
        Assert.assertEquals(new Message("test2".getBytes(), "test".getBytes()), ((MessageAndOffset) buffer.apply(1)).message());
        producer.close();
        try {
            try {
                producer2.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "test", "test2")}));
                throw fail("Should have timed out for 3 acks.");
            } catch (FailedToSendMessageException e) {
                producer2.close();
            } catch (Throwable th) {
                throw fail("Not expected", th);
            }
        } catch (Throwable th2) {
            producer2.close();
            throw th2;
        }
    }

    @Test
    public void testSendWithDeadBroker() {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("partitioner.class", "kafka.utils.StaticPartitioner");
        properties.put("request.timeout.ms", "2000");
        properties.put("request.required.acks", "1");
        properties.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config1(), config2()}))));
        AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient(), "new-topic", Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(0)).$minus$greater(Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(1)).$minus$greater(Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(2)).$minus$greater(Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(3)).$minus$greater(Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0})))})), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$4(), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), "new-topic", 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 1, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 2, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 3, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        Producer producer = new Producer(new ProducerConfig(properties));
        try {
            producer.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "test", "test1")}));
            server1().shutdown();
            server1().awaitShutdown();
            try {
                producer.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "test", "test1")}));
                throw fail("Should fail since no leader exists for the partition.");
            } catch (TestFailedException e) {
                throw e;
            } catch (Throwable th) {
                server1().startup();
                TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
                try {
                    Iterator it = consumer1().fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0L, 10000).build()).messageSet("new-topic", 0).iterator();
                    Assert.assertTrue("Message set should have 1 message", it.hasNext());
                    Assert.assertEquals(new Message("test1".getBytes(), "test".getBytes()), ((MessageAndOffset) it.next()).message());
                    Assert.assertFalse("Message set should have another message", it.hasNext());
                    producer.close();
                } catch (Exception e2) {
                    throw fail("Not expected", e2);
                }
            }
        } catch (Throwable th2) {
            throw fail(new StringBuilder().append("Unexpected exception: ").append(th2).toString());
        }
    }

    @Test
    public void testAsyncSendCanCorrectlyFailWithTimeout() {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("partitioner.class", "kafka.utils.StaticPartitioner");
        properties.put("request.timeout.ms", String.valueOf(500));
        properties.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config1(), config2()}))));
        properties.put("request.required.acks", "1");
        properties.put("client.id", "ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer producer = new Producer(producerConfig);
        AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient(), "new-topic", Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(0)).$minus$greater(Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))})), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$4(), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), "new-topic", 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        try {
            producer.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "test", "test")}));
            Iterator it = consumer1().fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0L, 10000).build()).messageSet("new-topic", 0).iterator();
            Assert.assertTrue("Message set should have 1 message", it.hasNext());
            Assert.assertEquals(new Message("test".getBytes()), ((MessageAndOffset) it.next()).message());
        } catch (Exception e) {
            producer.close();
            throw fail("Not expected", e);
        } catch (Throwable th) {
        }
        server1().requestHandlerPool().shutdown();
        long milliseconds = SystemTime$.MODULE$.milliseconds();
        try {
            try {
                producer.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "test", "test")}));
                producer.close();
            } catch (Exception e2) {
                throw fail("Not expected", e2);
            } catch (FailedToSendMessageException e3) {
                producer.close();
            }
            Assert.assertTrue(SystemTime$.MODULE$.milliseconds() - milliseconds >= ((long) (500 * producerConfig.messageSendMaxRetries())));
        } catch (Throwable th2) {
            producer.close();
            throw th2;
        }
    }

    @Test
    public void testSendNullMessage() {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("partitioner.class", "kafka.utils.StaticPartitioner");
        properties.put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config1(), config2()}))));
        Producer producer = new Producer(new ProducerConfig(properties));
        try {
            AdminUtils$.MODULE$.createTopic(zkClient(), "new-topic", 2, 1, AdminUtils$.MODULE$.createTopic$default$5());
            Assert.assertTrue("Topic new-topic not created after timeout", TestUtils$.MODULE$.waitUntilTrue(new ProducerTest$$anonfun$testSendNullMessage$1(this), zookeeper().tickTime()));
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
            producer.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("new-topic", "key", (Object) null)}));
        } finally {
            producer.close();
        }
    }

    public ProducerTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        Logging.class.$init$(this);
        this.brokerId1 = 0;
        this.brokerId2 = 1;
        this.ports = TestUtils$.MODULE$.choosePorts(2);
        Tuple2.mcII.sp spVar = new Tuple2.mcII.sp(BoxesRunTime.unboxToInt(ports().apply(0)), BoxesRunTime.unboxToInt(ports().apply(1)));
        if (spVar == null) {
            throw new MatchError(spVar);
        }
        this.x$1 = new Tuple2.mcII.sp(BoxesRunTime.unboxToInt(spVar._1()), BoxesRunTime.unboxToInt(spVar._2()));
        this.port1 = this.x$1._1$mcI$sp();
        this.port2 = this.x$1._2$mcI$sp();
        this.server1 = null;
        this.server2 = null;
        this.consumer1 = null;
        this.consumer2 = null;
        this.requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
        this.servers = List$.MODULE$.empty();
        this.props1 = TestUtils$.MODULE$.createBrokerConfig(brokerId1(), port1());
        props1().put("num.partitions", "4");
        this.config1 = new KafkaConfig(props1());
        this.props2 = TestUtils$.MODULE$.createBrokerConfig(brokerId2(), port2());
        props2().put("num.partitions", "4");
        this.config2 = new KafkaConfig(props2());
    }
}
