package kafka.link;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import kafka.server.AbstractFetcherManager;
import kafka.server.DynamicConfig$Client$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.LeaderEndPoint;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFetcher;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkLeaderEndPoint;
import kafka.server.link.ClusterLinkLeaderRequestBuilder;
import kafka.server.link.FetchResponseSize;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.HashMap;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkQuotaIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\u0005Ed\u0001B\u0006\r\u0001EAQA\u0006\u0001\u0005\u0002]AQ!\u0007\u0001\u0005\u0002iAQA\u0014\u0001\u0005\u0002=CQ!\u0016\u0001\u0005\u0002YCQ\u0001\u0018\u0001\u0005\u0002uCQa\u0019\u0001\u0005\u0002\u0011DQA\u001b\u0001\u0005\u0002-Dq!!\u000e\u0001\t\u0003\t9\u0004C\u0004\u0002<\u0001!I!!\u0010\t\u000f\u0005}\u0002\u0001\"\u0003\u0002B\ty2\t\\;ti\u0016\u0014H*\u001b8l#V|G/Y%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u00055q\u0011\u0001\u00027j].T\u0011aD\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001!\u0003\u0005\u0002\u0014)5\tA\"\u0003\u0002\u0016\u0019\t\u0011\u0013IY:ue\u0006\u001cGo\u00117vgR,'\u000fT5oW&sG/Z4sCRLwN\u001c+fgR\fa\u0001P5oSRtD#\u0001\r\u0011\u0005M\u0001\u0011A\u000b;fgR$Um\u001d;j]\u0006$\u0018n\u001c8DYV\u001cH/\u001a:MS:\\'I]8lKJdUM^3m#V|G/\u0019\u000b\u00047\u0005r\u0003C\u0001\u000f \u001b\u0005i\"\"\u0001\u0010\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0001j\"\u0001B+oSRDQA\t\u0002A\u0002\r\na!];peVl\u0007C\u0001\u0013,\u001d\t)\u0013\u0006\u0005\u0002';5\tqE\u0003\u0002)!\u00051AH]8pizJ!AK\u000f\u0002\rA\u0013X\rZ3g\u0013\taSF\u0001\u0004TiJLgn\u001a\u0006\u0003UuAQa\f\u0002A\u0002A\n1bY8pe\u0012Lg.\u0019;peB\u0011A$M\u0005\u0003eu\u0011qAQ8pY\u0016\fg\u000e\u000b\u0003\u0003i\u0001\u000b\u0005CA\u001b?\u001b\u00051$BA\u001c9\u0003\u0019\u0001\u0018M]1ng*\u0011\u0011HO\u0001\bUV\u0004\u0018\u000e^3s\u0015\tYD(A\u0003kk:LGOC\u0001>\u0003\ry'oZ\u0005\u0003\u007fY\u0012\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u0003\t\u000b\u0001f\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~]\r|wN\u001d3j]\u0006$xN]\u001f|cuDCA\u0001#K\u0017B\u0011Q\tS\u0007\u0002\r*\u0011qIN\u0001\taJ|g/\u001b3fe&\u0011\u0011J\u0012\u0002\r\u001b\u0016$\bn\u001c3T_V\u00148-Z\u0001\u0006m\u0006dW/\u001a\u0017\u0002\u0019\u0006\nQ*A\bbY2\u001cu.\u001c2j]\u0006$\u0018n\u001c8t\u0003}!Xm\u001d;EKN$\u0018N\\1uS>t7\t\\;ti\u0016\u0014H*\u001b8l#V|G/\u0019\u000b\u00047A\u000b\u0006\"\u0002\u0012\u0004\u0001\u0004\u00
19\u0003\"B\u0018\u0004\u0001\u0004\u0001\u0004\u0006B\u00025\u0001\u0006CCa\u0001#K)2\nA*\u0001\u0019uKN$H)Z:uS:\fG/[8o\u00072,8\u000f^3s\u0019&t7.U;pi\u0006<\u0016\u000e\u001e5Ce>\\WM\u001d*fgR\f'\u000f\u001e\u000b\u00047]C\u0006\"\u0002\u0012\u0005\u0001\u0004\u0019\u0003\"B\u0018\u0005\u0001\u0004\u0001\u0004\u0006\u0002\u00035\u0001\u0006CC\u0001\u0002#K72\nA*A\u0013uKN$H)Z:uS:\fG/[8o\u0019\u0006<G*\u001b8l\r\u0016$8\r[3s)\"\u0014x\u000e\u001e;mKR\u00191DX0\t\u000b\t*\u0001\u0019A\u0012\t\u000b=*\u0001\u0019\u0001\u0019)\t\u0015!\u0004)\u0011\u0015\u0005\u000b\u0011S%\rL\u0001M\u0003Y!Xm\u001d;T_V\u00148-Z\"mkN$XM])v_R\fGcA\u000efM\")!E\u0002a\u0001G!)qF\u0002a\u0001a!\"a\u0001\u000e!BQ\u00111AIS5-\u00031\u000b\u0011E^3sS\u001aLH)Z:uS:\fG/[8o\u00072,8\u000f^3s\u0019&t7.U;pi\u0006$B\u0001\\;\u0002\bA\u0011Qn]\u0007\u0002]*\u0011q\u000e]\u0001\u0007G>lWn\u001c8\u000b\u0005=\t(B\u0001:=\u0003\u0019\t\u0007/Y2iK&\u0011AO\u001c\u0002\u0005+VLG\rC\u0003w\u000f\u0001\u0007q/A\u0005sKN|WO]2fgB\u0019\u0001p_?\u000e\u0003eT!A_\u000f\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002}s\n\u00191+Z9\u0011\u0007y\f\u0019!D\u0001��\u0015\r\t\tA\\\u0001\u0007G>tg-[4\n\u0007\u0005\u0015qP\u0001\bD_:4\u0017n\u001a*fg>,(oY3\t\u000f\u0005%q\u00011\u0001\u0002\f\u0005I\u0011/^8uC6{G-\u001a\t\u0005\u0003\u001b\tyC\u0004\u0003\u0002\u0010\u0005%b\u0002BA\t\u0003KqA!a\u0005\u0002$9!\u0011QCA\u0011\u001d\u0011\t9\"a\b\u000f\t\u0005e\u0011Q\u0004\b\u0004M\u0005m\u0011\"A\u001f\n\u0005Id\u0014BA\br\u0013\ty\u0007/C\u0002\u0002\u00029L1!a\n��\u0003%Ig\u000e^3s]\u0006d7/\u0003\u0003\u0002,\u00055\u0012\u0001E\"p]\u001adW/\u001a8u\u0007>tg-[4t\u0015\r\t9c`\u0005\u0005\u0003c\t\u0019D\u0001\u000bDYV\u001cH/\u001a:MS:\\\u0017+^8uC6{G-\u001a\u0006\u0005\u0003W\ti#A\bwKJLg-_)v_R\fWj\u001c3f)\rY\u0012\u0011\b\u0005\b\u0003\u0013A\u0001\u0019AA\u0006\u0003\u0001\"Wm\u001d;DYV\u001cH/\u001a:MS:\\'+\u001a9mS\u000e\f7\u000f\u00165s_R$H.\u001a3\u0015\u0003A\nqC^3sS\u001aLh)\u001a;dQJ+7\u000f]8og\u0016\u001c\
u0016N_3\u0015\u000bm\t\u0019%a\u0012\t\r\u0005\u0015#\u00021\u0001m\u0003\u0019a\u0017N\\6JI\"9\u0011\u0011\n\u0006A\u0002\u0005-\u0013\u0001D3ya\u0016\u001cG/\u001a3TSj,\u0007#\u0002\u000f\u0002N\u0005E\u0013bAA(;\t1q\n\u001d;j_:\u0004B!a\u0015\u0002\\5\u0011\u0011Q\u000b\u0006\u0004\u001b\u0005]#bAA-\u001d\u000511/\u001a:wKJLA!!\u0018\u0002V\t\tb)\u001a;dQJ+7\u000f]8og\u0016\u001c\u0016N_3)\r\u0001\t\tGSA7!\u0011\t\u0019'!\u001b\u000e\u0005\u0005\u0015$bAA4q\u0005\u0019\u0011\r]5\n\t\u0005-\u0014Q\r\u0002\u0004)\u0006<\u0017EAA8\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8")
/* loaded from: input_file:kafka/link/ClusterLinkQuotaIntegrationTest.class */
public class ClusterLinkQuotaIntegrationTest extends AbstractClusterLinkIntegrationTest {
    /**
     * Runs the destination quota verification with one BROKER config resource per destination
     * broker (named by broker id), using the {@code CLUSTER_LINK_ONLY} quota mode.
     *
     * @param str bound to the {@code quorum} placeholder of the display name; not read here
     * @param z   bound to the {@code coordinator} placeholder of the display name; not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestinationClusterLinkBrokerLevelQuota(String str, boolean z) {
        // Step 1: collect the (boxed) id of every destination broker.
        IterableOps brokerIds = (IterableOps) destCluster().brokers().map(
                broker -> BoxesRunTime.boxToInteger($anonfun$testDestinationClusterLinkBrokerLevelQuota$1(broker)));
        // Step 2: turn each id into a broker-named BROKER config resource.
        IterableOnceOps brokerResources = (IterableOnceOps) brokerIds.map(
                id -> $anonfun$testDestinationClusterLinkBrokerLevelQuota$2(BoxesRunTime.unboxToInt(id)));
        verifyDestinationClusterLinkQuota(brokerResources.toSeq(), ConfluentConfigs.ClusterLinkQuotaMode.CLUSTER_LINK_ONLY);
    }

    /**
     * Verifies the destination quota in {@code TOTAL_INBOUND} mode against the empty-named
     * BROKER config resource, then overrides the two link fetch-response byte limits and checks
     * that the link fetcher reports {@code FetchResponseSize(5000, 10000)}.
     *
     * @param str bound to the {@code quorum} placeholder of the display name; not read here
     * @param z   bound to the {@code coordinator} placeholder of the display name; not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestinationClusterLinkQuota(String str, boolean z) {
        $colon.colon brokerResources = new $colon.colon(new ConfigResource(ConfigResource.Type.BROKER, ""), Nil$.MODULE$);
        Uuid linkId = verifyDestinationClusterLinkQuota(brokerResources, ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND);
        // No override yet: the expected sizes are taken from the fetcher manager's current config.
        verifyFetchResponseSize(linkId, None$.MODULE$);

        ClusterLinkTestHarness dest = destCluster();
        ConfluentAdmin admin = dest.createConfluentAdminClient(dest.createConfluentAdminClient$default$1());
        AlterConfigOp totalBytesOp = new AlterConfigOp(
                new ConfigEntry("confluent.cluster.link.fetch.response.total.bytes", "10000"), AlterConfigOp.OpType.SET);
        AlterConfigOp insyncTotalBytesOp = new AlterConfigOp(
                new ConfigEntry("confluent.cluster.link.insync.fetch.response.total.bytes", "10000"), AlterConfigOp.OpType.SET);

        // Pair every resource with the same two SET operations.
        IterableOnceOps opsByResource = (IterableOnceOps) brokerResources.map(resource ->
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(resource),
                        CollectionConverters$.MODULE$.IterableHasAsJava((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new AlterConfigOp[]{totalBytesOp, insyncTotalBytesOp}))).asJavaCollection()));
        admin.incrementalAlterConfigs(CollectionConverters$.MODULE$.MapHasAsJava(opsByResource.toMap($less$colon$less$.MODULE$.refl())).asJava()).all().get();

        verifyFetchResponseSize(linkId, new Some(new FetchResponseSize(5000, 10000)));
    }

    /**
     * Verifies that the destination cluster-link quota is still applied after the destination
     * partition leader is shut down, restarted, rejoins the ISR and is re-elected through a
     * preferred leader election.
     *
     * @param str bound to the {@code quorum} placeholder of the display name; not read here
     * @param z   bound to the {@code coordinator} placeholder of the display name; not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestinationClusterLinkQuotaWithBrokerRestart(String str, boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());

        // Set the link I/O byte-rate config to "100" on the empty-named BROKER resource, then
        // release this client before the leader is shut down.
        ClusterLinkTestHarness destCluster2 = destCluster();
        ConfluentAdmin quotaAdmin = destCluster2.createConfluentAdminClient(destCluster2.createConfluentAdminClient$default$1());
        quotaAdmin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), CollectionConverters$.MODULE$.IterableHasAsJava((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new AlterConfigOp[]{new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), "100"), AlterConfigOp.OpType.SET)}))).asJavaCollection())).all().get();
        quotaAdmin.close();

        // Shut down the current leader and wait until leadership has moved elsewhere.
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Tuple2<Object, Object> shutdownLeader = destCluster().shutdownLeader(topicPartition);
        if (shutdownLeader == null) {
            throw new MatchError((Object) null);
        }
        int restartedBrokerId = shutdownLeader._1$mcI$sp();
        // The second tuple component is passed straight through to waitForLeaderChange
        // (presumably the leader epoch -- confirm against the harness).
        Tuple2<Object, Object> failover = destCluster().waitForLeaderChange(topicPartition, restartedBrokerId, shutdownLeader._2$mcI$sp());
        if (failover == null) {
            throw new MatchError((Object) null);
        }
        int failoverLeaderId = failover._1$mcI$sp();
        int failoverSecond = failover._2$mcI$sp();
        destCluster().startBroker(restartedBrokerId);

        // Poll every 100 ms, for up to 15 s, until the restarted broker shows up in the ISR.
        long isrDeadlineMs = System.currentTimeMillis() + 15000;
        Set isr;
        boolean rejoinedIsr;
        while (true) {
            isr = $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(this, topicPartition);
            rejoinedIsr = isr.contains(BoxesRunTime.boxToInteger(restartedBrokerId));
            if (rejoinedIsr || System.currentTimeMillis() > isrDeadlineMs) {
                break;
            }
            Thread.sleep(100L);
        }
        Assertions.assertTrue(rejoinedIsr, "Broker " + restartedBrokerId + " is not part of ISR " + isr + " for partition " + topicPartition);

        destCluster().updateBootstrapServers();
        ClusterLinkTestHarness destCluster3 = destCluster();
        ConfluentAdmin electionAdmin = destCluster3.createConfluentAdminClient(destCluster3.createConfluentAdminClient$default$1());

        // Retry the preferred election with a growing back-off until it succeeds or 15 s elapse.
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long backoffMs = 1;
        long retryStartMs = System.currentTimeMillis();
        while (true) {
            try {
                try {
                    electionAdmin.electLeaders(ElectionType.PREFERRED, Collections.singleton(topicPartition)).all().get(15L, TimeUnit.SECONDS);
                } catch (Throwable cause) {
                    // Keep the underlying failure attached so the final report shows why the
                    // election failed, not just that it did.
                    Assertions.fail("Preferred leader election failed", cause);
                }
                break;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - retryStartMs > 15000) {
                    throw e;
                }
                if (testUtils.logger().underlying().isInfoEnabled()) {
                    testUtils.logger().underlying().info(Logging.msgWithLogIdent$(testUtils, "Attempt failed, sleeping for " + backoffMs + ", and then retrying."));
                }
                Thread.sleep(backoffMs);
                backoffMs += Math.min(backoffMs, 1000L);
            }
        }

        // The restarted broker is expected to be the leader again after the preferred election.
        int electedLeaderId = destCluster().waitForLeaderChange(topicPartition, failoverLeaderId, failoverSecond)._1$mcI$sp();
        Assertions.assertEquals(restartedBrokerId, electedLeaderId, "Preferred leader not elected");

        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        produceUntil(sourceCluster2.createProducer(sourceCluster2.createProducer$default$1(), sourceCluster2.createProducer$default$2(), sourceCluster2.createProducer$default$3()), () -> {
            return this.destClusterLinkReplicasThrottled();
        }, "Destination quota not applied after broker restart");
    }

    /**
     * Steps the link fetcher flow-control config through "-2", "-1", "10485760" and "0".
     * After each change it produces to the source cluster, waits for the mirror and reads the
     * summed {@code destination-lag-link-fetcher-throttle-total} metric. The metric must stay
     * at zero for the first three values and be positive for "0". Finally the summed
     * {@code link-fetcher-count} metric must equal 2.
     *
     * @param str bound to the {@code quorum} placeholder of the display name; not read here
     * @param z   bound to the {@code coordinator} placeholder of the display name; not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testDestinationLagLinkFetcherThrottle(String str, boolean z) {
        numPartitions_$eq(2);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        Assertions.assertTrue(lagThrottleTotalAfterFlowControl("-2") == 0.0d);
        Assertions.assertTrue(lagThrottleTotalAfterFlowControl("-1") == 0.0d);
        Assertions.assertTrue(lagThrottleTotalAfterFlowControl("10485760") == 0.0d);
        Assertions.assertTrue(lagThrottleTotalAfterFlowControl("0") > 0.0d);

        double fetcherCount = totalKafkaMetricValue(destCluster().aliveServers(), "link-fetcher-count", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
        Assertions.assertEquals(2.0d, fetcherCount);
    }

    /**
     * Applies one flow-control value to the link, calls {@code produceToSourceCluster(30)},
     * waits for the mirror, and returns the summed lag-throttle metric of the alive
     * destination servers.
     */
    private double lagThrottleTotalAfterFlowControl(String flowControl) {
        ClusterLinkTestHarness dest = destCluster();
        String link = linkName();
        Tuple2 flowControlEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp()), flowControl);
        Map overrides = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{flowControlEntry}));
        dest.alterClusterLink(link, overrides, dest.alterClusterLink$default$3(), dest.alterClusterLink$default$4(), dest.alterClusterLink$default$5());
        produceToSourceCluster(30);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        return totalKafkaMetricValue(destCluster().aliveServers(), "destination-lag-link-fetcher-throttle-total", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4(), totalKafkaMetricValue$default$5());
    }

    /**
     * Creates a link whose destination properties set {@code replica.fetch.max.bytes} to "100",
     * then runs the shared quota verification. The consumer byte-rate quota of the link user is
     * set through a source-cluster admin client, and throttling is observed through the
     * destination brokers' {@code fetch-throttle-time-max} metric.
     *
     * @param str bound to the {@code quorum} placeholder of the display name; not read here
     * @param z   bound to the {@code coordinator} placeholder of the display name; not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSourceClusterQuota(String str, boolean z) {
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        String link = linkName();
        Tuple2 fetchMaxBytesEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("replica.fetch.max.bytes"), "100");
        Map linkOverrides = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{fetchMaxBytesEntry}));
        Uuid linkId = createClusterLink(link, destLinkProps(linkOverrides), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        ClusterLinkTestHarness adminSource = sourceCluster();
        ConfluentAdmin sourceAdmin = adminSource.createConfluentAdminClient(adminSource.createConfluentAdminClient$default$1());
        verifyQuota(quota -> {
            this.setQuota$1(quota, sourceAdmin);
        }, () -> {
            return this.throttled$1();
        }, false);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
    }

    /**
     * Creates a single-partition mirrored topic over a new cluster link, sets the link
     * replication quota mode on every given config resource, and runs the shared quota
     * verification followed by the mirror, link-metric and quota-mode checks.
     *
     * @param seq                  config resources that receive the quota-mode and byte-rate settings
     * @param clusterLinkQuotaMode quota mode to configure and later verify
     * @return the id returned by {@code createClusterLink}
     */
    public Uuid verifyDestinationClusterLinkQuota(Seq<ConfigResource> seq, ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        Uuid linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        ClusterLinkTestHarness adminDest = destCluster();
        ConfluentAdmin destAdmin = adminDest.createConfluentAdminClient(adminDest.createConfluentAdminClient$default$1());
        AlterConfigOp quotaModeOp = new AlterConfigOp(
                new ConfigEntry("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.toString()), AlterConfigOp.OpType.SET);

        // Every resource gets the same single quota-mode SET operation.
        IterableOnceOps opsByResource = (IterableOnceOps) seq.map(resource ->
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(resource),
                        CollectionConverters$.MODULE$.IterableHasAsJava((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new AlterConfigOp[]{quotaModeOp}))).asJavaCollection()));
        destAdmin.incrementalAlterConfigs(CollectionConverters$.MODULE$.MapHasAsJava(opsByResource.toMap($less$colon$less$.MODULE$.refl())).asJava()).all().get();

        verifyQuota(quota -> {
            setQuota$2(quota, seq, destAdmin);
        }, () -> {
            return this.destClusterLinkReplicasThrottled();
        }, true);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyQuotaMode(clusterLinkQuotaMode);
        return linkId;
    }

    /**
     * Sets the given quota mode (plus {@code producer_byte_rate} = "100000") on the source
     * cluster's empty-named BROKER resource, waits up to 15 s for the leader of partition 0 to
     * report that mode in its config, then produces to the source cluster and verifies the
     * cluster-link quota metrics on that leader.
     *
     * @param clusterLinkQuotaMode mode to configure and wait for; the metrics check is told
     *                             whether it equals {@code TOTAL_INBOUND}
     */
    public void verifyQuotaMode(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        ClusterLinkTestHarness source = sourceCluster();
        ConfluentAdmin sourceAdmin = source.createConfluentAdminClient(source.createConfluentAdminClient$default$1());
        AlterConfigOp quotaModeOp = new AlterConfigOp(
                new ConfigEntry("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.toString()), AlterConfigOp.OpType.SET);
        AlterConfigOp producerRateOp = new AlterConfigOp(
                new ConfigEntry("producer_byte_rate", "100000"), AlterConfigOp.OpType.SET);
        sourceAdmin.incrementalAlterConfigs(Collections.singletonMap(
                new ConfigResource(ConfigResource.Type.BROKER, ""),
                CollectionConverters$.MODULE$.IterableHasAsJava((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new AlterConfigOp[]{quotaModeOp, producerRateOp}))).asJavaCollection())).all().get();

        KafkaBroker partitionLeader = sourceCluster().partitionLeader(new TopicPartition(topic(), 0));

        // Poll every 100 ms; fail once more than 15 s have passed without the mode appearing.
        long deadlineMs = System.currentTimeMillis() + 15000;
        while (!$anonfun$verifyQuotaMode$1(partitionLeader, clusterLinkQuotaMode)) {
            if (System.currentTimeMillis() > deadlineMs) {
                Assertions.fail("Quota mode not updated");
            }
            Thread.sleep(100L);
        }

        produceToSourceCluster(20);
        verifyClusterLinkQuotaMetrics(new $colon.colon(partitionLeader, Nil$.MODULE$), clusterLinkQuotaMode.equals(ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports whether the {@code ThrottledClusterLinkReplicasPerSec} ReplicaManager metric has a
     * maximum value above zero.
     *
     * @return {@code true} if the metric's maximum is strictly positive
     */
    public boolean destClusterLinkReplicasThrottled() {
        double throttledMax = yammerMetricMaxValue("kafka.server:type=ReplicaManager,name=ThrottledClusterLinkReplicasPerSec", None$.MODULE$);
        return throttledMax > 0.0d;
    }

    /**
     * Checks the fetch sizes used by the destination's link fetcher. The per-partition fetch
     * size and the total response size are read reflectively from the leader endpoint of the
     * first fetcher thread of the first fetcher manager that has at least one fetcher.
     *
     * <p>Each value is polled every 100 ms for up to 15 s and then asserted.
     *
     * @param uuid   link id used to look up each broker's fetcher manager
     * @param option expected sizes; when empty, the expected values come from the fetcher
     *               manager's current config ({@code replicaFetchMaxBytes} and
     *               {@code replicaFetchResponseMaxBytes})
     */
    private void verifyFetchResponseSize(Uuid uuid, Option<FetchResponseSize> option) {
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) ((IterableOps) ((IterableOps) destCluster().brokers().map(kafkaBroker -> {
            return (ClusterLinkFetcherManager) kafkaBroker.clusterLinkManager().fetcherManager(uuid).get();
        })).filter(clusterLinkFetcherManager2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyFetchResponseSize$2(clusterLinkFetcherManager2));
        })).head();
        LeaderEndPoint leader = ((ClusterLinkFetcher) ((HashMap) TestUtils.fieldValue(clusterLinkFetcherManager, AbstractFetcherManager.class, "fetcherThreadMap")).values().head()).leader();

        Object expectedFetchSize = option.map(fetchResponseSize -> {
            return BoxesRunTime.boxToInteger(fetchResponseSize.perPartitionSize());
        }).getOrElse(() -> {
            return clusterLinkFetcherManager.currentConfig().replicaFetchMaxBytes();
        });
        Object expectedResponseSize = option.map(fetchResponseSize2 -> {
            return BoxesRunTime.boxToInteger(fetchResponseSize2.responseSize());
        }).getOrElse(() -> {
            return clusterLinkFetcherManager.currentConfig().replicaFetchResponseMaxBytes();
        });

        // Wait for the per-partition fetch size, then assert on a fresh read.
        long fetchSizeDeadlineMs = System.currentTimeMillis() + 15000;
        while (!$anonfun$verifyFetchResponseSize$8(expectedFetchSize, fetchSize$1(leader))
                && System.currentTimeMillis() <= fetchSizeDeadlineMs) {
            Thread.sleep(100L);
        }
        Assertions.assertEquals(expectedFetchSize, BoxesRunTime.boxToInteger(fetchSize$1(leader)));

        // Same wait-then-assert for the total fetch response size.
        long responseSizeDeadlineMs = System.currentTimeMillis() + 15000;
        while (!$anonfun$verifyFetchResponseSize$10(expectedResponseSize, fetchResponseSize$1(leader))
                && System.currentTimeMillis() <= responseSizeDeadlineMs) {
            Thread.sleep(100L);
        }
        Assertions.assertEquals(expectedResponseSize, BoxesRunTime.boxToInteger(fetchResponseSize$1(leader)));
    }

    /** Lambda body: returns the broker id from the given broker's config. */
    public static final /* synthetic */ int $anonfun$testDestinationClusterLinkBrokerLevelQuota$1(KafkaBroker kafkaBroker) {
        int brokerId = kafkaBroker.config().brokerId();
        return brokerId;
    }

    /** Lambda body: builds a BROKER config resource named by the decimal form of {@code i}. */
    public static final /* synthetic */ ConfigResource $anonfun$testDestinationClusterLinkBrokerLevelQuota$2(int i) {
        String resourceName = String.valueOf(i);
        return new ConfigResource(ConfigResource.Type.BROKER, resourceName);
    }

    /**
     * Lambda body: describes the test topic on the destination cluster and returns the ids of
     * the nodes in the ISR of the given partition as a Scala set.
     */
    public static final /* synthetic */ Set $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(ClusterLinkQuotaIntegrationTest clusterLinkQuotaIntegrationTest, TopicPartition topicPartition) {
        TopicPartitionInfo partitionInfo = (TopicPartitionInfo) clusterLinkQuotaIntegrationTest.destCluster()
                .describeTopic(clusterLinkQuotaIntegrationTest.topic())
                .partitions()
                .get(topicPartition.partition());
        IterableOnceOps isrIds = (IterableOnceOps) CollectionConverters$.MODULE$.ListHasAsScala(partitionInfo.isr()).asScala()
                .map(node -> BoxesRunTime.boxToInteger(node.id()));
        return isrIds.toSet();
    }

    /** Lambda body: tests whether the boxed value of {@code i} is a member of {@code set}. */
    public static final /* synthetic */ boolean $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$3(int i, Set set) {
        Integer boxedId = BoxesRunTime.boxToInteger(i);
        return set.contains(boxedId);
    }

    /**
     * Lambda body: triggers a preferred leader election for the given partition through the
     * admin client held in {@code objectRef}, waiting up to 15 seconds for the result.
     *
     * <p>Any failure becomes an assertion failure with the original throwable attached as its
     * cause, so the reason for the failed election is not lost.
     */
    public static final /* synthetic */ void $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$4(ObjectRef objectRef, TopicPartition topicPartition) {
        try {
            ((ConfluentAdmin) objectRef.elem).electLeaders(ElectionType.PREFERRED, Collections.singleton(topicPartition)).all().get(15L, TimeUnit.SECONDS);
        } catch (Throwable cause) {
            Assertions.fail("Preferred leader election failed", cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sets the consumer byte-rate quota override of the link's user to {@code j} through the
     * given admin client, waiting up to 15 seconds for the alteration to complete.
     */
    public final void setQuota$1(long j, ConfluentAdmin confluentAdmin) {
        // Quota entity: the single "user" -> link user name mapping.
        Tuple2 userEntry = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), linkUserName(linkName()));
        ClientQuotaEntity linkUser = new ClientQuotaEntity(
                CollectionConverters$.MODULE$.MapHasAsJava((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{userEntry}))).asJava());
        ClientQuotaAlteration.Op byteRateOp = new ClientQuotaAlteration.Op(
                DynamicConfig$Client$.MODULE$.ConsumerByteRateOverrideProp(), Predef$.MODULE$.double2Double(j));
        ClientQuotaAlteration alteration = new ClientQuotaAlteration(linkUser, Collections.singleton(byteRateOp));
        confluentAdmin.alterClientQuotas(Collections.singleton(alteration)).all().get(15L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports whether the {@code fetch-throttle-time-max} metric (group {@code cluster-link},
     * looked up with this link's name) has a maximum above zero across the destination brokers.
     */
    public final boolean throttled$1() {
        double throttleTimeMax = kafkaMetricMaxValue(destCluster().brokers(), "fetch-throttle-time-max", "cluster-link", new Some(linkName()), kafkaMetricMaxValue$default$5(), kafkaMetricMaxValue$default$6(), kafkaMetricMaxValue$default$7(), kafkaMetricMaxValue$default$8());
        return throttleTimeMax > 0.0d;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sets the link I/O byte-rate config to {@code j} on every resource in {@code seq} through
     * the given admin client and blocks until the alteration completes.
     */
    public static final void setQuota$2(long j, Seq seq, ConfluentAdmin confluentAdmin) {
        AlterConfigOp byteRateOp = new AlterConfigOp(
                new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), Long.toString(j)), AlterConfigOp.OpType.SET);
        // Every resource gets the same single SET operation.
        IterableOnceOps opsByResource = (IterableOnceOps) seq.map(resource ->
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(resource),
                        CollectionConverters$.MODULE$.IterableHasAsJava((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new AlterConfigOp[]{byteRateOp}))).asJavaCollection()));
        confluentAdmin.incrementalAlterConfigs(CollectionConverters$.MODULE$.MapHasAsJava(opsByResource.toMap($less$colon$less$.MODULE$.refl())).asJava()).all().get();
    }

    /**
     * Lambda body: null-safe comparison of the broker's configured cluster-link quota mode with
     * the expected mode.
     */
    public static final /* synthetic */ boolean $anonfun$verifyQuotaMode$1(KafkaBroker kafkaBroker, ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        ConfluentConfigs.ClusterLinkQuotaMode configuredMode = kafkaBroker.config().clusterLinkQuotaMode();
        if (configuredMode == null) {
            return clusterLinkQuotaMode == null;
        }
        return configuredMode.equals(clusterLinkQuotaMode);
    }

    /** Lambda body: supplies the failure message used when the quota mode never updates. */
    public static final /* synthetic */ String $anonfun$verifyQuotaMode$2() {
        final String failureMessage = "Quota mode not updated";
        return failureMessage;
    }

    /** Lambda body: keeps only fetcher managers that report at least one fetcher. */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$2(ClusterLinkFetcherManager clusterLinkFetcherManager) {
        int fetchers = clusterLinkFetcherManager.fetcherCount();
        return fetchers > 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads the {@code fetchSize} field declared on {@code RemoteLeaderEndPoint} from the given
     * endpoint via reflection and unboxes it to an int.
     */
    public static final int fetchSize$1(LeaderEndPoint leaderEndPoint) {
        Object rawFetchSize = TestUtils.fieldValue(leaderEndPoint, RemoteLeaderEndPoint.class, "fetchSize");
        return BoxesRunTime.unboxToInt(rawFetchSize);
    }

    /**
     * Reads the {@code requestBuilder} field declared on {@code ClusterLinkLeaderEndPoint} from
     * the given endpoint via reflection.
     */
    private static final ClusterLinkLeaderRequestBuilder fetcherThreadLeaderRequestBuilder$1(LeaderEndPoint leaderEndPoint) {
        Object rawBuilder = TestUtils.fieldValue(leaderEndPoint, ClusterLinkLeaderEndPoint.class, "requestBuilder");
        return (ClusterLinkLeaderRequestBuilder) rawBuilder;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads the {@code fetchResponseSize} field of the endpoint's request builder via reflection
     * and unboxes it to an int.
     */
    public static final int fetchResponseSize$1(LeaderEndPoint leaderEndPoint) {
        ClusterLinkLeaderRequestBuilder requestBuilder = fetcherThreadLeaderRequestBuilder$1(leaderEndPoint);
        Object rawResponseSize = TestUtils.fieldValue(requestBuilder, ClusterLinkLeaderRequestBuilder.class, "fetchResponseSize");
        return BoxesRunTime.unboxToInt(rawResponseSize);
    }

    /** Lambda body: compares the boxed value of {@code i} with {@code obj} using Scala equality. */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$8(Object obj, int i) {
        Integer observed = BoxesRunTime.boxToInteger(i);
        return BoxesRunTime.equals(observed, obj);
    }

    /** Lambda body: compares the boxed value of {@code i} with {@code obj} using Scala equality. */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$10(Object obj, int i) {
        Integer observed = BoxesRunTime.boxToInteger(i);
        return BoxesRunTime.equals(observed, obj);
    }
}
