package kafka.link;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import kafka.server.FetchConnectionsMode$Combined$;
import kafka.server.KafkaBroker;
import kafka.server.link.ActiveTaskState$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ClusterLinkPauseMirrorTopicsTaskType$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.errors.ClusterLinkPausedException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.StringOps$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

/* compiled from: ClusterLinkControlPlaneLinkConfigResourceIntegrationTest.scala */
@Tags({@Tag("integration"), @Tag("bazel:shard_count:3")})
@ScalaSignature(bytes = "\u0006\u0005\u0005ea\u0001\u0002\u0007\u000e\u0001IAQa\u0006\u0001\u0005\u0002aAqA\u0007\u0001C\u0002\u0013\u00051\u0004\u0003\u0004%\u0001\u0001\u0006I\u0001\b\u0005\bK\u0001\u0011\r\u0011\"\u0001\u001c\u0011\u00191\u0003\u0001)A\u00059!)q\u0005\u0001C\u0001Q!)1\f\u0001C\u00019\")!\r\u0001C\u0001G\")\u0011\u000e\u0001C\u0001U\")!\u000f\u0001C\u0001g\")\u0011\u0010\u0001C\u0001u\nA4\t\\;ti\u0016\u0014H*\u001b8l\u0007>tGO]8m!2\fg.\u001a'j].\u001cuN\u001c4jOJ+7o\\;sG\u0016Le\u000e^3he\u0006$\u0018n\u001c8UKN$(B\u0001\b\u0010\u0003\u0011a\u0017N\\6\u000b\u0003A\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001'A\u0011A#F\u0007\u0002\u001b%\u0011a#\u0004\u0002#\u0003\n\u001cHO]1di\u000ecWo\u001d;fe2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005I\u0002C\u0001\u000b\u0001\u00035\u0019wN\\:v[\u0016\u0014xI]8vaV\tA\u0004\u0005\u0002\u001eE5\taD\u0003\u0002 A\u0005!A.\u00198h\u0015\u0005\t\u0013\u0001\u00026bm\u0006L!a\t\u0010\u0003\rM#(/\u001b8h\u00039\u0019wN\\:v[\u0016\u0014xI]8va\u0002\n\u0001&\u001a=qK\u000e$X\r\u001a)bkN,W*\u001b:s_J$v\u000e]5dg\u000e{WO\u001c;NKR\u0014\u0018n\u0019(b[\u0016\f\u0011&\u001a=qK\u000e$X\r\u001a)bkN,W*\u001b:s_J$v\u000e]5dg\u000e{WO\u001c;NKR\u0014\u0018n\u0019(b[\u0016\u0004\u0013\u0001\u0006;fgR\u0004\u0016-^:f\u00072,8\u000f^3s\u0019&t7\u000eF\u0002*_m\u0002\"AK\u0017\u000e\u0003-R\u0011\u0001L\u0001\u0006g\u000e\fG.Y\u0005\u0003]-\u0012A!\u00168ji\")\u0001G\u0002a\u0001c\u00051\u0011/^8sk6\u0004\"AM\u001d\u000f\u0005M:\u0004C\u0001\u001b,\u001b\u0005)$B\u0001\u001c\u0012\u0003\u0019a$o\\8u}%\u0011\u0001hK\u0001\u0007!J,G-\u001a4\n\u0005\rR$B\u0001\u001d,\u0011\u0015ad\u00011\u0001>\u0003-\u0019wn\u001c:eS:\fGo\u001c:\u0011\u0005)r\u0014BA ,\u0005\u001d\u0011un\u001c7fC:DCAB!N\u001dB\u0011!iS\u0007\u0002\u0007*\u0011A)R\u0001\u0007a\u0006\u0014\u0018-\\:\u000b\u0005\u0019;\u0015a\u00026va&$XM\u001d\u0006\u0003\u0011&\u000bQA[;oSRT\u0011AS\u0001\u0004_J<\u0017B\u0001'D\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-I\u0001P\u0003!ZH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b' \u0018d_>\u0014H-\u001b8bi>\u0014Xh_\u0019~Q\u00111\u0011k\u0016-\u0011\u0005I+V\"A*\u000b\u0005Q\u001b\u0015\u0001\u00039s_ZLG-\u001a:\n\u0005Y\u001b&\u0001D'fi\"|GmU8ve\u000e,\u0017!\u0002<bYV,G&A-\"\u0003i\u000bq\"\u00197m\u0007>l'-\u001b8bi&|gn]\u0001\u0017i\u0016\u001cHo\u00117vgR,'\u000fT5oW\u000e{gNZ5hgR\u0019\u0011&\u00180\t\u000bA:\u0001\u0019A\u0019\t\u000bq:\u0001\u0019A\u001f)\t\u001d\tUJ\u0014\u0015\u0005\u000fE;\u0016\rL\u0001Z\u0003m!Xm\u001d;BYR,'o\u00117vgR,'\u000fT5oW\u000e{gNZ5hgR\u0019\u0011\u0006Z3\t\u000bAB\u0001\u0019A\u0019\t\u000bqB\u0001\u0019A\u001f)\t!\tUJ\u0014\u0015\u0005\u0011E;\u0006\u000eL\u0001Z\u0003\u0019\"Xm\u001d;BYR,'o\u00117vgR,'\u000fT5oW^KG\u000f[%om\u0006d\u0017\u000eZ\"p]\u001aLwm\u001d\u000b\u0004S-d\u0007\"\u0002\u0019\n\u0001\u0004\t\u0004\"\u0002\u001f\n\u0001\u0004i\u0004\u0006B\u0005B\u001b:CC!C)X_2\n\u0001/I\u0001r\u00039Q8nQ8nE&t\u0017\r^5p]N\fa\u0004^3ti\u0016CH/\u001a:oC2L'0\u001a3QCN\u001cxo\u001c:e\u0007>tg-[4\u0015\u0007%\"X\u000fC\u00031\u0015\u0001\u0007\u0011\u0007C\u0003=\u0015\u0001\u0007Q\b\u000b\u0003\u000b\u00036s\u0005\u0006\u0002\u0006R/bd\u0013!W\u0001%i\u0016\u001cHo\u00117vgR,'\u000fT5oW6+G/\u00193bi\u0006$v\u000e]5d\u0007J,\u0017\r^5p]R\u0019\u0011f\u001f?\t\u000bAZ\u0001\u0019A\u0019\t\u000bqZ\u0001\u0019A\u001f)\t-\tUJ\u0014\u0015\u0005\u0017E;v\u0010L\u0001ZQ\u0019\u0001\u00111A,\u0002\u0010A!\u0011QAA\u0006\u001b\t\t9AC\u0002\u0002\n\u0015\u000b1!\u00199j\u0013\u0011\ti!a\u0002\u0003\u0007Q\u000bw-\t\u0002\u0002\u0012\u0005Y\u0011N\u001c;fOJ\fG/[8oQ\u0019\u0001\u00111A,\u0002\u0016\u0005\u0012\u0011qC\u0001\u0014E\u0006TX\r\u001c\u001etQ\u0006\u0014HmX2pk:$(h\r")
/* loaded from: input_file:kafka/link/ClusterLinkControlPlaneLinkConfigResourceIntegrationTest.class */
public class ClusterLinkControlPlaneLinkConfigResourceIntegrationTest extends AbstractClusterLinkIntegrationTest {
    private final String consumerGroup = "testGroup";
    private final String expectedPauseMirrorTopicsCountMetricName = "pause-mirror-topics";

    public String consumerGroup() {
        return this.consumerGroup;
    }

    public String expectedPauseMirrorTopicsCountMetricName() {
        return this.expectedPauseMirrorTopicsCountMetricName;
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testPauseClusterLink(String str, boolean z) {
        Tuple2 $minus$greater$extension;
        Tuple2 $minus$greater$extension2;
        int numPartitions = numPartitions();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions, replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "10000")})));
        Map<String, String> map = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp()), "100")}));
        Properties destLinkProps = destLinkProps(map);
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        destLinkProps.setProperty("metadata.max.age.ms", "100");
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, sourceLinkProps(map), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(createClusterLink, destLinkProps, verifyBasicLinkMetrics$default$3());
        commitOffsets(sourceCluster(), topic(), 0, 10, consumerGroup());
        verifyOffsetMigration(topic(), 0, 10, consumerGroup(), verifyOffsetMigration$default$5());
        verifyConsumerOffsetMigrationMetrics();
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(), (linkManager, str2) -> {
            return this.taskDesc(ClusterLinkPauseMirrorTopicsTaskType$.MODULE$, linkManager, str2);
        }, new Some(expectedPauseMirrorTopicsCountMetricName()));
        waitForRemoteLinkStateOnActiveLink(linkName());
        JFunction1.mcZD.sp spVar = d -> {
            return d > 0.0d;
        };
        MetricName activeClientConnectionsCountMetricName = ClusterLinkMetrics$.MODULE$.activeClientConnectionsCountMetricName();
        waitForKafkaMetric(activeClientConnectionsCountMetricName.name(), activeClientConnectionsCountMetricName.group(), None$.MODULE$, waitForKafkaMetric$default$4(), waitForKafkaMetric$default$5(), waitForKafkaMetric$default$6(), waitForKafkaMetric$default$7(), spVar);
        Map<String, String> map2 = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")}));
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), map2, destCluster2.alterClusterLink$default$3(), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        if (useSourceInitiatedLink()) {
            sourceCluster().waitForRemoteLinkState(linkName(), ClusterLinkDescription.LinkState.PAUSED, ClusterLinkError.NO_ERROR);
            ClusterLinkTestHarness sourceCluster2 = sourceCluster();
            sourceCluster2.alterClusterLink(linkName(), map2, sourceCluster2.alterClusterLink$default$3(), sourceCluster2.alterClusterLink$default$4(), sourceCluster2.alterClusterLink$default$5());
        }
        JFunction1.mcZD.sp spVar2 = d2 -> {
            return d2 == 0.0d;
        };
        MetricName activeClientConnectionsCountMetricName2 = ClusterLinkMetrics$.MODULE$.activeClientConnectionsCountMetricName();
        waitForKafkaMetric(activeClientConnectionsCountMetricName2.name(), activeClientConnectionsCountMetricName2.group(), None$.MODULE$, waitForKafkaMetric$default$4(), waitForKafkaMetric$default$5(), waitForKafkaMetric$default$6(), waitForKafkaMetric$default$7(), spVar2);
        waitForClusterLinkManagerEmpty(destCluster());
        int i = numPartitions + 2;
        sourceCluster().createPartitions(topic(), i);
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "20000")})));
        produceToSourceCluster(8);
        commitOffsets(sourceCluster(), topic(), 0, 20, consumerGroup());
        verifyPausedLinkMetrics();
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED);
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, topic(), numPartitions());
        verifyDescribeLinksResult(ClusterLinkDescription.LinkState.PAUSED, ClusterLinkDescription.LinkState.PAUSED);
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(), (linkManager2, str3) -> {
            return this.taskDesc(ClusterLinkPauseMirrorTopicsTaskType$.MODULE$, linkManager2, str3);
        }, new Some(expectedPauseMirrorTopicsCountMetricName()));
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.pauseTopic(topic(), destCluster3.pauseTopic$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED);
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, topic(), numPartitions());
        destCluster().pauseTopic(topic(), false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED);
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, topic(), numPartitions());
        Assertions.assertThrows(ClusterLinkPausedException.class, () -> {
            ClusterLinkTestHarness destCluster4 = this.destCluster();
            destCluster4.linkTopic("paused-topic", this.replicationFactor(), this.linkName(), destCluster4.linkTopic$default$4(), destCluster4.linkTopic$default$5());
        });
        Thread.sleep(250L);
        Assertions.assertEquals(numPartitions, destCluster().describeTopic(topic()).partitions().size());
        Assertions.assertEquals("10000", destCluster().describeTopicConfig(topic()).get("delete.retention.ms").value());
        Assertions.assertEquals(10, destCluster().getOffset(topic(), 0, consumerGroup()));
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})));
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        destCluster().waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, topic(), numPartitions());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int $anonfun$testPauseClusterLink$6 = $anonfun$testPauseClusterLink$6(this);
            Integer boxToInteger = BoxesRunTime.boxToInteger($anonfun$testPauseClusterLink$6);
            if ($anonfun$testPauseClusterLink$7(i, $anonfun$testPauseClusterLink$6)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + computeUntilTrue$default$2) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(i, tuple2._1$mcI$sp());
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long computeUntilTrue$default$22 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$32 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            String $anonfun$testPauseClusterLink$8 = $anonfun$testPauseClusterLink$8(this);
            if ($anonfun$testPauseClusterLink$9("20000", $anonfun$testPauseClusterLink$8)) {
                $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testPauseClusterLink$8), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + computeUntilTrue$default$22) {
                    $minus$greater$extension2 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc($anonfun$testPauseClusterLink$8), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$22), computeUntilTrue$default$32));
            }
        }
        Tuple2 tuple22 = $minus$greater$extension2;
        if (tuple22 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals("20000", (String) tuple22._1());
        verifyOffsetMigration(topic(), 0, 20, consumerGroup(), verifyOffsetMigration$default$5());
        verifyBasicLinkMetrics(createClusterLink, destLinkProps, verifyBasicLinkMetrics$default$3());
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.unlinkTopic(topic(), linkName(), destCluster4.unlinkTopic$default$3(), destCluster4.unlinkTopic$default$4(), destCluster4.unlinkTopic$default$5(), destCluster4.unlinkTopic$default$6());
        ClusterLinkTestHarness destCluster5 = destCluster();
        destCluster5.deleteClusterLink(linkName(), destCluster5.deleteClusterLink$default$2(), destCluster5.deleteClusterLink$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterLinkConfigs(String str, boolean z) {
        Tuple2 $minus$greater$extension;
        numPartitions_$eq(8);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("confluent.replica.fetch.connections.mode"), FetchConnectionsMode$Combined$.MODULE$.value())}))), sourceLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        ClusterLinkTestHarness connectingCluster = super.connectingCluster();
        Config describeClusterLink = connectingCluster.describeClusterLink(linkName());
        Assertions.assertFalse(describeClusterLink.get("metadata.max.age.ms").isDefault());
        Assertions.assertFalse(describeClusterLink.get("bootstrap.servers").isDefault());
        Assertions.assertTrue(describeClusterLink.get(ClusterLinkConfig$.MODULE$.AclSyncMsProp()).isDefault());
        connectingCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncMsProp()), "5000")})), connectingCluster.alterClusterLink$default$3(), connectingCluster.alterClusterLink$default$4(), connectingCluster.alterClusterLink$default$5());
        Assertions.assertFalse(connectingCluster.describeClusterLink(linkName()).get(ClusterLinkConfig$.MODULE$.AclSyncMsProp()).isDefault());
        connectingCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncMsProp()), "5001")})), connectingCluster.alterClusterLink$default$3(), connectingCluster.alterClusterLink$default$4(), connectingCluster.alterClusterLink$default$5());
        Assertions.assertFalse(connectingCluster.describeClusterLink(linkName()).get(ClusterLinkConfig$.MODULE$.AclSyncMsProp()).isDefault());
        ClusterLinkTestHarness sourceCluster2 = connectingCluster.equals(destCluster()) ? sourceCluster() : destCluster();
        connectingCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "60000")})), connectingCluster.alterClusterLink$default$3(), connectingCluster.alterClusterLink$default$4(), connectingCluster.alterClusterLink$default$5());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        Assertions.assertEquals("60000", connectingCluster.describeClusterLink(linkName()).get("metadata.max.age.ms").value());
        sourceCluster2.killAllBrokers();
        sourceCluster2.startAllBrokers();
        connectingCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), sourceCluster2.bootstrapServers(sourceCluster2.bootstrapServers$default$1()))})), connectingCluster.alterClusterLink$default$3(), connectingCluster.alterClusterLink$default$4(), connectingCluster.alterClusterLink$default$5());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            ClusterLinkTestHarness destCluster2 = this.destCluster();
            destCluster2.alterClusterLink(this.linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.LinkModeProp()), "SOURCE")})), destCluster2.alterClusterLink$default$3(), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        });
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            connectingCluster.alterClusterLink(this.linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConnectionModeProp()), "INBOUND")})), connectingCluster.alterClusterLink$default$3(), connectingCluster.alterClusterLink$default$4(), connectingCluster.alterClusterLink$default$5());
        });
        Assertions.assertEquals(destinationLinkMode(), ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().brokers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().linkMode());
        Assertions.assertEquals(ConnectionMode$Outbound$.MODULE$, ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) connectingCluster.brokers().head()).clusterLinkManager().connectionManager(createClusterLink).get()).currentConfig().connectionMode());
        String sb = useSourceInitiatedLink() ? new StringBuilder(23).append(ClusterLinkConfig$.MODULE$.LocalPrefix()).append("ssl.truststore.location").toString() : "ssl.truststore.location";
        File file = new File(connectingCluster.describeClusterLink(linkName()).get(sb).value());
        File createTempFile = File.createTempFile("truststore", ".jks");
        Files.copy(file.toPath(), createTempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        connectingCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(sb), createTempFile.getAbsolutePath())})), connectingCluster.alterClusterLink$default$3(), connectingCluster.alterClusterLink$default$4(), connectingCluster.alterClusterLink$default$5());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(createClusterLink, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        int brokerId = ((KafkaBroker) destCluster().brokers().head()).config().brokerId();
        destCluster().alterPartitionAssignment(CollectionConverters$.MODULE$.MapHasAsJava(((IterableOnceOps) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), Optional.of(new NewPartitionReassignment(Collections.singletonList(Predef$.MODULE$.int2Integer(brokerId)))));
        })).toMap($less$colon$less$.MODULE$.refl())).asJava());
        destCluster().ensureConsistentKRaftMetadata();
        int brokerId2 = ((KafkaBroker) sourceCluster().brokers().head()).config().brokerId();
        sourceCluster().alterPartitionAssignment(CollectionConverters$.MODULE$.MapHasAsJava(((IterableOnceOps) partitions(partitions$default$1(), partitions$default$2(), partitions$default$3()).map(topicPartition2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition2), Optional.of(new NewPartitionReassignment(Collections.singletonList(Predef$.MODULE$.int2Integer(brokerId2)))));
        })).toMap($less$colon$less$.MODULE$.refl())).asJava());
        sourceCluster().ensureConsistentKRaftMetadata();
        Assertions.assertEquals(1, maxFetcherCount(createClusterLink));
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp()), "3")})), destCluster2.alterClusterLink$default$3(), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int maxFetcherCount = maxFetcherCount(createClusterLink);
            Integer boxToInteger = BoxesRunTime.boxToInteger(maxFetcherCount);
            if ($anonfun$testClusterLinkConfigs$6(maxFetcherCount)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(3, tuple2._1$mcI$sp());
        produceToSourceCluster(8);
        consume(destCluster(), consume$default$2());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAlterClusterLinkConfigs(String str, boolean z) {
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), true, createClusterLink$default$5());
        ConfigEntry configEntry = destCluster().describeClusterLink(linkName()).get(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp());
        Assertions.assertNotNull(configEntry);
        Assertions.assertTrue(configEntry.isDefault());
        Assertions.assertNotNull(configEntry.value());
        String value = configEntry.value();
        String num = Integer.toString(StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(value)) + 1000);
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp()), num)})), destCluster.alterClusterLink$default$3(), destCluster.alterClusterLink$default$4(), destCluster.alterClusterLink$default$5());
        ConfigEntry configEntry2 = destCluster().describeClusterLink(linkName()).get(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp());
        Assertions.assertNotNull(configEntry2);
        Assertions.assertFalse(configEntry2.isDefault());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.empty(), destCluster2.alterClusterLink$default$3(), (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp()})), destCluster2.alterClusterLink$default$5());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            ConfigEntry configEntry3 = destCluster().describeClusterLink(linkName()).get(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp());
            Assertions.assertNotNull(configEntry3);
            String value2 = configEntry3.value();
            if (value != null ? value.equals(value2) : value2 == null) {
                Assertions.assertTrue(configEntry3.isDefault());
                ClusterLinkTestHarness destCluster3 = destCluster();
                destCluster3.deleteClusterLink(linkName(), destCluster3.deleteClusterLink$default$2(), destCluster3.deleteClusterLink$default$3());
                return;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    Assertions.fail($anonfun$testAlterClusterLinkConfigs$2(this));
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
    }

    @MethodSource({"zkCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAlterClusterLinkWithInvalidConfigs(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        Properties destLinkProps = destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true")})));
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        destCluster().updateZkLinkConfig(createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), true, createClusterLink$default$5()), ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), "{");
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), includeAllTopicsFilter().replace("topic", "group"))})), destCluster.alterClusterLink$default$3(), destCluster.alterClusterLink$default$4(), destCluster.alterClusterLink$default$5());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), clusterLinkPrefix());
        produceToSourceCluster(100);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), linkName(), destCluster3.unlinkTopic$default$3(), destCluster3.unlinkTopic$default$4(), destCluster3.unlinkTopic$default$5(), destCluster3.unlinkTopic$default$6());
        ClusterLinkTestHarness destCluster4 = destCluster();
        destCluster4.deleteClusterLink(linkName(), destCluster4.deleteClusterLink$default$2(), destCluster4.deleteClusterLink$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testExternalizedPasswordConfig(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceToSourceCluster(20);
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        destLinkProps.put("sasl.jaas.config", new StringBuilder(25).append("${file:").append(TestUtils.tempFile(new StringBuilder(17).append("sasl.jaas.config=").append(destLinkProps.getProperty("sasl.jaas.config")).toString()).getAbsolutePath()).append(":sasl.jaas.config}").toString());
        destLinkProps.setProperty("config.providers", "file");
        destLinkProps.setProperty("config.providers.file.class", FileConfigProvider.class.getName());
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), clusterLinkPrefix());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifySaslJaasConfigEncrypted(createClusterLink);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp()), "1000")})));
        verifySaslJaasConfigEncrypted(createClusterLink);
        produceToSourceCluster(20);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
        destLinkProps.setProperty("sasl.jaas.config", "someprovider:link.props:sasl.jaas.config");
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            this.createClusterLink("invalidLink1", destLinkProps, this.createClusterLink$default$3(), true, this.createClusterLink$default$5());
        });
        Assertions.assertThrows(InvalidConfigurationException.class, () -> {
            this.createClusterLink("invalidLink2", destLinkProps, this.createClusterLink$default$3(), false, this.createClusterLink$default$5());
        });
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterLinkMetadataTopicCreation(String str, boolean z) {
        destCluster().killAllBrokers();
        destCluster().serverConfig().setProperty("confluent.cluster.link.metadata.topic.enable", "true");
        destCluster().serverConfig().setProperty("confluent.cluster.link.metadata.topic.partitions", "2");
        destCluster().serverConfig().setProperty("confluent.cluster.link.metadata.topic.replication.factor", "1");
        destCluster().restartDeadBrokers(true);
        destCluster().updateBootstrapServers();
        destCluster().serverConfig().remove("confluent.cluster.link.metadata.topic.enable");
        destCluster().serverConfig().remove("confluent.cluster.link.metadata.topic.partitions");
        destCluster().serverConfig().remove("confluent.cluster.link.metadata.topic.replication.factor");
    }

    private final void waitForConnectionMetric$1(Function1 function1) {
        MetricName activeClientConnectionsCountMetricName = ClusterLinkMetrics$.MODULE$.activeClientConnectionsCountMetricName();
        waitForKafkaMetric(activeClientConnectionsCountMetricName.name(), activeClientConnectionsCountMetricName.group(), None$.MODULE$, waitForKafkaMetric$default$4(), waitForKafkaMetric$default$5(), waitForKafkaMetric$default$6(), waitForKafkaMetric$default$7(), function1);
    }

    public static final /* synthetic */ int $anonfun$testPauseClusterLink$6(ClusterLinkControlPlaneLinkConfigResourceIntegrationTest clusterLinkControlPlaneLinkConfigResourceIntegrationTest) {
        return clusterLinkControlPlaneLinkConfigResourceIntegrationTest.destCluster().describeTopic(clusterLinkControlPlaneLinkConfigResourceIntegrationTest.topic()).partitions().size();
    }

    public static final /* synthetic */ boolean $anonfun$testPauseClusterLink$7(int i, int i2) {
        return i2 == i;
    }

    public static final /* synthetic */ String $anonfun$testPauseClusterLink$8(ClusterLinkControlPlaneLinkConfigResourceIntegrationTest clusterLinkControlPlaneLinkConfigResourceIntegrationTest) {
        return clusterLinkControlPlaneLinkConfigResourceIntegrationTest.destCluster().describeTopicConfig(clusterLinkControlPlaneLinkConfigResourceIntegrationTest.topic()).get("delete.retention.ms").value();
    }

    public static final /* synthetic */ boolean $anonfun$testPauseClusterLink$9(String str, String str2) {
        return str2 == null ? str == null : str2.equals(str);
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigs$6(int i) {
        return i == 3;
    }

    public static final /* synthetic */ boolean $anonfun$testAlterClusterLinkConfigs$1(ClusterLinkControlPlaneLinkConfigResourceIntegrationTest clusterLinkControlPlaneLinkConfigResourceIntegrationTest, ObjectRef objectRef, ObjectRef objectRef2, String str) {
        objectRef.elem = clusterLinkControlPlaneLinkConfigResourceIntegrationTest.destCluster().describeClusterLink(clusterLinkControlPlaneLinkConfigResourceIntegrationTest.linkName());
        objectRef2.elem = ((Config) objectRef.elem).get(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp());
        Assertions.assertNotNull((ConfigEntry) objectRef2.elem);
        String value = ((ConfigEntry) objectRef2.elem).value();
        return str == null ? value == null : str.equals(value);
    }

    public static final /* synthetic */ String $anonfun$testAlterClusterLinkConfigs$2(ClusterLinkControlPlaneLinkConfigResourceIntegrationTest clusterLinkControlPlaneLinkConfigResourceIntegrationTest) {
        return new StringBuilder(29).append("Link configs not deleted for ").append(clusterLinkControlPlaneLinkConfigResourceIntegrationTest.linkName()).toString();
    }
}
