package kafka.link;

import java.util.Properties;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.TopicLinkMirror$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.opentest4j.AssertionFailedError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq$;
import scala.collection.StringOps$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkMetricsIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\u001d4AAB\u0004\u0001\u0019!)\u0011\u0003\u0001C\u0001%!)A\u0003\u0001C\u0001+!)\u0011\n\u0001C\u0001\u0015\")\u0001\u000b\u0001C\u0001#\")q\u000b\u0001C\u00011\n\t3\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$(/[2t\u0013:$Xm\u001a:bi&|g\u000eV3ti*\u0011\u0001\"C\u0001\u0005Y&t7NC\u0001\u000b\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0007\u0011\u00059yQ\"A\u0004\n\u0005A9!AI!cgR\u0014\u0018m\u0019;DYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002'A\u0011a\u0002A\u0001%i\u0016\u001cH/T5se>\u0014Hk\u001c9jG\u000e{WO\u001c;G_JlU\u000f\u001c;ja2,G*\u001b8lgR\u0019a\u0003H\u0015\u0011\u0005]QR\"\u0001\r\u000b\u0003e\tQa]2bY\u0006L!a\u0007\r\u0003\tUs\u0017\u000e\u001e\u0005\u0006;\t\u0001\rAH\u0001\u0007cV|'/^7\u0011\u0005}1cB\u0001\u0011%!\t\t\u0003$D\u0001#\u0015\t\u00193\"\u0001\u0004=e>|GOP\u0005\u0003Ka\ta\u0001\u0015:fI\u00164\u0017BA\u0014)\u0005\u0019\u0019FO]5oO*\u0011Q\u0005\u0007\u0005\u0006U\t\u0001\raK\u0001\fG>|'\u000fZ5oCR|'\u000f\u0005\u0002\u0018Y%\u0011Q\u0006\u0007\u0002\b\u0005>|G.Z1oQ\u0011\u0011qf\u000f\u001f\u0011\u0005AJT\"A\u0019\u000b\u0005I\u001a\u0014A\u00029be\u0006l7O\u0003\u00025k\u00059!.\u001e9ji\u0016\u0014(B\u0001\u001c8\u0003\u0015QWO\\5u\u0015\u0005A\u0014aA8sO&\u0011!(\r\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f\u0013!P\u0001)w\u0012L7\u000f\u001d7bs:\u000bW.Z?/cV|'/^7>wBjhfY8pe\u0012Lg.\u0019;pevZ\u0018' \u0015\u0005\u0005}*e\t\u0005\u0002A\u00076\t\u0011I\u0003\u0002Cc\u0005A\u0001O]8wS\u0012,'/\u0003\u0002E\u0003\naQ*\u001a;i_\u0012\u001cv.\u001e:dK\u0006)a/\u00197vK2\nq)I\u0001I\u0003=\tG\u000e\\\"p[\nLg.\u0019;j_:\u001c\u0018A\t;fgR\u0014%o\\6feR{\u0007/[2Ti\u0006$8OR8s\u001b&\u0014(o\u001c:U_BL7\rF\u0002\u0017\u00172CQ!H\u0002A\u0002yAQAK\u0002A\u0002-BCaA\u0018<y!\"1aP#PY\u00059\u0015!\u0004;fgR\u0014\u0006o\\'fiJL7\rF\u0002\u0017%NCQ!\b\u0003A\u0002yAQA\u000b\u0003A\u0002-BC\u0001B\u0018<y!\"AaP#WY\u00059\u0015!\u0006;fgR\u001cuN\u001c8fGRLwN\\'fiJL7m\u001d\u000b\u0004-eS\u0006\"B\u000f\u0006\u0001\u0004q\u0002\"\u0002\u0016\u0006\u0001\u0004Y\u0003\u0006B\u00030wqBC!B F;2\nq\t\u000b\u0003\u0001?\u0016+\u0007C\u00011d\u001b\u0005\t'B\u000124\u0003\r\t\u0007/[\u0005\u0003I\u0006\u00141\u0001V1hC\u00051\u0017aC5oi\u0016<'/\u0019;j_:\u0004")
/* loaded from: input_file:kafka/link/ClusterLinkMetricsIntegrationTest.class */
public class ClusterLinkMetricsIntegrationTest extends AbstractClusterLinkIntegrationTest {
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorTopicCountForMultipleLinks(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        String sb = new StringBuilder(3).append(linkName()).append("Two").toString();
        String sb2 = new StringBuilder(3).append(topic()).append("Two").toString();
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.createTopic(sb2, numPartitions(), replicationFactor(), sourceCluster2.createTopic$default$4(), sourceCluster2.createTopic$default$5(), sourceCluster2.createTopic$default$6());
        createClusterLink(sb, createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(sb2, replicationFactor(), sb, destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        verifyMirrorTopicCount("mirror-topic-count", (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("state"), TopicLinkMirror$.MODULE$.name())})), 1, linkName(), verifyMirrorTopicCount$default$5());
        verifyMirrorTopicCount("mirror-topic-count", (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("state"), TopicLinkMirror$.MODULE$.name())})), 1, sb, verifyMirrorTopicCount$default$5());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBrokerTopicStatsForMirrorTopic(String str, boolean z) {
        String sb;
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), true, createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), clusterLinkPrefix());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            sb = new StringBuilder(26).append("MirrorBytesInPerSec").append(",topic=").append(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString()).toString();
        } else {
            sb = new StringBuilder(26).append("MirrorBytesInPerSec").append(",topic=").append(topic()).toString();
        }
        String str2 = sb;
        produceToSourceCluster(1000);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testBrokerTopicStatsForMirrorTopic$1(str2)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail($anonfun$testBrokerTopicStatsForMirrorTopic$2(str2));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        long meterCount = TestUtils$.MODULE$.meterCount(str2);
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testBrokerTopicStatsForMirrorTopic$3("MirrorBytesInPerSec")) {
            if (System.currentTimeMillis() > currentTimeMillis2 + 15000) {
                Assertions.fail($anonfun$testBrokerTopicStatsForMirrorTopic$4("MirrorBytesInPerSec"));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
        long meterCount2 = TestUtils$.MODULE$.meterCount("MirrorBytesInPerSec");
        produceToSourceCluster(2000);
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis3 = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testBrokerTopicStatsForMirrorTopic$5(str2, meterCount, "MirrorBytesInPerSec", meterCount2);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis3 > 15000) {
                    throw e;
                }
                if (testUtils$7.logger().underlying().isInfoEnabled()) {
                    testUtils$7.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$7, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testRpoMetric(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), 1, replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createClusterLink(linkName(), destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("replica.fetch.max.bytes"), "100"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("replica.fetch.wait.max.ms"), "60000")}))), createClusterLink$default$3(), true, createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(1000);
        verifyRpoMetric(linkName(), topic());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.unlinkTopic(topic(), linkName(), destCluster2.unlinkTopic$default$3(), false, destCluster2.unlinkTopic$default$5(), 1);
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.deleteClusterLink(linkName(), destCluster3.deleteClusterLink$default$2(), destCluster3.deleteClusterLink$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testConnectionMetrics(String str, boolean z) {
        numPartitions_$eq(1);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), clusterLinkPrefix());
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})));
        verifyNoClusterLinkConnections(destCluster(), "paused");
        verifyNoClusterLinkConnections(sourceCluster(), "paused");
        sourceCluster().alterPartitionAssignmentAndWait(topic(), assignment$1(sourceCluster()));
        destCluster().alterPartitionAssignmentAndWait(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), assignment$1(destCluster()));
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})));
        produceToSourceCluster(20);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(createClusterLink, verifyBasicLinkMetrics$default$2(), true);
        verifyClusterLinkConnectionMetrics(verifyClusterLinkConnectionMetrics$default$1(), verifyClusterLinkConnectionMetrics$default$2());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.unlinkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), linkName(), destCluster2.unlinkTopic$default$3(), destCluster2.unlinkTopic$default$4(), destCluster2.unlinkTopic$default$5(), numPartitions());
        consume(destCluster(), consume$default$2());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.deleteClusterLink(linkName(), destCluster3.deleteClusterLink$default$2(), destCluster3.deleteClusterLink$default$3());
        if (useSourceInitiatedLink()) {
            ClusterLinkTestHarness sourceCluster2 = sourceCluster();
            sourceCluster2.deleteClusterLink(linkName(), sourceCluster2.deleteClusterLink$default$2(), sourceCluster2.deleteClusterLink$default$3());
        }
        verifyNoClusterLinkConnections(destCluster(), "deleted");
        verifyNoClusterLinkConnections(sourceCluster(), "deleted");
    }

    public static final /* synthetic */ boolean $anonfun$testBrokerTopicStatsForMirrorTopic$1(String str) {
        try {
            TestUtils$.MODULE$.meterCount(str);
            return true;
        } catch (AssertionFailedError unused) {
            return false;
        }
    }

    public static final /* synthetic */ String $anonfun$testBrokerTopicStatsForMirrorTopic$2(String str) {
        return new StringBuilder(22).append("Failed to find metric ").append(str).toString();
    }

    public static final /* synthetic */ boolean $anonfun$testBrokerTopicStatsForMirrorTopic$3(String str) {
        try {
            TestUtils$.MODULE$.meterCount(str);
            return true;
        } catch (AssertionFailedError unused) {
            return false;
        }
    }

    public static final /* synthetic */ String $anonfun$testBrokerTopicStatsForMirrorTopic$4(String str) {
        return new StringBuilder(22).append("Failed to find metric ").append(str).toString();
    }

    public static final /* synthetic */ void $anonfun$testBrokerTopicStatsForMirrorTopic$5(String str, long j, String str2, long j2) {
        Assertions.assertTrue(TestUtils$.MODULE$.meterCount(str) > j);
        Assertions.assertTrue(TestUtils$.MODULE$.meterCount(str2) > j2);
    }

    public static final /* synthetic */ boolean $anonfun$testConnectionMetrics$1(KafkaBroker kafkaBroker, KafkaBroker kafkaBroker2) {
        return kafkaBroker2 == kafkaBroker;
    }

    private final Map assignment$1(ClusterLinkTestHarness clusterLinkTestHarness) {
        KafkaBroker linkCoordinator = clusterLinkTestHarness.linkCoordinator(linkName());
        return (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapIntArray(new int[]{linkCoordinator.config().brokerId(), ((KafkaBroker) ((IterableOps) clusterLinkTestHarness.brokers().filterNot(kafkaBroker -> {
            return BoxesRunTime.boxToBoolean($anonfun$testConnectionMetrics$1(linkCoordinator, kafkaBroker));
        })).head()).config().brokerId()})))}));
    }
}
