package kafka.api;

import java.io.File;
import java.util.Collection;
import java.util.Properties;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.ShutdownableThread;
import kafka.utils.TestUtils$;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.Assert;
import org.junit.Test;
import scala.Function3;
import scala.None$;
import scala.Option;
import scala.collection.JavaConverters$;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Range$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.ListBuffer;
import scala.math.Ordering$Int$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: TransactionsBounceTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005]e\u0001B\u000e\u001d\u0001\u0005BQ\u0001\u000b\u0001\u0005\u0002%Bq\u0001\f\u0001C\u0002\u0013%Q\u0006\u0003\u00045\u0001\u0001\u0006IA\f\u0005\bk\u0001\u0011\r\u0011\"\u0003.\u0011\u00191\u0004\u0001)A\u0005]!9q\u0007\u0001b\u0001\n\u0013i\u0003B\u0002\u001d\u0001A\u0003%a\u0006C\u0004:\u0001\t\u0007I\u0011A\u0017\t\ri\u0002\u0001\u0015!\u0003/\u0011\u001dY\u0004A1A\u0005\nqBa!\u0012\u0001!\u0002\u0013i\u0004b\u0002$\u0001\u0005\u0004%I\u0001\u0010\u0005\u0007\u000f\u0002\u0001\u000b\u0011B\u001f\t\u000f!\u0003!\u0019!C\u0001\u0013\"1\u0001\u000b\u0001Q\u0001\n)CQ!\u0015\u0001\u0005BICQ!\u0019\u0001\u0005\u0002\tDQa\u001c\u0001\u0005\u0002\tDQ!\u001d\u0001\u0005\nIDq!a\r\u0001\t\u0013\t)\u0004C\u0005\u0002\\\u0001\t\n\u0011\"\u0003\u0002^!9\u00111\u000f\u0001\u0005\n\u0005UdABA?\u0001\u0011\ty\b\u0003\u0004)/\u0011\u0005\u0011Q\u0012\u0005\u0007\u0003';B\u0011\t2\t\r\u0005Uu\u0003\"\u0011c\u0005Y!&/\u00198tC\u000e$\u0018n\u001c8t\u0005>,hnY3UKN$(BA\u000f\u001f\u0003\r\t\u0007/\u001b\u0006\u0002?\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001#!\t\u0019c%D\u0001%\u0015\t)c$A\u0006j]R,wM]1uS>t\u0017BA\u0014%\u0005YY\u0015MZ6b'\u0016\u0014h/\u001a:UKN$\b*\u0019:oKN\u001c\u0018A\u0002\u001fj]&$h\bF\u0001+!\tY\u0003!D\u0001\u001d\u0003I\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3\u0016\u00039\u0002\"a\f\u001a\u000e\u0003AR\u0011!M\u0001\u0006g\u000e\fG.Y\u0005\u0003gA\u00121!\u00138u\u0003M\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3!\u0003U\u0019XM\u001d<fe6+7o]1hK6\u000b\u0007PQ=uKN\fac]3sm\u0016\u0014X*Z:tC\u001e,W*\u0019=CsR,7\u000fI\u0001\u000e]Vl\u0007+\u0019:uSRLwN\\:\u0002\u001d9,X\u000eU1si&$\u0018n\u001c8tA\u0005Qa.^7TKJ4XM]:\u0002\u00179,XnU3sm\u0016\u00148\u000fI\u0001\f_V$\b/\u001e;U_BL7-F\u0001>!\tq4)D\u0001@\u0015\t\u0001\u0015)\u0001\u0003mC:<'\"\u0001\"\u0002\t)\fg/Y\u0005\u0003\t~\u0012aa\u0015;sS:<\u0017\u0001D8viB,H\u000fV8qS\u000e\u0004\u0013AC5oaV$Hk\u001c9jG\u0006Y\u0011N\u001c9viR{\u
0007/[2!\u0003=yg/\u001a:sS\u0012Lgn\u001a)s_B\u001cX#\u0001&\u0011\u0005-sU\"\u0001'\u000b\u00055\u000b\u0015\u0001B;uS2L!a\u0014'\u0003\u0015A\u0013x\u000e]3si&,7/\u0001\tpm\u0016\u0014(/\u001b3j]\u001e\u0004&o\u001c9tA\u0005yq-\u001a8fe\u0006$XmQ8oM&<7/F\u0001T!\r!\u0016lW\u0007\u0002+*\u0011akV\u0001\nS6lW\u000f^1cY\u0016T!\u0001\u0017\u0019\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002[+\n\u00191+Z9\u0011\u0005q{V\"A/\u000b\u0005ys\u0012AB:feZ,'/\u0003\u0002a;\nY1*\u00194lC\u000e{gNZ5h\u0003=!Xm\u001d;XSRDwI]8va&#G#A2\u0011\u0005=\"\u0017BA31\u0005\u0011)f.\u001b;)\u0005E9\u0007C\u00015n\u001b\u0005I'B\u00016l\u0003\u0015QWO\\5u\u0015\u0005a\u0017aA8sO&\u0011a.\u001b\u0002\u0005)\u0016\u001cH/A\u000buKN$x+\u001b;i\u000fJ|W\u000f]'fi\u0006$\u0017\r^1)\u0005I9\u0017!\u0005;fgR\u0014%o\\6fe\u001a\u000b\u0017\u000e\\;sKR\u00111m\u001d\u0005\u0006iN\u0001\r!^\u0001\u0007G>lW.\u001b;\u0011\u0011=2\b0a\u0005\u0002(\rL!a\u001e\u0019\u0003\u0013\u0019+hn\u0019;j_:\u001c\u0004cB=\u0002\u0004\u0005\u001d\u0011qA\u0007\u0002u*\u00111\u0010`\u0001\taJ|G-^2fe*\u0011QP`\u0001\bG2LWM\u001c;t\u0015\tyrPC\u0002\u0002\u0002-\fa!\u00199bG\",\u0017bAA\u0003u\ni1*\u00194lCB\u0013x\u000eZ;dKJ\u0004RaLA\u0005\u0003\u001bI1!a\u00031\u0005\u0015\t%O]1z!\ry\u0013qB\u0005\u0004\u0003#\u0001$\u0001\u0002\"zi\u0016\u0004B!!\u0006\u0002$9!\u0011qCA\u0010!\r\tI\u0002M\u0007\u0003\u00037Q1!!\b!\u0003\u0019a$o\\8u}%\u0019\u0011\u0011\u0005\u0019\u0002\rA\u0013X\rZ3g\u0013\r!\u0015Q\u0005\u0006\u0004\u0003C\u0001\u0004\u0003CA\u0015\u0003_\t9!a\u0002\u000e\u0005\u0005-\"bAA\u0017y\u0006A1m\u001c8tk6,'/\u0003\u0003\u00022\u0005-\"!D&bM.\f7i\u001c8tk6,'/\u0001\u0012de\u0016\fG/Z\"p]N,X.\u001a:B]\u0012\u001cVOY:de&\u0014W\rV8U_BL7m\u001d\u000b\t\u0003O\t9$a\u000f\u0002R!9\u0011\u0011\b\u000bA\u0002\u0005M\u0011aB4s_V\u0004\u0018\n\u001a\u0005\b\u0003{!\u0002\u0019AA 
\u0003\u0019!x\u000e]5dgB1\u0011\u0011IA&\u0003'qA!a\u0011\u0002H9!\u0011\u0011DA#\u0013\u0005\t\u0014bAA%a\u00059\u0001/Y2lC\u001e,\u0017\u0002BA'\u0003\u001f\u0012A\u0001T5ti*\u0019\u0011\u0011\n\u0019\t\u0013\u0005MC\u0003%AA\u0002\u0005U\u0013!\u0004:fC\u0012\u001cu.\\7jiR,G\rE\u00020\u0003/J1!!\u00171\u0005\u001d\u0011un\u001c7fC:\fAf\u0019:fCR,7i\u001c8tk6,'/\u00118e'V\u00147o\u0019:jE\u0016$v\u000eV8qS\u000e\u001cH\u0005Z3gCVdG\u000fJ\u001a\u0016\u0005\u0005}#\u0006BA+\u0003CZ#!a\u0019\u0011\t\u0005\u0015\u0014qN\u0007\u0003\u0003ORA!!\u001b\u0002l\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003[\u0002\u0014AC1o]>$\u0018\r^5p]&!\u0011\u0011OA4\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\rGJ,\u0017\r^3U_BL7m\u001d\u000b\u0003\u0003o\u0002R\u0001VA=]9J1!a\u001fV\u0005\ri\u0015\r\u001d\u0002\u0010\u0005>,hnY3TG\",G-\u001e7feN\u0019q#!!\u0011\t\u0005\r\u0015\u0011R\u0007\u0003\u0003\u000bS1!a\"\u001f\u0003\u0015)H/\u001b7t\u0013\u0011\tY)!\"\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\r\u001a\u000b\u0003\u0003\u001f\u00032!!%\u0018\u001b\u0005\u0001\u0011A\u00023p/>\u00148.\u0001\u0005tQV$Hm\\<o\u0001")
/* loaded from: input_file:kafka/api/TransactionsBounceTest.class */
public class TransactionsBounceTest extends KafkaServerTestHarness {
    // Test-wide constants. Declaration order matters: serverMessageMaxBytes is derived from
    // producerBufferSize through its accessor, so producerBufferSize must be initialized first.

    // 65536; in this class it is only used to derive serverMessageMaxBytes.
    private final int producerBufferSize = 65536;
    // Half of producerBufferSize; the constructor publishes it as the MessageMaxBytesProp override.
    private final int serverMessageMaxBytes = producerBufferSize() / 2;
    // Partition count passed to createTopic for both topics; also the bound of the
    // leader-election wait loop in BounceScheduler.doWork.
    private final int kafka$api$TransactionsBounceTest$$numPartitions = 3;
    // Number of broker configs requested in mo39generateConfigs.
    private final int numServers = 4;
    // Topic the transactional producer writes to and the verifying consumer reads from.
    private final String kafka$api$TransactionsBounceTest$$outputTopic = "output-topic";
    // Topic seeded with numbered records and read by the "myGroup" consumer.
    private final String inputTopic = "input-topic";
    // Broker overrides: filled in by the constructor and handed to KafkaConfig.fromProps.
    private final Properties overridingProps = new Properties();

    /* compiled from: TransactionsBounceTest.scala */
    /* loaded from: input_file:kafka/api/TransactionsBounceTest$BounceScheduler.class */
    public class BounceScheduler extends ShutdownableThread {
        public final /* synthetic */ TransactionsBounceTest $outer;

        public void doWork() {
            kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().servers().foreach(kafkaServer -> {
                $anonfun$doWork$1(this, kafkaServer);
                return BoxedUnit.UNIT;
            });
            RichInt$ richInt$ = RichInt$.MODULE$;
            int kafka$api$TransactionsBounceTest$$numPartitions = kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$numPartitions();
            Range$ range$ = Range$.MODULE$;
            Range.Exclusive exclusive = new Range.Exclusive(0, kafka$api$TransactionsBounceTest$$numPartitions, 1);
            if (exclusive.isEmpty()) {
                return;
            }
            int start = exclusive.start();
            while (true) {
                int i = start;
                $anonfun$doWork$5(this, i);
                if (i == ((Range) exclusive).scala$collection$immutable$Range$$lastElement) {
                    return;
                } else {
                    start = i + exclusive.step();
                }
            }
        }

        public void shutdown() {
            super.shutdown();
        }

        public /* synthetic */ TransactionsBounceTest kafka$api$TransactionsBounceTest$BounceScheduler$$$outer() {
            return this.$outer;
        }

        public static final /* synthetic */ void $anonfun$doWork$1(BounceScheduler bounceScheduler, KafkaServer kafkaServer) {
            bounceScheduler.trace(() -> {
                return StringOps$.MODULE$.format$extension("Shutting down server : %s", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{Integer.valueOf(kafkaServer.config().brokerId())}));
            });
            kafkaServer.shutdown();
            kafkaServer.awaitShutdown();
            Thread.sleep(500L);
            bounceScheduler.trace(() -> {
                return StringOps$.MODULE$.format$extension("Server %s shut down. Starting it up again.", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{Integer.valueOf(kafkaServer.config().brokerId())}));
            });
            kafkaServer.startup();
            bounceScheduler.trace(() -> {
                return StringOps$.MODULE$.format$extension("Restarted server: %s", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{Integer.valueOf(kafkaServer.config().brokerId())}));
            });
            Thread.sleep(500L);
        }

        public static final /* synthetic */ int $anonfun$doWork$5(BounceScheduler bounceScheduler, int i) {
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            KafkaZkClient zkClient = bounceScheduler.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().zkClient();
            String kafka$api$TransactionsBounceTest$$outputTopic = bounceScheduler.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$outputTopic();
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            TestUtils$ testUtils$3 = TestUtils$.MODULE$;
            None$ none$ = None$.MODULE$;
            TestUtils$ testUtils$4 = TestUtils$.MODULE$;
            return testUtils$.waitUntilLeaderIsElectedOrChanged(zkClient, kafka$api$TransactionsBounceTest$$outputTopic, i, 30000L, none$, None$.MODULE$);
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public BounceScheduler(TransactionsBounceTest transactionsBounceTest) {
            super("daemon-broker-bouncer", false);
            if (transactionsBounceTest == null) {
                throw null;
            }
            this.$outer = transactionsBounceTest;
        }
    }

    /** Returns the producer buffer size constant (65536). */
    private int producerBufferSize() {
        return producerBufferSize;
    }

    /** Returns the broker message size limit used by this test (half the producer buffer size). */
    private int serverMessageMaxBytes() {
        return serverMessageMaxBytes;
    }

    /** Returns the number of partitions (3) used for both the input and the output topic. */
    public int kafka$api$TransactionsBounceTest$$numPartitions() {
        return kafka$api$TransactionsBounceTest$$numPartitions;
    }

    /** Returns the number of brokers (4) for which configs are generated. */
    public int numServers() {
        return numServers;
    }

    /** Returns the name of the topic the transactional producer writes to ("output-topic"). */
    public String kafka$api$TransactionsBounceTest$$outputTopic() {
        return kafka$api$TransactionsBounceTest$$outputTopic;
    }

    /** Returns the name of the topic seeded with numbered records ("input-topic"). */
    private String inputTopic() {
        return inputTopic;
    }

    /** Returns the broker property overrides populated by the constructor. */
    public Properties overridingProps() {
        return overridingProps;
    }

    /**
     * Builds one {@link KafkaConfig} per broker: asks {@code FixedPortTestUtils} for
     * {@code numServers()} broker property sets bound to this harness's ZooKeeper connect string
     * and merges {@code overridingProps()} into each of them.
     *
     * @return the broker configurations, one per server
     */
    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs, reason: merged with bridge method [inline-methods] */
    public Seq<KafkaConfig> mo39generateConfigs() {
        return (Seq) FixedPortTestUtils$.MODULE$
            .createBrokerConfigs(numServers(), zkConnect(), true, false)
            .map(brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, this.overridingProps(), true));
    }

    /**
     * Runs the broker-bounce scenario of {@code testBrokerFailure}, attaching the consumer
     * positions to each transaction through the {@code sendOffsetsToTransaction} overload that
     * takes the consumer group id.
     *
     * <p>The scenario itself (seeding, transactional copy loop, verification) lives in
     * {@code testBrokerFailure}; this method only supplies the offset-commit callback instead of
     * carrying its own copy of that logic.
     */
    @Test
    public void testWithGroupId() {
        testBrokerFailure((producer, groupId, consumer) -> {
            $anonfun$testWithGroupId$1(producer, groupId, consumer);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Runs the broker-bounce scenario of {@code testBrokerFailure}, attaching the consumer
     * positions to each transaction through the {@code sendOffsetsToTransaction} overload that
     * takes the consumer's group metadata.
     *
     * <p>The scenario itself (seeding, transactional copy loop, verification) lives in
     * {@code testBrokerFailure}; this method only supplies the offset-commit callback instead of
     * carrying its own copy of that logic.
     */
    @Test
    public void testWithGroupMetadata() {
        testBrokerFailure((producer, groupId, consumer) -> {
            $anonfun$testWithGroupMetadata$1(producer, groupId, consumer);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Copies the seeded input records to the output topic inside transactions while brokers are
     * being bounced, then verifies the output topic.
     *
     * <p>Steps: create both topics, seed the input topic with 10000 numbered records, start the
     * {@link BounceScheduler}, run the transactional copy loop, and finally check that every
     * output partition is in ascending order and that exactly the values 0..9999 were received.
     *
     * <p>The producer and input consumer are closed and the bounce scheduler is shut down whether
     * or not the copy loop succeeds, so a failing run does not leave the bouncer thread running.
     *
     * @param function3 callback invoked once per batch with the producer, the group id
     *                  {@code "myGroup"} and the input consumer, after the batch has been sent and
     *                  before the transaction is aborted or committed
     */
    private void testBrokerFailure(Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> function3) {
        final int numInputRecords = 10000;
        createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(inputTopic(), numInputRecords, servers());
        KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeToTopics("myGroup", new $colon.colon(inputTopic(), Nil$.MODULE$), false);
        KafkaProducer<byte[], byte[]> producer = TestUtils$.MODULE$.createTransactionalProducer("test-txn", servers(), 512, 60000L, 60000L, 120000);
        producer.initTransactions();
        BounceScheduler scheduler = new BounceScheduler(this);
        scheduler.start();
        // The scheduler is only shut down once it has been started: the guarded region begins
        // after start() so shutdown() is never called on a thread that never ran.
        try {
            try {
                copyInputToOutputTransactionally(producer, consumer, numInputRecords, function3);
            } finally {
                producer.close();
                consumer.close();
            }
        } finally {
            scheduler.shutdown();
        }
        verifyOutputRecords(numInputRecords);
    }

    /**
     * Polls the input consumer in batches of at least min(200, remaining) records and sends each
     * batch to the output topic in its own transaction until {@code numInputRecords} records have
     * been committed. Every third transaction (iteration index divisible by 3, starting with the
     * first) is aborted and the consumer is reset to its committed positions.
     */
    private void copyInputToOutputTransactionally(KafkaProducer<byte[], byte[]> producer, KafkaConsumer<byte[], byte[]> consumer, int numInputRecords, Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> commitOffsets) {
        final String outputTopic = kafka$api$TransactionsBounceTest$$outputTopic();
        // Mutable holders rather than plain ints: the lazily-evaluated trace lambdas capture them.
        IntRef processed = new IntRef(0);
        IntRef iteration = new IntRef(0);
        while (processed.elem < numInputRecords) {
            int toRead = Math.min(200, numInputRecords - processed.elem);
            trace(() -> {
                return iteration.elem + ": About to read " + toRead + " messages, processed " + processed.elem + " so far..";
            });
            scala.collection.Seq records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, toRead, 15000L);
            trace(() -> {
                return "Received " + records.length() + " messages, sending them transactionally to " + outputTopic;
            });
            producer.beginTransaction();
            boolean shouldAbort = iteration.elem % 3 == 0;
            records.foreach(consumerRecord -> {
                // The last argument tells the helper whether this record is expected to be committed.
                return producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(outputTopic, (Integer) null, (byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), !shouldAbort), new ErrorLoggingCallback(outputTopic, (byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), true));
            });
            trace(() -> {
                return "Sent " + records.length() + " messages. Committing offsets.";
            });
            commitOffsets.apply(producer, "myGroup", consumer);
            if (shouldAbort) {
                trace(() -> {
                    return "Committed offsets. Aborting transaction of " + records.length() + " messages.";
                });
                producer.abortTransaction();
                // Rewind so the aborted batch is read again on the next iteration.
                TestUtils$.MODULE$.resetToCommittedPositions(consumer);
            } else {
                trace(() -> {
                    return "Committed offsets. committing transaction of " + records.length() + " messages.";
                });
                producer.commitTransaction();
                processed.elem += records.length();
            }
            iteration.elem++;
        }
    }

    /**
     * Reads the output topic with a fresh "randomGroup" consumer and asserts that each partition
     * is in ascending order and that exactly the values 0..numInputRecords-1 were received. The
     * consumer is closed even when polling or an assertion fails.
     */
    private void verifyOutputRecords(int numInputRecords) {
        KafkaConsumer<byte[], byte[]> verifyingConsumer = createConsumerAndSubscribeToTopics("randomGroup", new $colon.colon(kafka$api$TransactionsBounceTest$$outputTopic(), Nil$.MODULE$), true);
        try {
            HashMap valuesByPartition = new HashMap();
            TestUtils$.MODULE$.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords, 15000L).foreach(consumerRecord2 -> {
                int value = Integer.parseInt(TestUtils$.MODULE$.assertCommittedAndGetValue(consumerRecord2));
                Buffer partitionValues = (Buffer) valuesByPartition.getOrElseUpdate(new TopicPartition(consumerRecord2.topic(), consumerRecord2.partition()), () -> {
                    return new ListBuffer();
                });
                return (Buffer) partitionValues.addOne(Integer.valueOf(value));
            });
            ListBuffer allValues = new ListBuffer();
            valuesByPartition.values().foreach(listBuffer2 -> {
                Assert.assertEquals("Out of order messages detected", listBuffer2, listBuffer2.sorted(Ordering$Int$.MODULE$));
                return allValues.addAll(listBuffer2);
            });
            Set received = allValues.toSet();
            Assert.assertEquals(numInputRecords, received.size());
            Set expected = new Range.Exclusive(0, numInputRecords, 1).toSet();
            Assert.assertEquals("Missing messages: " + expected.removedAll(received), expected, received);
        } finally {
            verifyingConsumer.close();
        }
    }

    /**
     * Creates a byte-array consumer against this harness's brokers (PLAINTEXT) and subscribes it
     * to the given topics.
     *
     * @param str  consumer group id handed to {@code TestUtils.createConsumer}
     * @param list topics to subscribe to
     * @param z    flag forwarded as the fifth argument of {@code TestUtils.createConsumer}
     *             (presumably "read committed": the verifying consumer passes {@code true};
     *             confirm against TestUtils)
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeToTopics(String str, List<String> list, boolean z) {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        String bootstrapServers = testUtils.getBrokerListStrFromServers(servers(), SecurityProtocol.PLAINTEXT);
        Option<File> noFile = None$.MODULE$;
        Option<Properties> noProperties = None$.MODULE$;
        KafkaConsumer<byte[], byte[]> consumer = testUtils.createConsumer(
            bootstrapServers,
            str,
            "earliest",
            false,
            z,
            500,
            SecurityProtocol.PLAINTEXT,
            noFile,
            noProperties,
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer());
        consumer.subscribe((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(list).asJava());
        return consumer;
    }

    /**
     * Compiler-generated accessor; by Scala's {@code $default$N} naming convention it supplies the
     * default ({@code false}) for the third parameter of
     * {@code createConsumerAndSubscribeToTopics}. No caller in this class uses it.
     */
    private boolean createConsumerAndSubscribeToTopics$default$3() {
        return false;
    }

    /**
     * Creates the input and output topics, each with {@code numPartitions} partitions and a topic
     * config that sets the min-in-sync-replicas property to "2".
     *
     * @return the result of creating the output topic (the input topic's result is discarded)
     */
    private Map<Object, Object> createTopics() {
        Properties topicConfig = new Properties();
        topicConfig.put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), Integer.toString(2));
        // The literal 3 is presumably the replication factor; confirm against createTopic.
        createTopic(inputTopic(), kafka$api$TransactionsBounceTest$$numPartitions(), 3, topicConfig);
        return createTopic(kafka$api$TransactionsBounceTest$$outputTopic(), kafka$api$TransactionsBounceTest$$numPartitions(), 3, topicConfig);
    }

    /**
     * Offset-commit callback for {@code testWithGroupId}: sends the consumer's current positions
     * to the open transaction, identifying the group by its id string.
     *
     * @param kafkaProducer producer with an open transaction
     * @param str           consumer group id
     * @param kafkaConsumer consumer whose positions are sent
     */
    public static final /* synthetic */ void $anonfun$testWithGroupId$1(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        java.util.Map positions = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions(kafkaConsumer)).asJava();
        kafkaProducer.sendOffsetsToTransaction(positions, str);
    }

    /**
     * Offset-commit callback for {@code testWithGroupMetadata}: sends the consumer's current
     * positions to the open transaction together with the consumer's group metadata.
     *
     * @param kafkaProducer producer with an open transaction
     * @param str           consumer group id (unused by this variant)
     * @param kafkaConsumer consumer whose positions and group metadata are sent
     */
    public static final /* synthetic */ void $anonfun$testWithGroupMetadata$1(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        // Positions are captured before the group metadata is read, as in the single-expression form.
        java.util.Map positions = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions(kafkaConsumer)).asJava();
        kafkaProducer.sendOffsetsToTransaction(positions, kafkaConsumer.groupMetadata());
    }

    /**
     * Populates the broker overrides used by every generated config. All values are stored as
     * strings.
     */
    public TransactionsBounceTest() {
        KafkaConfig$ config = KafkaConfig$.MODULE$;
        Properties props = overridingProps();

        props.put(config.AutoCreateTopicsEnableProp(), Boolean.toString(false));
        props.put(config.MessageMaxBytesProp(), Integer.toString(serverMessageMaxBytes()));
        props.put(config.ControlledShutdownEnableProp(), Boolean.toString(true));
        props.put(config.UncleanLeaderElectionEnableProp(), Boolean.toString(false));
        props.put(config.AutoLeaderRebalanceEnableProp(), Boolean.toString(false));

        // Offsets topic: one partition, replication factor 3.
        props.put(config.OffsetsTopicPartitionsProp(), Integer.toString(1));
        props.put(config.OffsetsTopicReplicationFactorProp(), Integer.toString(3));
        props.put(config.MinInSyncReplicasProp(), Integer.toString(2));

        // Transactions topic: one partition, replication factor 3.
        props.put(config.TransactionsTopicPartitionsProp(), Integer.toString(1));
        props.put(config.TransactionsTopicReplicationFactorProp(), Integer.toString(3));

        props.put(config.GroupMinSessionTimeoutMsProp(), "10");
        props.put(config.GroupInitialRebalanceDelayMsProp(), "0");
    }

    /**
     * Value-returning adapter around {@code $anonfun$testWithGroupId$1}: runs the void callback
     * and returns {@code BoxedUnit.UNIT} so it can be used where an Object result is required.
     */
    public static final /* synthetic */ Object $anonfun$testWithGroupId$1$adapted(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        $anonfun$testWithGroupId$1(kafkaProducer, str, kafkaConsumer);
        return BoxedUnit.UNIT;
    }

    /**
     * Value-returning adapter around {@code $anonfun$testWithGroupMetadata$1}: runs the void
     * callback and returns {@code BoxedUnit.UNIT} so it can be used where an Object result is
     * required.
     */
    public static final /* synthetic */ Object $anonfun$testWithGroupMetadata$1$adapted(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        $anonfun$testWithGroupMetadata$1(kafkaProducer, str, kafkaConsumer);
        return BoxedUnit.UNIT;
    }
}
