package kafka.network;

import java.util.Properties;
import kafka.log.remote.RemoteLogReaderTest;
import kafka.server.BaseRequestTest;
import kafka.server.ControllerServer;
import kafka.server.Defaults$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: DynamicNumNetworkThreadsTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005ed\u0001B\n\u0015\u0001eAQ\u0001\t\u0001\u0005\u0002\u0005BQ\u0001\n\u0001\u0005B\u0015Bq\u0001\f\u0001C\u0002\u0013\u0005Q\u0006\u0003\u00047\u0001\u0001\u0006IA\f\u0005\bo\u0001\u0011\r\u0011\"\u0001.\u0011\u0019A\u0004\u0001)A\u0005]!I\u0011\b\u0001a\u0001\u0002\u0004%\tA\u000f\u0005\n\u000f\u0002\u0001\r\u00111A\u0005\u0002!C\u0011B\u0014\u0001A\u0002\u0003\u0005\u000b\u0015B\u001e\t\u000b=\u0003A\u0011\t)\t\u000be\u0003A\u0011\t.\t\u000b-\u0004A\u0011\t7\t\u000bE\u0004A\u0011\u0001:\t\r}\u0004A\u0011AA\u0001\u0011\u001d\t)\u0004\u0001C\u0005\u0003oAq!a\u0012\u0001\t\u0013\tI\u0005C\u0004\u0002L\u0001!I!!\u0014\t\u0013\u0005\u0005\u0004!%A\u0005\n\u0005\r$\u0001\b#z]\u0006l\u0017n\u0019(v[:+Go^8sWRC'/Z1egR+7\u000f\u001e\u0006\u0003+Y\tqA\\3uo>\u00148NC\u0001\u0018\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u000e\u0011\u0005mqR\"\u0001\u000f\u000b\u0005u1\u0012AB:feZ,'/\u0003\u0002 9\ty!)Y:f%\u0016\fX/Z:u)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002EA\u00111\u0005A\u0007\u0002)\u0005Y!M]8lKJ\u001cu.\u001e8u+\u00051\u0003CA\u0014+\u001b\u0005A#\"A\u0015\u0002\u000bM\u001c\u0017\r\\1\n\u0005-B#aA%oi\u0006A\u0011N\u001c;fe:\fG.F\u0001/!\tyC'D\u00011\u0015\t\t$'\u0001\u0003mC:<'\"A\u001a\u0002\t)\fg/Y\u0005\u0003kA\u0012aa\u0015;sS:<\u0017!C5oi\u0016\u0014h.\u00197!\u0003!)\u0007\u0010^3s]\u0006d\u0017!C3yi\u0016\u0014h.\u00197!\u0003\u0015\tG-\\5o+\u0005Y\u0004C\u0001\u001fF\u001b\u0005i$BA\u001d?\u0015\ty\u0004)A\u0004dY&,g\u000e^:\u000b\u0005]\t%B\u0001\"D\u0003\u0019\t\u0007/Y2iK*\tA)A\u0002pe\u001eL!AR\u001f\u0003\u000b\u0005#W.\u001b8\u0002\u0013\u0005$W.\u001b8`I\u0015\fHCA%M!\t9#*\u0003\u0002LQ\t!QK\\5u\u0011\u001di\u0005\"!AA\u0002m\n1\u0001\u001f\u00132\u0003\u0019\tG-\\5oA\u00059\"M]8lKJ\u0004&o\u001c9feRLxJ^3se&$Wm\u001d\u000b\u0003\u0013FCQA\u0015\u0006A\u0002M\u000b!\u0002\u001d:pa\u0016\u0014H/[3t!\t!v+D\u0001V\u0015\t1&'\u0001\u0003vi&d\u0017B\u0001-V\u0005)\u0001&o\u001c9feRLWm]\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0003\u0013nCQ\u0001X\u0006A\u0002u\u000b\u0001\u0002^3ti&sgm\u001c\t\u0003=\u0016l\u0011a\u0018\u0006\u0003A\u0006\f1!\u00199j\u0015\t\u00117-A\u0004kkBLG/\u001a:\u000b\u0005\u0011\u001c\u0015!\u00026v]&$\u0018B\u00014`\u0005!!Vm\u001d;J]\u001a|\u0007FA\u0006i!\tq\u0016.\u0003\u0002k?\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002\u0011Q,\u0017M\u001d#po:$\u0012!\u0013\u0015\u0003\u00199\u0004\"AX8\n\u0005A|&!C!gi\u0016\u0014X)Y2i\u0003Q9W\r\u001e(v[:+Go^8sWRC'/Z1egR\u0011ae\u001d\u0005\u0006i6\u0001\r!^\u0001\tY&\u001cH/\u001a8feB\u0011a/ \b\u0003on\u0004\"\u0001\u001f\u0015\u000e\u0003eT!A\u001f\r\u0002\rq\u0012xn\u001c;?\u0013\ta\b&\u0001\u0004Qe\u0016$WMZ\u0005\u0003kyT!\u0001 \u0015\u00029Q,7\u000f\u001e#z]\u0006l\u0017n\u0019(v[:+Go^8sWRC'/Z1egR\u0019\u0011*a\u0001\t\r\u0005\u0015a\u00021\u0001v\u0003\u0019\tXo\u001c:v[\":a\"!\u0003\u0002\u0016\u0005]\u0001\u0003BA\u0006\u0003#i!!!\u0004\u000b\u0007\u0005=\u0011-\u0001\u0004qCJ\fWn]\u0005\u0005\u0003'\tiAA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fAA\\1nK\u0006\u0012\u0011\u0011D\u0001#w\u0012L7\u000f\u001d7bs:\u000bW.Z?/w\u0006\u0014x-^7f]R\u001cx+\u001b;i\u001d\u0006lWm]?)\u000f9\ti\"!\u000b\u0002,A!\u0011qDA\u0013\u001b\t\t\tC\u0003\u0003\u0002$\u00055\u0011\u0001\u00039s_ZLG-\u001a:\n\t\u0005\u001d\u0012\u0011\u0005\u0002\f-\u0006dW/Z*pkJ\u001cW-A\u0004tiJLgnZ:-\t\u00055\u0012\u0011G\u0011\u0003\u0003_\t!A_6\"\u0005\u0005M\u0012!B6sC\u001a$\u0018A\u0005:fG>tg-[4ve\u0016\u001cVM\u001d<feN$R!SA\u001d\u0003{Aa!a\u000f\u0010\u0001\u0004\u0019\u0016\u0001\u00038foB\u0013x\u000e]:\t\u000f\u0005}r\u00021\u0001\u0002B\u0005i\u0011\r\u0015:paR{g+\u001a:jMf\u0004RaJA\"kVL1!!\u0012)\u0005\u0019!V\u000f\u001d7fe\u0005\t2M]3bi\u0016\fE-\\5o\u00072LWM\u001c;\u0015\u0003m\nQc^1ji\u001a{'oQ8oM&<wJ\\*feZ,'\u000fF\u0004J\u0003\u001f\n\u0019&a\u0016\t\r\u0005E\u0013\u00031\u0001v\u0003!\u0001(o\u001c9OC6,\u0007BBA+#\u0001\u0007Q/A\u0005qe>\u0004h+\u00197vK\"I\u0011\u0011L\t\u0011\u0002\u0003\u0007\u00111L\u0001\n[\u0006Dx+Y5u\u001bN\u00042aJA/\u0013\r\ty\u0006\u000b\u0002\u0005\u0019>tw-A\u0010xC&$hi\u001c:D_:4\u0017nZ(o'\u0016\u0014h/\u001a:%I\u00164\u0017-\u001e7uIM*\"!!\u001a+\t\u0005m\u0013qM\u0016\u0003\u0003S\u0002B!a\u001b\u0002v5\u0011\u0011Q\u000e\u0006\u0005\u0003_\n\t(A\u0005v]\u000eDWmY6fI*\u0019\u00111\u000f\u0015\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002x\u00055$!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0002")
/* loaded from: input_file:kafka/network/DynamicNumNetworkThreadsTest.class */
public class DynamicNumNetworkThreadsTest extends BaseRequestTest {
    private final String internal = "PLAINTEXT";
    private final String external = "EXTERNAL";
    private Admin admin;

    @Override // kafka.server.BaseRequestTest, kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 1;
    }

    public String internal() {
        return this.internal;
    }

    public String external() {
        return this.external;
    }

    public Admin admin() {
        return this.admin;
    }

    public void admin_$eq(Admin admin) {
        this.admin = admin;
    }

    @Override // kafka.server.BaseRequestTest
    public void brokerPropertyOverrides(Properties properties) {
        properties.put(KafkaConfig$.MODULE$.ListenersProp(), new StringBuilder(30).append(internal()).append("://localhost:0, ").append(external()).append("://localhost:0").toString());
        properties.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), new StringBuilder(22).append(internal()).append(":PLAINTEXT, ").append(external()).append(":PLAINTEXT").toString());
        properties.put(new StringBuilder(15).append("listener.name.").append(internal().toLowerCase()).append(".").append(KafkaConfig$.MODULE$.NumNetworkThreadsProp()).toString(), "2");
        properties.put(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), Integer.toString(Defaults$.MODULE$.NumNetworkThreads()));
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        Buffer<KafkaBroker> brokers = brokers();
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        admin_$eq(testUtils$.createAdminClient((Seq) brokers, forSecurityProtocol, new Properties()));
        Assertions.assertEquals(2, getNumNetworkThreads(internal()));
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        Admin admin = admin();
        Buffer<KafkaBroker> brokers2 = brokers();
        Seq<ControllerServer> controllerServers = controllerServers();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        Map<Object, Seq<Object>> map = (Map) Map$.MODULE$.empty();
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        testUtils$3.createTopicWithAdmin(admin, RemoteLogReaderTest.TOPIC, brokers2, controllerServers, 1, 1, map, new Properties());
        Assertions.assertEquals(Defaults$.MODULE$.NumNetworkThreads(), getNumNetworkThreads(external()));
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @AfterEach
    public void tearDown() {
        if (admin() != null) {
            admin().close();
        }
        super.tearDown();
    }

    public int getNumNetworkThreads(String str) {
        return ((IterableOnceOps) CollectionConverters$.MODULE$.SetHasAsScala(((KafkaBroker) brokers().head()).metrics().metrics().keySet()).asScala().filter(metricName -> {
            return BoxesRunTime.boxToBoolean($anonfun$getNumNetworkThreads$1(metricName));
        })).count(metricName2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$getNumNetworkThreads$2(str, metricName2));
        });
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.{argumentsWithNames}")
    public void testDynamicNumNetworkThreads(String str) {
        int NumNetworkThreads = Defaults$.MODULE$.NumNetworkThreads() + 1;
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), Integer.toString(NumNetworkThreads));
        reconfigureServers(properties, new Tuple2<>(KafkaConfig$.MODULE$.NumNetworkThreadsProp(), Integer.toString(NumNetworkThreads)));
        Assertions.assertEquals(2, getNumNetworkThreads(internal()));
        Assertions.assertEquals(NumNetworkThreads, getNumNetworkThreads(external()));
        Properties properties2 = new Properties();
        properties2.put(new StringBuilder(15).append("listener.name.").append(internal().toLowerCase()).append(".").append(KafkaConfig$.MODULE$.NumNetworkThreadsProp()).toString(), Integer.toString(3));
        reconfigureServers(properties2, new Tuple2<>(new StringBuilder(15).append("listener.name.").append(internal().toLowerCase()).append(".").append(KafkaConfig$.MODULE$.NumNetworkThreadsProp()).toString(), Integer.toString(3)));
        Assertions.assertEquals(3, getNumNetworkThreads(internal()));
        Assertions.assertEquals(NumNetworkThreads, getNumNetworkThreads(external()));
    }

    private void reconfigureServers(Properties properties, Tuple2<String, String> tuple2) {
        Admin createAdminClient = createAdminClient();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        Buffer<KafkaBroker> brokers = brokers();
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        testUtils$.incrementalAlterConfigs(brokers, createAdminClient, properties, false, AlterConfigOp.OpType.SET).all().get();
        waitForConfigOnServer((String) tuple2._1(), (String) tuple2._2(), 10000L);
        createAdminClient.close();
    }

    private Admin createAdminClient() {
        String bootstrapServers = TestUtils$.MODULE$.bootstrapServers(brokers(), new ListenerName(securityProtocol().name));
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("metadata.max.age.ms", "10");
        return Admin.create(properties);
    }

    private void waitForConfigOnServer(String str, String str2, long j) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j2 = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$waitForConfigOnServer$1(this, str2, str);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > j) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j2).append(", and then retrying.").toString()));
                }
                Thread.sleep(j2);
                j2 += package$.MODULE$.min(j2, 1000L);
            }
        }
    }

    private long waitForConfigOnServer$default$3() {
        return 10000L;
    }

    public static final /* synthetic */ boolean $anonfun$getNumNetworkThreads$1(MetricName metricName) {
        String name = metricName.name();
        return name != null && name.equals("request-rate");
    }

    public static final /* synthetic */ boolean $anonfun$getNumNetworkThreads$2(String str, MetricName metricName) {
        Object obj = metricName.tags().get("listener");
        return str == null ? obj == null : str.equals(obj);
    }

    public static final /* synthetic */ void $anonfun$waitForConfigOnServer$1(DynamicNumNetworkThreadsTest dynamicNumNetworkThreadsTest, String str, String str2) {
        Assertions.assertEquals(str, ((KafkaBroker) dynamicNumNetworkThreadsTest.brokers().head()).config().originals().get(str2));
    }
}
