package kafka.server;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.PartitionListener;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opentest4j.AssertionFailedError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.Nothing$;
import scala.runtime.RichLong$;
import scala.runtime.VolatileLongRef;

/* compiled from: ProduceRequestPipeliningTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005c\u0001B\n\u0015\u0001eAQ\u0001\t\u0001\u0005\u0002\u0005BQ\u0001\n\u0001\u0005R\u0015Bq\u0001\f\u0001C\u0002\u0013\u0005S\u0006\u0003\u00047\u0001\u0001\u0006IA\f\u0005\bo\u0001\u0011\r\u0011\"\u00039\u0011\u0019y\u0004\u0001)A\u0005s!9\u0001\t\u0001a\u0001\n\u0013\t\u0005bB#\u0001\u0001\u0004%IA\u0012\u0005\u0007\u0019\u0002\u0001\u000b\u0015\u0002\"\t\u000f5\u0003\u0001\u0019!C\u0005\u0003\"9a\n\u0001a\u0001\n\u0013y\u0005BB)\u0001A\u0003&!\tC\u0003S\u0001\u0011\u00051\u000bC\u0003~\u0001\u0011\u0005a\u0010C\u0004\u0002\b\u0001!\t!!\u0003\t\u000f\u0005M\u0001\u0001\"\u0003\u0002\u0016!9\u00111\b\u0001\u0005\n\u0005u\u0002bBA \u0001\u0011%\u0011Q\b\u0002\u001d!J|G-^2f%\u0016\fX/Z:u!&\u0004X\r\\5oS:<G+Z:u\u0015\t)b#\u0001\u0004tKJ4XM\u001d\u0006\u0002/\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u001b!\tYb$D\u0001\u001d\u0015\tib#A\u0002ba&L!a\b\u000f\u0003-%sG/Z4sCRLwN\u001c+fgRD\u0015M\u001d8fgN\fa\u0001P5oSRtD#\u0001\u0012\u0011\u0005\r\u0002Q\"\u0001\u000b\u0002\u0017\t\u0014xn[3s\u0007>,h\u000e^\u000b\u0002MA\u0011qEK\u0007\u0002Q)\t\u0011&A\u0003tG\u0006d\u0017-\u0003\u0002,Q\t\u0019\u0011J\u001c;\u0002\u0019M,'O^3s\u0007>tg-[4\u0016\u00039\u0002\"a\f\u001b\u000e\u0003AR!!\r\u001a\u0002\tU$\u0018\u000e\u001c\u0006\u0002g\u0005!!.\u0019<b\u0013\t)\u0004G\u0001\u0006Qe>\u0004XM\u001d;jKN\fQb]3sm\u0016\u00148i\u001c8gS\u001e\u0004\u0013!\u0003;pa&\u001cg*Y7f+\u0005I\u0004C\u0001\u001e>\u001b\u0005Y$B\u0001\u001f3\u0003\u0011a\u0017M\\4\n\u0005yZ$AB*ue&tw-\u0001\u0006u_BL7MT1nK\u0002\nA\u0002\\3bI\u0016\u0014(I]8lKJ,\u0012A\u0011\t\u0003G\rK!\u0001\u0012\u000b\u0003\u0017-\u000bgm[1Ce>\\WM]\u0001\u0011Y\u0016\fG-\u001a:Ce>\\WM]0%KF$\"a\u0012&\u0011\u0005\u001dB\u0015BA%)\u0005\u0011)f.\u001b;\t\u000f-C\u0011\u0011!a\u0001\u0005\u0006\u0019\u0001\u0010J\u0019\u0002\u001b1,\u0017\rZ3s\u0005J|7.\u001a:!\u000391w\u000e\u001c7po\u0016\u0014(I]8lKJ\f!CZ8mY><XM\u001d\"s_.,'o\u0018\u0013fcR\u0011q\t\u0015\u0005\b\u0017.\t\t\u00111\u0001C\u0003=1w\u000e\u001c7po\u0016\u0014(I]8lKJ\u0004\u0013\u0001\b;fgR\u0004&o\u001c3vG\u0016\u0014V-];fgR\u0004\u0016\u000e]3mS:Lgn\u001a\u000b\u0003\u000fRCQ!V\u0007A\u0002Y\u000ba!];peVl\u0007CA,_\u001d\tAF\f\u0005\u0002ZQ5\t!L\u0003\u0002\\1\u00051AH]8pizJ!!\u0018\u0015\u0002\rA\u0013X\rZ3g\u0013\tqtL\u0003\u0002^Q!\"Q\"Y8q!\t\u0011W.D\u0001d\u0015\t!W-\u0001\u0005qe>4\u0018\u000eZ3s\u0015\t1w-\u0001\u0004qCJ\fWn\u001d\u0006\u0003Q&\fqA[;qSR,'O\u0003\u0002kW\u0006)!.\u001e8ji*\tA.A\u0002pe\u001eL!A\\2\u0003\u0017Y\u000bG.^3T_V\u00148-Z\u0001\bgR\u0014\u0018N\\4tY\t\t8/I\u0001s\u0003\tQ8.I\u0001u\u0003\u0015Y'/\u00194uQ\u0011iaO_>\u0011\u0005]DX\"A3\n\u0005e,'!\u0005)be\u0006lW\r^3sSj,G\rV3ti\u0006!a.Y7fC\u0005a\u0018AI>eSN\u0004H.Y=OC6,WPL>be\u001e,X.\u001a8ug^KG\u000f\u001b(b[\u0016\u001cX0\u0001\u0016uKN$\bK]8ek\u000e,'+Z9vKN$\b+\u001b9fY&t\u0017N\\4XSRDG\u000b\u001b:piRd\u0017N\\4\u0015\u0005\u001d{\b\"B+\u000f\u0001\u00041\u0006&\u0002\bb_\u0006\rAFA9tQ\u0011qaO_>\u0002GQ,7\u000f\u001e#z]\u0006l\u0017n\u0019)s_\u0012,8-\u001a*fcV,7\u000f\u001e)ja\u0016d\u0017N\\5oOR\u0019q)a\u0003\t\u000bU{\u0001\u0019\u0001,)\u000b=\tw.a\u0004-\u0005E\u001c\b\u0006B\bwun\f1%\u00197uKJ\u0014V-];fgR\u0004\u0016\u000e]3mS:Lgn\u001a#z]\u0006l\u0017nY\"p]\u001aLw\rF\u0003H\u0003/\t\t\u0004C\u0004\u0002\u001aA\u0001\r!a\u0007\u0002\u0017\u0005$W.\u001b8DY&,g\u000e\u001e\t\u0005\u0003;\ti#\u0004\u0002\u0002 )!\u0011\u0011EA\u0012\u0003\u0015\tG-\\5o\u0015\u0011\t)#a\n\u0002\u000f\rd\u0017.\u001a8ug*\u0019q#!\u000b\u000b\u0007\u0005-2.\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0005\u0003_\tyBA\u0003BI6Lg\u000eC\u0004\u00024A\u0001\r!!\u000e\u0002\rU\u0004H-\u0019;f!\r9\u0013qG\u0005\u0004\u0003sA#a\u0002\"p_2,\u0017M\\\u0001\raJ,\u0007/\u0019:f)>\u0004\u0018n\u0019\u000b\u0002\u000f\u0006qb/\u001a:jMf\u0004&o\u001c3vG\u0016\u0014V-];fgR\u0004\u0016\u000e]3mS:Lgn\u001a")
/* loaded from: input_file:kafka/server/ProduceRequestPipeliningTest.class */
public class ProduceRequestPipeliningTest extends IntegrationTestHarness {
    private final Properties serverConfig;
    private final String topicName;
    private KafkaBroker leaderBroker;
    private KafkaBroker followerBroker;

    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 2;
    }

    @Override // kafka.api.IntegrationTestHarness
    public Properties serverConfig() {
        return this.serverConfig;
    }

    private String topicName() {
        return this.topicName;
    }

    private KafkaBroker leaderBroker() {
        return this.leaderBroker;
    }

    private void leaderBroker_$eq(KafkaBroker kafkaBroker) {
        this.leaderBroker = kafkaBroker;
    }

    private KafkaBroker followerBroker() {
        return this.followerBroker;
    }

    private void followerBroker_$eq(KafkaBroker kafkaBroker) {
        this.followerBroker = kafkaBroker;
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testProduceRequestPipelining(String str) {
        prepareTopic();
        verifyProduceRequestPipelining();
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testProduceRequestPipeliningWithThrottling(String str) {
        prepareTopic();
        createAdminClient(createAdminClient$default$1(), createAdminClient$default$2()).alterClientQuotas((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new ClientQuotaAlteration(new ClientQuotaEntity((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), (Object) null), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("client-id"), (Object) null)}))).asJava()), (Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new ClientQuotaAlteration.Op("producer_byte_rate", Predef$.MODULE$.double2Double(10.0d)), new $colon.colon(new ClientQuotaAlteration.Op("consumer_byte_rate", Predef$.MODULE$.double2Double(10.0d)), new $colon.colon(new ClientQuotaAlteration.Op("request_percentage", Predef$.MODULE$.double2Double(Long.MAX_VALUE)), Nil$.MODULE$)))).asJava()), Nil$.MODULE$)).asJava()).all().get();
        ensureConsistentKRaftMetadata();
        Assertions.assertEquals("Timed out waiting for local write for second record to complete.", Assertions.assertThrows(AssertionFailedError.class, () -> {
            this.verifyProduceRequestPipelining();
        }).getMessage());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testDynamicProduceRequestPipelining(String str) {
        prepareTopic();
        verifyProduceRequestPipelining();
        Admin createAdminClient = createAdminClient(createAdminClient$default$1(), createAdminClient$default$2());
        alterRequestPipeliningDynamicConfig(createAdminClient, false);
        Assertions.assertEquals("Timed out waiting for local write for second record to complete.", Assertions.assertThrows(AssertionFailedError.class, () -> {
            this.verifyProduceRequestPipelining();
        }).getMessage());
        alterRequestPipeliningDynamicConfig(createAdminClient, true);
        verifyProduceRequestPipelining();
    }

    private void alterRequestPipeliningDynamicConfig(Admin admin, boolean z) {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.RequestPipeliningEnableProp(), Boolean.toString(z));
        TestUtils$.MODULE$.incrementalAlterConfigs(brokers(), admin, properties, false, TestUtils$.MODULE$.incrementalAlterConfigs$default$5()).all().get();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$alterRequestPipeliningDynamicConfig$1(this, z)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$alterRequestPipeliningDynamicConfig$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    private void prepareTopic() {
        Properties properties = new Properties();
        properties.put("min.insync.replicas", "2");
        scala.collection.immutable.Map<Object, Object> createTopic = createTopic("test_topic", 1, brokerCount(), properties, createTopic$default$5(), createTopic$default$6());
        ensureConsistentKRaftMetadata();
        leaderBroker_$eq(brokerWithId(Predef$.MODULE$.Integer2int((Integer) brokerIds().find(num -> {
            return BoxesRunTime.boxToBoolean($anonfun$prepareTopic$1(createTopic, num));
        }).getOrElse(() -> {
            return (Nothing$) Assertions.fail("Expected to find leader broker");
        }))));
        followerBroker_$eq(brokerWithId(Predef$.MODULE$.Integer2int((Integer) brokerIds().find(num2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$prepareTopic$3(createTopic, num2));
        }).getOrElse(() -> {
            return (Nothing$) Assertions.fail("Expected to find a follower brokers");
        }))));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void verifyProduceRequestPipelining() {
        final VolatileLongRef create = VolatileLongRef.create(-1L);
        final VolatileLongRef create2 = VolatileLongRef.create(-1L);
        final ProduceRequestPipeliningTest produceRequestPipeliningTest = null;
        leaderBroker().replicaManager().maybeAddListener(new TopicPartition(topicName(), 0), new PartitionListener(produceRequestPipeliningTest, create, create2) { // from class: kafka.server.ProduceRequestPipeliningTest$$anon$1
            private final VolatileLongRef partitionLogEndOffset$1;
            private final VolatileLongRef partitionHighWaterMark$1;

            public void onStartOffsetUpdated(TopicPartition topicPartition, long j) {
                PartitionListener.onStartOffsetUpdated$(this, topicPartition, j);
            }

            public void onLastStableOffsetUpdated(TopicPartition topicPartition, long j) {
                PartitionListener.onLastStableOffsetUpdated$(this, topicPartition, j);
            }

            public void onIsrUpdated(TopicPartition topicPartition, Set<Object> set) {
                PartitionListener.onIsrUpdated$(this, topicPartition, set);
            }

            public void onLeaderEpochUpdated(TopicPartition topicPartition, int i) {
                PartitionListener.onLeaderEpochUpdated$(this, topicPartition, i);
            }

            public void onFailed(TopicPartition topicPartition) {
                PartitionListener.onFailed$(this, topicPartition);
            }

            public void onDeleted(TopicPartition topicPartition) {
                PartitionListener.onDeleted$(this, topicPartition);
            }

            public void onEndOffsetUpdated(TopicPartition topicPartition, long j) {
                this.partitionLogEndOffset$1.elem = j;
            }

            public void onHighWatermarkUpdated(TopicPartition topicPartition, long j) {
                this.partitionHighWaterMark$1.elem = j;
            }

            {
                this.partitionLogEndOffset$1 = create;
                this.partitionHighWaterMark$1 = create2;
                PartitionListener.$init$(this);
            }
        });
        long j = create.elem;
        long j2 = create2.elem;
        Assertions.assertEquals(j2, j);
        Properties properties = new Properties();
        properties.put("acks", "all");
        properties.put("enable.idempotence", "false");
        KafkaProducer createProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), properties);
        byte[] bArr = new byte[34];
        byte[] bArr2 = new byte[33];
        ArrayBuffer arrayBuffer = new ArrayBuffer();
        CoreUtils$.MODULE$.inLock((ReentrantLock) TestUtils.fieldValue((ReplicaFetcherThread) followerBroker().replicaManager().replicaFetcherManager().getFetcher(new TopicPartition(topicName(), 0)).get(), AbstractFetcherThread.class, "partitionMapLock"), () -> {
            arrayBuffer.$plus$eq(createProducer.send(new ProducerRecord(this.topicName(), Predef$.MODULE$.int2Integer(0), bArr, bArr2)));
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$ == null) {
                throw null;
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$verifyProduceRequestPipelining$2(create, j)) {
                if (System.currentTimeMillis() > currentTimeMillis + 5000) {
                    Assertions.fail($anonfun$verifyProduceRequestPipelining$3());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(5000L), waitUntilTrue$default$4));
            }
            Assertions.assertEquals(j2, create2.elem);
            arrayBuffer.$plus$eq(createProducer.send(new ProducerRecord(this.topicName(), Predef$.MODULE$.int2Integer(0), bArr, bArr2)));
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$2 == null) {
                throw null;
            }
            long currentTimeMillis2 = System.currentTimeMillis();
            while (!$anonfun$verifyProduceRequestPipelining$4(create, j)) {
                if (System.currentTimeMillis() > currentTimeMillis2 + 5000) {
                    Assertions.fail($anonfun$verifyProduceRequestPipelining$5());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(5000L), waitUntilTrue$default$42));
            }
            Assertions.assertEquals(j2, create2.elem);
            arrayBuffer.$plus$eq(createProducer.send(new ProducerRecord(this.topicName(), Predef$.MODULE$.int2Integer(0), bArr, bArr2)));
            TestUtils$ testUtils$3 = TestUtils$.MODULE$;
            long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$3 == null) {
                throw null;
            }
            long currentTimeMillis3 = System.currentTimeMillis();
            while (!$anonfun$verifyProduceRequestPipelining$6(create, j)) {
                if (System.currentTimeMillis() > currentTimeMillis3 + 5000) {
                    Assertions.fail($anonfun$verifyProduceRequestPipelining$7());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(5000L), waitUntilTrue$default$43));
            }
            Assertions.assertEquals(j2, create2.elem);
        });
        ArrayBuffer arrayBuffer2 = (ArrayBuffer) arrayBuffer.map(future -> {
            return (RecordMetadata) future.get(5000L, TimeUnit.MILLISECONDS);
        }, ArrayBuffer$.MODULE$.canBuildFrom());
        Assertions.assertEquals(j + 3, create2.elem);
        Assertions.assertEquals(j + 3, create.elem);
        IntRef create3 = IntRef.create(0);
        arrayBuffer2.foreach(recordMetadata -> {
            $anonfun$verifyProduceRequestPipelining$9(j, create3, recordMetadata);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ boolean $anonfun$alterRequestPipeliningDynamicConfig$2(boolean z, KafkaBroker kafkaBroker) {
        return BoxesRunTime.equals(kafkaBroker.config().getBoolean(KafkaConfig$.MODULE$.RequestPipeliningEnableProp()), BoxesRunTime.boxToBoolean(z));
    }

    public static final /* synthetic */ boolean $anonfun$alterRequestPipeliningDynamicConfig$1(ProduceRequestPipeliningTest produceRequestPipeliningTest, boolean z) {
        return produceRequestPipeliningTest.brokers().forall(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$alterRequestPipeliningDynamicConfig$2(z, kafkaBroker));
        });
    }

    public static final /* synthetic */ String $anonfun$alterRequestPipeliningDynamicConfig$3() {
        return "Timed out waiting for brokers to receive updated request pipelining config";
    }

    public static final /* synthetic */ boolean $anonfun$prepareTopic$1(scala.collection.immutable.Map map, Integer num) {
        return BoxesRunTime.equalsNumObject(num, map.apply(BoxesRunTime.boxToInteger(0)));
    }

    public static final /* synthetic */ boolean $anonfun$prepareTopic$3(scala.collection.immutable.Map map, Integer num) {
        return !BoxesRunTime.equalsNumObject(num, map.apply(BoxesRunTime.boxToInteger(0)));
    }

    public static final /* synthetic */ boolean $anonfun$verifyProduceRequestPipelining$2(VolatileLongRef volatileLongRef, long j) {
        return volatileLongRef.elem == j + 1;
    }

    public static final /* synthetic */ String $anonfun$verifyProduceRequestPipelining$3() {
        return "Timed out waiting for local write for first record to complete.";
    }

    public static final /* synthetic */ boolean $anonfun$verifyProduceRequestPipelining$4(VolatileLongRef volatileLongRef, long j) {
        return volatileLongRef.elem == j + 2;
    }

    public static final /* synthetic */ String $anonfun$verifyProduceRequestPipelining$5() {
        return "Timed out waiting for local write for second record to complete.";
    }

    public static final /* synthetic */ boolean $anonfun$verifyProduceRequestPipelining$6(VolatileLongRef volatileLongRef, long j) {
        return volatileLongRef.elem == j + 3;
    }

    public static final /* synthetic */ String $anonfun$verifyProduceRequestPipelining$7() {
        return "Timed out waiting for local write for third record to complete.";
    }

    public static final /* synthetic */ void $anonfun$verifyProduceRequestPipelining$9(long j, IntRef intRef, RecordMetadata recordMetadata) {
        Assertions.assertEquals(j + intRef.elem, recordMetadata.offset());
        intRef.elem++;
    }

    public ProduceRequestPipeliningTest() {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.ReplicaLagTimeMaxMsProp(), Long.toString(TimeUnit.MINUTES.toMillis(5L)));
        this.serverConfig = properties;
        this.topicName = "test_topic";
        this.leaderBroker = null;
        this.followerBroker = null;
    }
}
