package kafka.api;

import java.util.Collections;
import java.util.concurrent.Future;
import kafka.log.AbstractLog;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ConsumerRebootstrapTest.scala */
@ScalaSignature(bytes = "\u0006\u0005=2A\u0001B\u0003\u0001\u0015!)q\u0002\u0001C\u0001!!)!\u0003\u0001C\u0001'!)a\u0005\u0001C\u0005O\t92i\u001c8tk6,'OU3c_>$8\u000f\u001e:baR+7\u000f\u001e\u0006\u0003\r\u001d\t1!\u00199j\u0015\u0005A\u0011!B6bM.\f7\u0001A\n\u0003\u0001-\u0001\"\u0001D\u0007\u000e\u0003\u0015I!AD\u0003\u0003\u001fI+'m\\8ugR\u0014\u0018\r\u001d+fgR\fa\u0001P5oSRtD#A\t\u0011\u00051\u0001\u0011a\u0004;fgR\u0014VMY8piN$(/\u00199\u0015\u0003Q\u0001\"!\u0006\r\u000e\u0003YQ\u0011aF\u0001\u0006g\u000e\fG.Y\u0005\u00033Y\u0011A!\u00168ji\"\u0012!a\u0007\t\u00039\u0011j\u0011!\b\u0006\u0003\ryQ!a\b\u0011\u0002\u000f),\b/\u001b;fe*\u0011\u0011EI\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002G\u0005\u0019qN]4\n\u0005\u0015j\"\u0001\u0002+fgR\f1b]3oIJ+7m\u001c:egR\u0019A\u0003K\u0017\t\u000b%\u001a\u0001\u0019\u0001\u0016\u0002\u00159,XNU3d_J$7\u000f\u0005\u0002\u0016W%\u0011AF\u0006\u0002\u0004\u0013:$\b\"\u0002\u0018\u0004\u0001\u0004Q\u0013\u0001\u00024s_6\u0004")
/* loaded from: input_file:kafka/api/ConsumerRebootstrapTest.class */
public class ConsumerRebootstrapTest extends RebootstrapTest {
    @Test
    public void testRebootstrap() {
        sendRecords(10, 0);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testRebootstrap$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Timeout waiting for records to be replicated");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        server1().shutdown();
        server1().awaitShutdown();
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), clientOverrides(), createConsumer$default$4());
        createConsumer.assign(Collections.singleton(tp()));
        consumeAndVerifyRecords(createConsumer, 10, 0, consumeAndVerifyRecords$default$4(), consumeAndVerifyRecords$default$5(), consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8());
        server1().startup();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testRebootstrap$3(this)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail("Timeout waiting for records to be replicated");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        server0().shutdown();
        server0().awaitShutdown();
        sendRecords(10, 10);
        consumeAndVerifyRecords(createConsumer, 10, 10, 10, 10L, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8());
        server0().startup();
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testRebootstrap$5(this)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + 15000) {
                Assertions.fail("Timeout waiting for records to be replicated");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        server1().shutdown();
        server1().awaitShutdown();
        sendRecords(10, 20);
        consumeAndVerifyRecords(createConsumer, 10, 20, 20, 20L, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8());
    }

    private void sendRecords(int i, int i2) {
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(i2), i + i2).foreach(obj -> {
            return $anonfun$sendRecords$1(this, createProducer, BoxesRunTime.unboxToInt(obj));
        });
        createProducer.flush();
        createProducer.close();
    }

    public static final /* synthetic */ boolean $anonfun$testRebootstrap$1(ConsumerRebootstrapTest consumerRebootstrapTest) {
        return ((AbstractLog) consumerRebootstrapTest.server0().logManager().logsByTopic(consumerRebootstrapTest.tp().topic()).head()).logEndOffset() == ((AbstractLog) consumerRebootstrapTest.server1().logManager().logsByTopic(consumerRebootstrapTest.tp().topic()).head()).logEndOffset();
    }

    public static final /* synthetic */ String $anonfun$testRebootstrap$2() {
        return "Timeout waiting for records to be replicated";
    }

    public static final /* synthetic */ boolean $anonfun$testRebootstrap$3(ConsumerRebootstrapTest consumerRebootstrapTest) {
        return ((AbstractLog) consumerRebootstrapTest.server0().logManager().logsByTopic(consumerRebootstrapTest.tp().topic()).head()).logEndOffset() == ((AbstractLog) consumerRebootstrapTest.server1().logManager().logsByTopic(consumerRebootstrapTest.tp().topic()).head()).logEndOffset();
    }

    public static final /* synthetic */ String $anonfun$testRebootstrap$4() {
        return "Timeout waiting for records to be replicated";
    }

    public static final /* synthetic */ boolean $anonfun$testRebootstrap$5(ConsumerRebootstrapTest consumerRebootstrapTest) {
        return ((AbstractLog) consumerRebootstrapTest.server0().logManager().logsByTopic(consumerRebootstrapTest.tp().topic()).head()).logEndOffset() == ((AbstractLog) consumerRebootstrapTest.server1().logManager().logsByTopic(consumerRebootstrapTest.tp().topic()).head()).logEndOffset();
    }

    public static final /* synthetic */ String $anonfun$testRebootstrap$6() {
        return "Timeout waiting for records to be replicated";
    }

    public static final /* synthetic */ Future $anonfun$sendRecords$1(ConsumerRebootstrapTest consumerRebootstrapTest, KafkaProducer kafkaProducer, int i) {
        return kafkaProducer.send(new ProducerRecord(consumerRebootstrapTest.tp().topic(), Predef$.MODULE$.int2Integer(consumerRebootstrapTest.tp().partition()), Predef$.MODULE$.long2Long(i), new StringBuilder(4).append("key ").append(i).toString().getBytes(), new StringBuilder(6).append("value ").append(i).toString().getBytes()));
    }
}
