package kafka.link;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.log.AbstractLog;
import kafka.server.KafkaBroker;
import kafka.server.link.ActiveTaskState$;
import kafka.server.link.AuthenticationTaskErrorCode$;
import kafka.server.link.ClusterLinkCheckAvailabilityTaskType$;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkSyncOffsetsTaskType$;
import kafka.server.link.ClusterLinkSyncTopicConfigsTaskType$;
import kafka.server.link.ConsumerGroupInUseTaskErrorCode$;
import kafka.server.link.InErrorTaskState$;
import kafka.server.link.LinkCoordinatorNotEnabledTaskErrorCode$;
import kafka.server.link.LinkPausedTaskState$;
import kafka.server.link.LinkUnavailableTaskState$;
import kafka.server.link.MirrorTopicConfigSyncRules$;
import kafka.server.link.MisconfigurationTaskErrorCode$;
import kafka.server.link.NotConfiguredTaskState$;
import kafka.server.link.PeriodicPartitionSchedulerTaskType$;
import kafka.server.link.UnknownTaskState$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkTaskDescription;
import org.apache.kafka.clients.admin.ClusterLinkTaskError;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkAsyncTaskIntegrationTest.scala */
@Tags({@Tag("integration"), @Tag("bazel:shard_count:11")})
@ScalaSignature(bytes = "\u0006\u0005\u0005Ud\u0001B\n\u0015\u0001eAQA\b\u0001\u0005\u0002}Aq!\t\u0001C\u0002\u0013\u0005!\u0005\u0003\u0004*\u0001\u0001\u0006Ia\t\u0005\bU\u0001\u0011\r\u0011\"\u0001#\u0011\u0019Y\u0003\u0001)A\u0005G!9A\u0006\u0001b\u0001\n\u0003i\u0003B\u0002\u001c\u0001A\u0003%a\u0006C\u00038\u0001\u0011\u0005\u0001\bC\u0003i\u0001\u0011\u0005\u0011\u000eC\u0003p\u0001\u0011\u0005\u0001\u000fC\u0003w\u0001\u0011\u0005q\u000fC\u0003~\u0001\u0011\u0005a\u0010C\u0004\u0002\n\u0001!\t!a\u0003\t\u000f\u0005]\u0001\u0001\"\u0001\u0002\u001a!9\u0011Q\u0005\u0001\u0005\u0002\u0005\u001d\u0002bBA\u001a\u0001\u0011\u0005\u0011Q\u0007\u0005\b\u0003\u0003\u0002A\u0011AA\"\u0011\u001d\ty\u0005\u0001C\u0001\u0003#\u00121e\u00117vgR,'\u000fT5oW\u0006\u001b\u0018P\\2UCN\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002\u0016-\u0005!A.\u001b8l\u0015\u00059\u0012!B6bM.\f7\u0001A\n\u0003\u0001i\u0001\"a\u0007\u000f\u000e\u0003QI!!\b\u000b\u0003E\u0005\u00137\u000f\u001e:bGR\u001cE.^:uKJd\u0015N\\6J]R,wM]1uS>tG+Z:u\u0003\u0019a\u0014N\\5u}Q\t\u0001\u0005\u0005\u0002\u001c\u0001\u0005qqN\u001a4tKR$vnQ8n[&$X#A\u0012\u0011\u0005\u0011:S\"A\u0013\u000b\u0003\u0019\nQa]2bY\u0006L!\u0001K\u0013\u0003\t1{gnZ\u0001\u0010_\u001a47/\u001a;U_\u000e{W.\\5uA\u0005Q1/\u001f8d!\u0016\u0014\u0018n\u001c3\u0002\u0017MLhn\u0019)fe&|G\rI\u0001\u000eG>t7/^7fe\u001e\u0013x.\u001e9\u0016\u00039\u0002\"a\f\u001b\u000e\u0003AR!!\r\u001a\u0002\t1\fgn\u001a\u0006\u0002g\u0005!!.\u0019<b\u0013\t)\u0004G\u0001\u0004TiJLgnZ\u0001\u000fG>t7/^7fe\u001e\u0013x.\u001e9!\u0003)#Xm\u001d;PM\u001a\u001cX\r^'jOJ\fG/[8o)\u0006\u001c8n\u0015;bi\u0016<u.Z:J]R|WI\u001d:pe^CWM\u001c+iKJ,7/\u00118Fq&\u001cH/\u001b8h\u0007>t7/^7fe\u001e\u0013x.\u001e9\u0015\u0007eb\u0004\n\u0005\u0002%u%\u00111(\n\u0002\u0005+:LG\u000fC\u0003>\u0011\u0001\u0007a(\u0001\u0004rk>\u0014X/\u001c\t\u0003\u007f\u0019s!\u0001\u0011#\u0011\u0005\u0005+S\"\u0001\"\u000b\u0005\rC\u0012A\u0002\u001fs_>$h(\u0003\u0002FK\u00051\u0001K
]3eK\u001aL!!N$\u000b\u0005\u0015+\u0003\"B%\t\u0001\u0004Q\u0015aC2p_J$\u0017N\\1u_J\u0004\"\u0001J&\n\u00051+#a\u0002\"p_2,\u0017M\u001c\u0015\u0005\u00119S6\f\u0005\u0002P16\t\u0001K\u0003\u0002R%\u00061\u0001/\u0019:b[NT!a\u0015+\u0002\u000f),\b/\u001b;fe*\u0011QKV\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002/\u0006\u0019qN]4\n\u0005e\u0003&!\u0005)be\u0006lW\r^3sSj,G\rV3ti\u0006!a.Y7fC\u0005a\u0016\u0001K>eSN\u0004H.Y=OC6,WPL9v_J,X.P>1{:\u001awn\u001c:eS:\fGo\u001c:>wFj\b\u0006\u0002\u0005_I\u0016\u0004\"a\u00182\u000e\u0003\u0001T!!\u0019)\u0002\u0011A\u0014xN^5eKJL!a\u00191\u0003\u00195+G\u000f[8e'>,(oY3\u0002\u000bY\fG.^3-\u0003\u0019\f\u0013aZ\u0001\u0010C2d7i\\7cS:\fG/[8og\u00061D/Z:u\u001f\u001a47/\u001a;NS\u001e\u0014\u0018\r^5p]R\u000b7o[*uCR,W*\u00198bO\u0016lWM\u001c;WCJLw.^:TG\u0016t\u0017M]5pgR\u0019\u0011H[6\t\u000buJ\u0001\u0019\u0001 \t\u000b%K\u0001\u0019\u0001&)\t%q%l\u0017\u0015\u0005\u0013y#g\u000eL\u0001g\u0003%\"Xm\u001d;PM\u001a\u001cX\r^'jOJ\fG/[8o/&$\b.\u00113eK\u0012\u001cuN\\:v[\u0016\u0014xI]8vaR\u0019\u0011(\u001d:\t\u000buR\u0001\u0019\u0001 \t\u000b%S\u0001\u0019\u0001&)\t)q%l\u0017\u0015\u0005\u0015y#W\u000fL\u0001g\u0003\u0005\"Xm\u001d;PM\u001a\u001cX\r^'jOJ\fG/[8o/&$\b.\u00113eK\u0012$v\u000e]5d)\rI\u00040\u001f\u0005\u0006{-\u0001\rA\u0010\u0005\u0006\u0013.\u0001\rA\u0013\u0015\u0005\u00179S6\f\u000b\u0003\f=\u0012dH&\u00014\u0002=Q,7\u000f^*z]\u000e$v\u000e]5dg\u000e{gNZ5hgR\u000b7o[*uCR,G\u0003B\u001d��\u0003\u0003AQ!\u0010\u0007A\u0002yBQ!\u0013\u0007A\u0002)CC\u0001\u0004([7\"*AB\u00183\u0002\b1\na-\u0001\u0016uKN$H+Y:l'R\fG/\u001a$peZ\u000b'/[8vg\u000ecWo\u001d;fe2Kgn[5oOR\u000b7o[:\u0015\u000be\ni!a\u0004\t\u000buj\u0001\u0019\u0001 
\t\u000b%k\u0001\u0019\u0001&)\t5q%l\u0017\u0015\u0006\u001by#\u0017Q\u0003\u0017\u0002M\u0006\u0019B/Z:u)>\u0004\u0018nY\"p]\u001aLwmU=oGR)\u0011(a\u0007\u0002\u001e!)QH\u0004a\u0001}!)\u0011J\u0004a\u0001\u0015\"\"aB\u0014.\\Q\u0015qa\fZA\u0012Y\u00051\u0017!\u0005;fgR\fE\r\u001a)beRLG/[8ogR)\u0011(!\u000b\u0002,!)Qh\u0004a\u0001}!)\u0011j\u0004a\u0001\u0015\"\"qB\u0014.\\Q\u0015ya\fZA\u0019Y\u00051\u0017A\u000b;fgR\fE\r\u001a)beRLG/[8og^KG\u000f[*pkJ\u001cW\rT3bI\u0016\u0014X\t]8dQ\n+X\u000e\u001d\u000b\u0006s\u0005]\u0012\u0011\b\u0005\u0006{A\u0001\rA\u0010\u0005\u0006\u0013B\u0001\rA\u0013\u0015\u0005!9S6\fK\u0003\u0011=\u0012\fy\u0004L\u0001g\u0003\t\"Xm\u001d;J]R,'O^1m\u0007\"\fgnZ3G_J\u0004VM]5pI&\u001cG+Y:lgR)\u0011(!\u0012\u0002H!)Q(\u0005a\u0001}!)\u0011*\u0005a\u0001\u0015\"\"\u0011C\u0014.\\Q\u0015\tb\fZA'Y\u00051\u0017A\u000f;fgR$\u0016m]6EKN\u001c'/\u001b9uS>t7OR8s)^|G*\u001b8lg>sG)\u001b4gKJ,g\u000e\u001e'j].\u001cun\u001c:eS:\fGo\u001c:t)\u0015I\u00141KA+\u0011\u0015i$\u00031\u0001?\u0011\u0015I%\u00031\u0001KQ\u0011\u0011bJW.)\u000bIqF-a\u0017-\u0003\u0019Dc\u0001AA0I\u0006-\u0004\u0003BA1\u0003Oj!!a\u0019\u000b\u0007\u0005\u0015$+A\u0002ba&LA!!\u001b\u0002d\t\u0019A+Y4\"\u0005\u00055\u0014aC5oi\u0016<'/\u0019;j_:Dc\u0001AA0I\u0006E\u0014EAA:\u0003Q\u0011\u0017M_3muMD\u0017M\u001d3`G>,h\u000e\u001e\u001e2c\u0001")
/* loaded from: input_file:kafka/link/ClusterLinkAsyncTaskIntegrationTest.class */
public class ClusterLinkAsyncTaskIntegrationTest extends AbstractClusterLinkIntegrationTest {
    // Offset that the tests commit on the source cluster for partition 0 and then expect to see migrated.
    private final long offsetToCommit = 10;
    // Value written (as a string) to the link's ConsumerOffsetSyncMsProp setting by the tests in this class.
    private final long syncPeriod = 100;
    // Consumer group id used for source-side commits, the group filter, and destination-side consumers.
    private final String consumerGroup = "testGroup";

    /**
     * Accessor for the offset the tests commit on the source cluster.
     *
     * @return the value of the {@code offsetToCommit} field
     */
    public long offsetToCommit() {
        return offsetToCommit;
    }

    /**
     * Accessor for the value the tests write to the consumer offset sync interval setting.
     *
     * @return the value of the {@code syncPeriod} field
     */
    public long syncPeriod() {
        return syncPeriod;
    }

    /**
     * Accessor for the consumer group id shared by the tests in this class.
     *
     * @return the value of the {@code consumerGroup} field
     */
    public String consumerGroup() {
        return consumerGroup;
    }

    /**
     * Checks that the consumer offset sync task is reported as in error when the group being
     * migrated already has an active member on the destination cluster.
     *
     * <p>Flow: create a unidirectional link with offset sync enabled for {@link #consumerGroup()},
     * expect the task to be active, mirror the topic, start a destination consumer in the same
     * group and wait until it has consumed, then commit an offset on the source. The task is then
     * expected to report a consumer-group-in-use error, both through the task state/metrics check
     * and through the describe-cluster-links check. Finally the mirror topic is unlinked and the
     * link deleted.
     *
     * <p>The destination consumer is closed in a {@code finally} block so that a failed
     * assertion or a consume timeout does not leave it open.
     *
     * @param str first parameterized argument ({@code quorum={0}} in the display name); not read here
     * @param z   second parameterized argument ({@code coordinator={1}} in the display name); not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup(String str, boolean z) {
        final String groupInUseMessage = "Unable to commit offsets for consumer group testGroup on the destination cluster because there are active members on the destination already.";
        final long consumeTimeoutMs = 15000L;
        final long pollPauseMs = 100L;

        useBidirectionalLink_$eq(false);
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());

        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        // Sync is enabled with a group filter, so the task is expected to be active with no errors.
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (linkManager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, linkManager, link),
            None$.MODULE$);

        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", consumerGroup());
        consumerProps.setProperty("enable.auto.commit", "false");
        ClusterLinkTestHarness consumerHarness = destCluster();
        Consumer createConsumer = consumerHarness.createConsumer(consumerHarness.createConsumer$default$1(), consumerHarness.createConsumer$default$2(), consumerProps, consumerHarness.createConsumer$default$4());
        try {
            createConsumer.subscribe(CollectionConverters$.MODULE$.IterableHasAsJava(new $colon.colon(topic(), Nil$.MODULE$)).asJavaCollection());

            // Poll via the synthetic predicate until it reports success or the timeout elapses.
            long startMs = System.currentTimeMillis();
            while (!$anonfun$testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup$2(createConsumer)) {
                if (System.currentTimeMillis() > startMs + consumeTimeoutMs) {
                    Assertions.fail("Failed to consume records on the destination");
                }
                Thread.sleep(Math.min(consumeTimeoutMs, pollPauseMs));
            }
            createConsumer.commitSync();

            // A source-side commit for the same group gives the sync task something to migrate.
            commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
            verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$,
                new $colon.colon(new Tuple2(ConsumerGroupInUseTaskErrorCode$.MODULE$, new Some(groupInUseMessage)), Nil$.MODULE$),
                linkName(),
                (linkManager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, linkManager, link),
                new Some("consumer-offset-sync"));
            verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.IN_ERROR, Collections.singletonList(new ClusterLinkTaskError(ClusterLinkTaskError.ClusterLinkTaskErrorCode.CONSUMER_GROUP_IN_USE_ERROR, groupInUseMessage))));
        } finally {
            // Always release the destination consumer, even when a check above fails.
            createConsumer.close();
        }

        ClusterLinkTestHarness unlinkHarness = destCluster();
        unlinkHarness.unlinkTopic(topic(), linkName(), unlinkHarness.unlinkTopic$default$3(), unlinkHarness.unlinkTopic$default$4(), unlinkHarness.unlinkTopic$default$5(), unlinkHarness.unlinkTopic$default$6());
        ClusterLinkTestHarness deleteHarness = destCluster();
        deleteHarness.deleteClusterLink(linkName(), deleteHarness.deleteClusterLink$default$2(), deleteHarness.deleteClusterLink$default$3());
    }

    /**
     * Walks the consumer offset sync task through a series of link configurations and checks the
     * state it reports after each step, in this order:
     * <ol>
     *   <li>link created without enabling sync: not configured;</li>
     *   <li>sync enabled without group filters: in error (misconfiguration);</li>
     *   <li>group filter added: active, and committed source offsets are migrated;</li>
     *   <li>link paused, then resumed: link paused, then active;</li>
     *   <li>invalid {@code sasl.jaas.config}, then the original value restored: in error
     *       (authentication), then active;</li>
     *   <li>source brokers killed, then restarted: link unavailable, then active.</li>
     * </ol>
     * The mirror topic is unlinked and the link deleted at the end.
     *
     * @param str first parameterized argument ({@code quorum={0}} in the display name); not read here
     * @param z   second parameterized argument ({@code coordinator={1}} in the display name); not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationTaskStateManagementVariousScenarios(String str, boolean z) {
        final String misconfiguredMessage = "consumer.offset.sync.enable is true but no consumer group filters are specified. No consumer offsets will be migrated.";
        final String authFailureMessage = "Unable to list consumer groups due to authentication issues.";
        final long secondOffset = 20L;

        useBidirectionalLink_$eq(false);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Link properties: offset sync is NOT enabled yet; client timeouts are kept short.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "6000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), Long.toString(syncPeriod()));
        linkProps.setProperty("metadata.max.age.ms", "100");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "1000");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), "1000");
        linkProps.setProperty("request.timeout.ms", "1000");
        linkProps.setProperty("default.api.timeout.ms", "1000");
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        verifyTaskStateAndMetrics(NotConfiguredTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            None$.MODULE$);

        // Step 2: enable sync without any group filter -> misconfiguration error expected.
        Map<String, String> syncWithoutFilters = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true"),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), Long.toString(syncPeriod()))
        }));
        ClusterLinkTestHarness dest1 = destCluster();
        dest1.alterClusterLink(linkName(), syncWithoutFilters, dest1.alterClusterLink$default$3(), dest1.alterClusterLink$default$4(), dest1.alterClusterLink$default$5());
        verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$,
            new $colon.colon(new Tuple2(MisconfigurationTaskErrorCode$.MODULE$, new Some(misconfiguredMessage)), Nil$.MODULE$),
            linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            new Some("consumer-offset-sync"));
        verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.IN_ERROR, Collections.singletonList(new ClusterLinkTaskError(ClusterLinkTaskError.ClusterLinkTaskErrorCode.MISCONFIGURATION_ERROR, misconfiguredMessage))));

        // Step 3: add the group filter -> task expected to become active.
        Map<String, String> syncWithFilters = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true"),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup())),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), Long.toString(syncPeriod()))
        }));
        ClusterLinkTestHarness dest2 = destCluster();
        dest2.alterClusterLink(linkName(), syncWithFilters, dest2.alterClusterLink$default$3(), dest2.alterClusterLink$default$4(), dest2.alterClusterLink$default$5());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            new Some("consumer-offset-sync"));
        verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.ACTIVE, Collections.emptyList()));

        // Mirror the topic and check that two successive source commits are migrated.
        ClusterLinkTestHarness dest3 = destCluster();
        dest3.linkTopic(topic(), (short) 2, linkName(), dest3.linkTopic$default$4(), dest3.linkTopic$default$5());
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());
        commitOffsets(sourceCluster(), topic(), 0, secondOffset, consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, secondOffset, consumerGroup(), verifyOffsetMigration$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyConsumerOffsetMigrationMetrics();
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            new Some("consumer-offset-sync"));

        // Step 4: pause the link, then resume it.
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true")
        })));
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.PAUSED, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(LinkPausedTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            None$.MODULE$);
        verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.LINK_PAUSED, Collections.emptyList()));
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false")
        })));
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            new Some("consumer-offset-sync"));

        // Step 5: swap in invalid credentials, then restore the original JAAS config.
        String originalJaasConfig = linkProps.getProperty("sasl.jaas.config");
        String invalidJaasConfig = generateInvalidCredentials(sourceCluster());
        ClusterLinkTestHarness dest4 = destCluster();
        dest4.alterClusterLink(linkName(),
            (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("sasl.jaas.config", invalidJaasConfig)})),
            dest4.alterClusterLink$default$3(), dest4.alterClusterLink$default$4(), dest4.alterClusterLink$default$5());
        verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$,
            new $colon.colon(new Tuple2(AuthenticationTaskErrorCode$.MODULE$, new Some(authFailureMessage)), Nil$.MODULE$),
            linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            new Some("consumer-offset-sync"));
        verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.IN_ERROR, Collections.singletonList(new ClusterLinkTaskError(ClusterLinkTaskError.ClusterLinkTaskErrorCode.AUTHENTICATION_ERROR, authFailureMessage))));
        ClusterLinkTestHarness dest5 = destCluster();
        dest5.alterClusterLink(linkName(),
            (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("sasl.jaas.config", originalJaasConfig)})),
            dest5.alterClusterLink$default$3(), dest5.alterClusterLink$default$4(), dest5.alterClusterLink$default$5());
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            new Some("consumer-offset-sync"));

        // Step 6: tighten the availability check, kill the source brokers, then restart them.
        Map<String, String> fastAvailabilityCheck = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100"),
            new Tuple2(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1")
        }));
        ClusterLinkTestHarness dest6 = destCluster();
        dest6.alterClusterLink(linkName(), fastAvailabilityCheck, dest6.alterClusterLink$default$3(), dest6.alterClusterLink$default$4(), dest6.alterClusterLink$default$5());
        sourceCluster().killAllBrokers();
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.UNAVAILABLE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(LinkUnavailableTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            None$.MODULE$);
        verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.LINK_UNAVAILABLE, Collections.emptyList()));
        restartSource(restartSource$default$1());
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());
        verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(),
            (manager, link) -> taskDesc(ClusterLinkSyncOffsetsTaskType$.MODULE$, manager, link),
            new Some("consumer-offset-sync"));
        verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.ACTIVE, Collections.emptyList()));

        // Cleanup: unlink the mirror topic and remove the link.
        ClusterLinkTestHarness dest7 = destCluster();
        dest7.unlinkTopic(topic(), linkName(), dest7.unlinkTopic$default$3(), dest7.unlinkTopic$default$4(), dest7.unlinkTopic$default$5(), dest7.unlinkTopic$default$6());
        ClusterLinkTestHarness dest8 = destCluster();
        dest8.deleteClusterLink(linkName(), dest8.deleteClusterLink$default$2(), dest8.deleteClusterLink$default$3());
    }

    /**
     * Checks that offsets keep migrating after a second consumer group is added to the link's
     * group filter.
     *
     * <p>Flow: create the link with sync enabled for {@link #consumerGroup()} only, mirror the
     * topic and verify a first committed offset is migrated. Then alter the link so the filter
     * includes both {@link #consumerGroup()} and {@code testGroup2}, commit a new offset for each
     * group on the source and verify both are migrated. Finally verify link and offset-migration
     * metrics, unlink the mirror topic and delete the link.
     *
     * @param str first parameterized argument ({@code quorum={0}} in the display name); not read here
     * @param z   second parameterized argument ({@code coordinator={1}} in the display name); not read here
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationWithAddedConsumerGroup(String str, boolean z) {
        final String secondGroup = "testGroup2";
        final long secondOffset = 20L;

        // Group filter JSON with two literal "include" entries; margins are stripped by Scala's stripMargin.
        String twoGroupFilter = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(
            "|{\n"
                + "          |\"groupFilters\": [\n"
                + "          |  {\n"
                + "          |     \"name\": \"" + consumerGroup() + "\",\n"
                + "          |     \"patternType\": \"literal\",\n"
                + "          |     \"filterType\": \"include\"\n"
                + "          |  },\n"
                + "          |  {\n"
                + "          |     \"name\": \"" + secondGroup + "\",\n"
                + "          |     \"patternType\": \"literal\",\n"
                + "          |     \"filterType\": \"include\"\n"
                + "          |  }\n"
                + "          |]}\n"
                + "          |"));

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        // Initial link configuration: sync enabled for the single default group.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), Long.toString(syncPeriod()));
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        ClusterLinkTestHarness dest1 = destCluster();
        dest1.linkTopic(topic(), (short) 2, linkName(), dest1.linkTopic$default$4(), clusterLinkPrefix());
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());

        // Widen the filter to both groups, then commit and verify for each of them.
        Map<String, String> widenedFilterConfig = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true"),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), twoGroupFilter),
            new Tuple2(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), Long.toString(syncPeriod()))
        }));
        ClusterLinkTestHarness dest2 = destCluster();
        dest2.alterClusterLink(linkName(), widenedFilterConfig, dest2.alterClusterLink$default$3(), dest2.alterClusterLink$default$4(), dest2.alterClusterLink$default$5());
        commitOffsets(sourceCluster(), topic(), 0, secondOffset, consumerGroup());
        commitOffsets(sourceCluster(), topic(), 0, secondOffset, secondGroup);
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, secondOffset, consumerGroup(), verifyOffsetMigration$default$5());
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, secondOffset, secondGroup, verifyOffsetMigration$default$5());

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();

        // Cleanup: unlink the (possibly prefixed) mirror topic and remove the link.
        ClusterLinkTestHarness dest3 = destCluster();
        dest3.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), dest3.unlinkTopic$default$3(), dest3.unlinkTopic$default$4(), dest3.unlinkTopic$default$5(), dest3.unlinkTopic$default$6());
        ClusterLinkTestHarness dest4 = destCluster();
        dest4.deleteClusterLink(linkName(), dest4.deleteClusterLink$default$2(), dest4.deleteClusterLink$default$3());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationWithAddedTopic(String str, boolean z) {
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        ClusterLinkTestHarness sourceCluster2 = sourceCluster();
        sourceCluster2.createTopic("linkedTopic2", numPartitions(), replicationFactor(), sourceCluster2.createTopic$default$4(), sourceCluster2.createTopic$default$5(), sourceCluster2.createTopic$default$6());
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        if (StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(clusterLinkPrefix()))) {
            destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), (short) 2, linkName(), destCluster.linkTopic$default$4(), clusterLinkPrefix());
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic("linkedTopic2", (short) 2, linkName(), destCluster2.linkTopic$default$4(), clusterLinkPrefix());
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        commitOffsets(sourceCluster(), "linkedTopic2", 0, 20L, consumerGroup());
        verifyOffsetMigration(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), 0, 20L, consumerGroup(), verifyOffsetMigration$default$5());
        verifyOffsetMigration(new StringBuilder(0).append(clusterLinkPrefix()).append("linkedTopic2").toString(), 0, 20L, consumerGroup(), verifyOffsetMigration$default$5());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(createClusterLink, destLinkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.unlinkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append(topic()).toString(), linkName(), false, destCluster3.unlinkTopic$default$4(), destCluster3.unlinkTopic$default$5(), destCluster3.unlinkTopic$default$6());
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), consumerGroupFilter(consumerGroup()).replaceAll("include", "exclude"))})));
        Properties properties = new Properties();
        properties.setProperty("group.id", consumerGroup());
        ClusterLinkTestHarness destCluster4 = destCluster();
        Consumer createConsumer = destCluster4.createConsumer(destCluster4.createConsumer$default$1(), destCluster4.createConsumer$default$2(), properties, destCluster4.createConsumer$default$4());
        createConsumer.subscribe(Collections.singleton("linkedTopic2"));
        do {
            createConsumer.poll(Duration.ofMillis(10L));
        } while (createConsumer.assignment().isEmpty());
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.submit(() -> {
            return createConsumer.poll(Duration.ofMillis(10L));
        });
        try {
            ClusterLinkTestHarness destCluster5 = destCluster();
            destCluster5.unlinkTopic(new StringBuilder(0).append(clusterLinkPrefix()).append("linkedTopic2").toString(), linkName(), destCluster5.unlinkTopic$default$3(), destCluster5.unlinkTopic$default$4(), destCluster5.unlinkTopic$default$5(), destCluster5.unlinkTopic$default$6());
            newSingleThreadExecutor.shutdownNow();
            ClusterLinkTestHarness destCluster6 = destCluster();
            destCluster6.deleteClusterLink(linkName(), destCluster6.deleteClusterLink$default$2(), destCluster6.deleteClusterLink$default$3());
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdownNow();
            throw th;
        }
    }

    /**
     * Checks the state reported for the topic-config sync task
     * ({@code ClusterLinkSyncTopicConfigsTaskType}) in three phases: after a {@code segment.bytes}
     * change on the source has reached the destination log, after the link's
     * {@code sasl.jaas.config} is replaced with invalid credentials, and after the original
     * value is restored.
     *
     * <p>When {@code z} is true the expected states are ACTIVE, IN_ERROR (authentication error)
     * and ACTIVE again. When {@code z} is false every phase expects the UNKNOWN state together
     * with the "link coordinator is not enabled" error.
     *
     * @param str first value from the {@code allCombinations} source (shown as {@code quorum}
     *            in the display name); not read by this method
     * @param z   second value from the {@code allCombinations} source (shown as
     *            {@code coordinator} in the display name); selects the expected task states
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSyncTopicsConfigsTaskState(String str, boolean z) {
        Tuple2 $minus$greater$extension;
        numPartitions_$eq(1);
        // Topic-level overrides used when creating the source topic.
        Properties properties = new Properties();
        properties.setProperty("segment.bytes", "1000");
        properties.setProperty("retention.bytes", "1000");
        properties.setProperty("retention.ms", "10000000");
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), properties, sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        // Include segment.bytes in the synced configs, on top of the AlwaysConfigs list.
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), new StringBuilder(14).append(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(",")).append(",segment.bytes").toString());
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness destCluster = destCluster();
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        // Keep producing until the source leader's log start offset has moved past 100.
        // NOTE(review): this loop has no timeout of its own.
        while (sourceCluster().leaderLog(topicPartition).logStartOffset() <= 100) {
            produceToSourceAndWaitForMirror(10);
        }
        produceToSourceAndWaitForMirror(10);
        AbstractLog leaderLog = destCluster().leaderLog(topicPartition);
        // Change segment.bytes on the source; the loop below waits for the destination log to report it.
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("segment.bytes"), "999")})));
        // These three locals are not referenced anywhere below.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        // Poll the destination log's segment size until it equals 999 or 15000 ms have elapsed.
        // Either exit stores a (value, matched) pair; only the value is used afterwards.
        while (true) {
            int $anonfun$testSyncTopicsConfigsTaskState$1 = $anonfun$testSyncTopicsConfigsTaskState$1(leaderLog);
            Integer boxToInteger = BoxesRunTime.boxToInteger($anonfun$testSyncTopicsConfigsTaskState$1);
            if ($anonfun$testSyncTopicsConfigsTaskState$2($anonfun$testSyncTopicsConfigsTaskState$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                    $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                // min(15000, 100) evaluates to a 100 ms pause between polls.
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
            }
        }
        Tuple2 tuple2 = $minus$greater$extension;
        // NOTE(review): presumably never taken, since both loop exits assign a freshly built pair — confirm.
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        // Fails here if the loop above timed out before the new segment size was observed.
        Assertions.assertEquals(999, tuple2._1$mcI$sp());
        // Phase 1: valid credentials.
        if (z) {
            verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(), (linkManager, str2) -> {
                return this.taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, linkManager, str2);
            }, new Some("topic-configs-sync"));
            verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("TopicConfigsSync", ClusterLinkTaskDescription.ClusterLinkTaskState.ACTIVE, Collections.emptyList()));
        } else {
            verifyTaskStateAndMetrics(UnknownTaskState$.MODULE$, new $colon.colon(new Tuple2(LinkCoordinatorNotEnabledTaskErrorCode$.MODULE$, new Some("The cluster linking link coordinator is not enabled.")), Nil$.MODULE$), linkName(), (linkManager2, str3) -> {
                return this.taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, linkManager2, str3);
            }, None$.MODULE$);
            verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("TopicConfigsSync", ClusterLinkTaskDescription.ClusterLinkTaskState.UNKNOWN, Collections.singletonList(new ClusterLinkTaskError(ClusterLinkTaskError.ClusterLinkTaskErrorCode.LINK_COORDINATOR_NOT_ENABLED_ERROR, "The cluster linking link coordinator is not enabled."))));
        }
        // Phase 2: remember the current JAAS config, then switch the link to invalid credentials.
        String property = destLinkProps.getProperty("sasl.jaas.config");
        String generateInvalidCredentials = generateInvalidCredentials(sourceCluster());
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), generateInvalidCredentials)})), destCluster2.alterClusterLink$default$3(), destCluster2.alterClusterLink$default$4(), destCluster2.alterClusterLink$default$5());
        if (z) {
            verifyTaskStateAndMetrics(InErrorTaskState$.MODULE$, new $colon.colon(new Tuple2(AuthenticationTaskErrorCode$.MODULE$, new Some(new StringBuilder(67).append("Unable to describe topic configs due to authentication issues for ").append(topic()).append(".").toString())), Nil$.MODULE$), linkName(), (linkManager3, str4) -> {
                return this.taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, linkManager3, str4);
            }, new Some("topic-configs-sync"));
            verifyTaskInformationFromDescribeClusterLinks(linkName(), new ClusterLinkTaskDescription("TopicConfigsSync", ClusterLinkTaskDescription.ClusterLinkTaskState.IN_ERROR, Collections.singletonList(new ClusterLinkTaskError(ClusterLinkTaskError.ClusterLinkTaskErrorCode.AUTHENTICATION_ERROR, new StringBuilder(67).append("Unable to describe topic configs due to authentication issues for ").append(topic()).append(".").toString()))));
        } else {
            // Unlike phase 1, this branch does not also check the describe-cluster-links output.
            verifyTaskStateAndMetrics(UnknownTaskState$.MODULE$, new $colon.colon(new Tuple2(LinkCoordinatorNotEnabledTaskErrorCode$.MODULE$, new Some("The cluster linking link coordinator is not enabled.")), Nil$.MODULE$), linkName(), (linkManager4, str5) -> {
                return this.taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, linkManager4, str5);
            }, None$.MODULE$);
        }
        // Phase 3: restore the JAAS config captured before phase 2.
        ClusterLinkTestHarness destCluster3 = destCluster();
        destCluster3.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sasl.jaas.config"), property)})), destCluster3.alterClusterLink$default$3(), destCluster3.alterClusterLink$default$4(), destCluster3.alterClusterLink$default$5());
        if (z) {
            verifyTaskStateAndMetrics(ActiveTaskState$.MODULE$, (Seq) Seq$.MODULE$.empty(), linkName(), (linkManager5, str6) -> {
                return this.taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, linkManager5, str6);
            }, new Some("topic-configs-sync"));
        } else {
            verifyTaskStateAndMetrics(UnknownTaskState$.MODULE$, new $colon.colon(new Tuple2(LinkCoordinatorNotEnabledTaskErrorCode$.MODULE$, new Some("The cluster linking link coordinator is not enabled.")), Nil$.MODULE$), linkName(), (linkManager6, str7) -> {
                return this.taskDesc(ClusterLinkSyncTopicConfigsTaskType$.MODULE$, linkManager6, str7);
            }, None$.MODULE$);
        }
    }

    /**
     * Creates a cluster link with {@code AvailabilityCheckMsProp} set to {@code "100"}, waits for
     * the link to be ACTIVE on the destination, and then verifies on the destination brokers that
     * the check-availability and periodic-partition-scheduler tasks are in the active state with
     * no errors.
     *
     * @param str first value from the {@code allCombinations} source; not read by this method
     * @param z   second value from the {@code allCombinations} source; not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTaskStateForVariousClusterLinkingTasks(String str, boolean z) {
        final Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        waitForLinkStateOnDest(linkName(), ClusterLinkDescription.LinkState.ACTIVE, waitForLinkStateOnDest$default$3());

        // Availability-check task.
        verifyTaskStateAndMetrics(
                ActiveTaskState$.MODULE$,
                (Seq) Seq$.MODULE$.empty(),
                linkName(),
                (manager, link) -> this.taskDesc(ClusterLinkCheckAvailabilityTaskType$.MODULE$, manager, link),
                new Some("check-availability"),
                destCluster().brokers().toSeq(),
                (Seq) Seq$.MODULE$.empty());

        // Periodic partition scheduler task.
        verifyTaskStateAndMetrics(
                ActiveTaskState$.MODULE$,
                (Seq) Seq$.MODULE$.empty(),
                linkName(),
                (manager, link) -> this.taskDesc(PeriodicPartitionSchedulerTaskType$.MODULE$, manager, link),
                new Some("periodic-partition-scheduler"),
                destCluster().brokers().toSeq(),
                (Seq) Seq$.MODULE$.empty());
    }

    /**
     * Verifies that a topic config change made on the source topic reaches the mirror topic.
     *
     * <p>The test creates and populates the source topic, creates the link (with
     * {@code metadata.max.age.ms=1000} and, when non-empty, the cluster link prefix), and links
     * the mirror topic starting from a timestamp one hour in the past. It then sets
     * {@code delete.retention.ms} to {@code 80000000} on the source and polls the destination
     * until the mirror topic reports that value, failing after 15000 ms. Finally it checks
     * mirroring and the link / config-change metrics.
     *
     * @param str first value from the {@code allCombinations} source; not read by this method
     * @param z   second value from the {@code allCombinations} source; not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTopicConfigSync(String str, boolean z) {
        final long timeoutMs = 15000L;
        final long pauseMs = 100L;

        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceToSourceCluster(20);

        Properties linkProps = destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "1000")})));
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        // Mirror from an offset spec one hour back in time.
        destCluster().linkTopic(clusterLinkPrefix() + topic(), topic(), replicationFactor(), linkName(), (Map) Map$.MODULE$.empty(), new Some(OffsetSpec.forTimestamp(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1L))));
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "80000000")})));

        // Poll until the destination reports the new value; fail once the timeout has elapsed.
        final long startMs = System.currentTimeMillis();
        while (!$anonfun$testTopicConfigSync$1(this)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail("Topic configs did not get propagated");
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyTopicConfigChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    /**
     * Verifies that partitions added to the source topic are also added to the mirror topic.
     *
     * <p>The source topic starts with one partition and is mirrored; it is then expanded to four
     * partitions. The test polls the mirror topic's partition count until it equals
     * {@code numPartitions()} or 15000 ms have elapsed, asserts on the last observed count, and
     * finally checks mirroring and the link / add-partition metrics.
     *
     * @param str first value from the {@code allCombinations} source; when it equals
     *            {@code "kraft"} the test is skipped if {@code useSourceInitiatedLink()} is true
     * @param z   second value from the {@code allCombinations} source; not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAddPartitions(String str, boolean z) {
        final long timeoutMs = 15000L;
        final long pauseMs = 100L;

        if (str.equals("kraft")) {
            Assumptions.assumeFalse(useSourceInitiatedLink());
        }
        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        Properties linkProps = destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "1000")})));
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        Uuid linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Grow the source topic and keep producing so the new partitions receive data.
        numPartitions_$eq(4);
        sourceCluster().createPartitions(topic(), numPartitions());
        produceToSourceCluster(8);

        // Poll the mirror's partition count. On timeout, the last observed value is kept so the
        // assertion below reports it.
        final long startMs = System.currentTimeMillis();
        int mirrorPartitionCount;
        while (true) {
            mirrorPartitionCount = $anonfun$testAddPartitions$1(this);
            if ($anonfun$testAddPartitions$2(this, mirrorPartitionCount)) {
                break;
            }
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                break;
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        Assertions.assertEquals(numPartitions(), mirrorPartitionCount);

        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyAddPartitionMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    /**
     * Verifies mirroring after partitions are added to the source topic and the partition
     * leaders are then changed repeatedly on the source.
     *
     * <p>The source topic grows from one to four partitions. The test polls the mirror topic's
     * partition count until it equals {@code numPartitions()} or 15000 ms have elapsed and asserts
     * on the last observed count. It then calls {@code bumpLeaderEpochs$1} on the source with a
     * count of 3, runs {@code verifyLeaderEpochIsBumped$1} against both clusters, produces more
     * records, waits for the mirror and unlinks the mirror topic.
     *
     * @param str first value from the {@code allCombinations} source; not read by this method
     * @param z   second value from the {@code allCombinations} source; not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAddPartitionsWithSourceLeaderEpochBump(String str, boolean z) {
        final long timeoutMs = 15000L;
        final long pauseMs = 100L;

        numPartitions_$eq(1);
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());

        Properties linkProps = destLinkProps((Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "1000")})));
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Grow the source topic and keep producing so the new partitions receive data.
        numPartitions_$eq(4);
        sourceCluster().createPartitions(topic(), numPartitions());
        produceToSourceCluster(8);

        // Poll the mirror's partition count. On timeout, the last observed value is kept so the
        // assertion below reports it.
        final long startMs = System.currentTimeMillis();
        int mirrorPartitionCount;
        while (true) {
            mirrorPartitionCount = $anonfun$testAddPartitionsWithSourceLeaderEpochBump$1(this);
            if ($anonfun$testAddPartitionsWithSourceLeaderEpochBump$2(this, mirrorPartitionCount)) {
                break;
            }
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                break;
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        Assertions.assertEquals(numPartitions(), mirrorPartitionCount);

        // One TopicPartition of the source topic per partition index.
        IndexedSeq sourcePartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(
                index -> $anonfun$testAddPartitionsWithSourceLeaderEpochBump$3(this, BoxesRunTime.unboxToInt(index)));
        bumpLeaderEpochs$1(sourceCluster(), 3, sourcePartitions);
        verifyLeaderEpochIsBumped$1(sourceCluster(), sourcePartitions, 3);
        verifyLeaderEpochIsBumped$1(destCluster(), sourcePartitions, 3);

        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        ClusterLinkTestHarness destForUnlink = destCluster();
        destForUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForUnlink.unlinkTopic$default$3(), false, destForUnlink.unlinkTopic$default$5(), destForUnlink.unlinkTopic$default$6());
    }

    /**
     * Verifies that lowering the interval settings of an existing link takes effect for topic
     * config sync, consumer offset sync and auto-mirroring.
     *
     * <p>The link is created with {@code ConsumerOffsetSyncMsProp}, {@code TopicConfigSyncMsProp}
     * and {@code metadata.max.age.ms} all set to 300000. Each setting is then altered to
     * {@code syncPeriod()} in turn, and after each change the test waits for the corresponding
     * effect: the propagated {@code delete.retention.ms} value (15000 ms limit), the migrated
     * consumer offset, and the auto-created mirror topic. The mirror topic and the link are
     * removed at the end.
     *
     * @param str first value from the {@code allCombinations} source; not read by this method
     * @param z   second value from the {@code allCombinations} source; not read by this method
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testIntervalChangeForPeriodicTasks(String str, boolean z) {
        final long timeoutMs = 15000L;
        final long pauseMs = 100L;
        final String initialIntervalMs = String.valueOf(300000);

        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), initialIntervalMs);
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp(), initialIntervalMs);
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter());
        linkProps.setProperty("metadata.max.age.ms", initialIntervalMs);
        if (!clusterLinkPrefix().isEmpty()) {
            linkProps.setProperty(ClusterLinkConfig$.MODULE$.ClusterLinkPrefixProp(), clusterLinkPrefix());
        }
        createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());

        // Step 1: change the topic config sync interval and wait for a config change to propagate.
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), String.valueOf(syncPeriod()))})));
        ClusterLinkTestHarness source = sourceCluster();
        source.createTopic(topic(), numPartitions(), 2, source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        ClusterLinkTestHarness dest = destCluster();
        dest.linkTopic(topic(), (short) 2, linkName(), dest.linkTopic$default$4(), clusterLinkPrefix());
        sourceCluster().alterTopic(topic(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "80000000")})));
        final long startMs = System.currentTimeMillis();
        while (!$anonfun$testIntervalChangeForPeriodicTasks$1(this)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail("Topic configs did not get propagated");
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
        verifyTopicConfigChangeMetrics();

        // Step 2: commit an offset on the source, change the offset sync interval, and wait for migration.
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp()), String.valueOf(syncPeriod()))})));
        verifyOffsetMigration(clusterLinkPrefix() + topic(), 0, offsetToCommit(), consumerGroup(), verifyOffsetMigration$default$5());

        // Step 3: remove the mirror topic, change metadata.max.age.ms, and wait for the mirror to be auto-created.
        ClusterLinkTestHarness destForFirstUnlink = destCluster();
        destForFirstUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForFirstUnlink.unlinkTopic$default$3(), destForFirstUnlink.unlinkTopic$default$4(), destForFirstUnlink.unlinkTopic$default$5(), destForFirstUnlink.unlinkTopic$default$6());
        destCluster().deleteTopic(clusterLinkPrefix() + topic(), true);
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), String.valueOf(syncPeriod()))})));
        waitForAutoMirrorCreation(clusterLinkPrefix() + topic());

        // Cleanup: switch auto-mirroring off, then unlink the topic and delete the link.
        alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "false")})));
        ClusterLinkTestHarness destForSecondUnlink = destCluster();
        destForSecondUnlink.unlinkTopic(clusterLinkPrefix() + topic(), linkName(), destForSecondUnlink.unlinkTopic$default$3(), destForSecondUnlink.unlinkTopic$default$4(), destForSecondUnlink.unlinkTopic$default$5(), destForSecondUnlink.unlinkTopic$default$6());
        ClusterLinkTestHarness destForDelete = destCluster();
        destForDelete.deleteClusterLink(linkName(), destForDelete.deleteClusterLink$default$2(), destForDelete.deleteClusterLink$default$3());
    }

    /**
     * Creates two cluster links ({@code linkName()} and {@code "testLinkTwo-0"}) from the same
     * properties and then runs two verification steps with retries.
     *
     * <p>When {@code z} is true the test first makes sure the two links have link coordinators
     * on different brokers, moving the second link's coordinator if both start on the same one.
     * The two verification steps are the synthetic helpers
     * {@code $anonfun$...$3} and {@code $anonfun$...$4}, which are defined outside this method.
     *
     * @param str first value from the {@code allCombinations} source; not read by this method
     * @param z   second value from the {@code allCombinations} source; gates the coordinator
     *            placement check and is passed on to both verification helpers
     */
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators(String str, boolean z) {
        useBidirectionalLink_$eq(false);
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "1");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), new StringBuilder(14).append(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().mkString(",")).append(",segment.bytes").toString());
        destLinkProps.put("metadata.max.age.ms", Long.toString(syncPeriod()));
        // Both links are created from the same Properties instance.
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        createClusterLink("testLinkTwo-0", destLinkProps, createClusterLink$default$3(), createClusterLink$default$4(), createClusterLink$default$5());
        if (z) {
            KafkaBroker linkCoordinator = destCluster().linkCoordinator(linkName());
            KafkaBroker linkCoordinator2 = destCluster().linkCoordinator("testLinkTwo-0");
            // If both links landed on the same broker, move the second link's coordinator and look it up again.
            if (linkCoordinator.config().brokerId() == linkCoordinator2.config().brokerId()) {
                changeCoordinator(destCluster(), true, "testLinkTwo-0");
                linkCoordinator2 = destCluster().linkCoordinator("testLinkTwo-0");
            }
            Assertions.assertTrue(linkCoordinator.config().brokerId() != linkCoordinator2.config().brokerId());
        }
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        // Outer retry state: j is the current back-off in ms and grows by min(j, 1000) per failed attempt.
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        // Outer retry: repeat the first verification until it stops throwing AssertionError,
        // rethrowing once more than 15000 ms have passed since the outer start time.
        while (true) {
            try {
                $anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$3(this, "testLinkTwo-0", z);
                TestUtils$ testUtils$2 = TestUtils$.MODULE$;
                long j2 = 1;
                long currentTimeMillis2 = System.currentTimeMillis();
                // Inner retry: same scheme for the second verification, with its own start time and back-off.
                while (true) {
                    try {
                        $anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$4(this, z, "testLinkTwo-0");
                        // Both verifications passed; this is the only normal exit from the method.
                        return;
                    } catch (AssertionError e) {
                        if (System.currentTimeMillis() - currentTimeMillis2 > 15000) {
                            // This rethrow is caught by the outer catch below. The outer start time is
                            // earlier, so in practice the outer limit is exceeded too and it propagates.
                            throw e;
                        }
                        if (testUtils$2.logger().underlying().isInfoEnabled()) {
                            testUtils$2.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$2, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j2).append(", and then retrying.").toString()));
                        }
                        Thread.sleep(j2);
                        j2 += package$.MODULE$.min(j2, 1000L);
                    }
                }
            } catch (AssertionError e2) {
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e2;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    /**
     * Performs one zero-timeout poll on the given consumer.
     *
     * @param consumer consumer to poll
     * @return {@code true} if the poll returned at least one record
     */
    public static final /* synthetic */ boolean $anonfun$testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup$2(Consumer consumer) {
        final boolean nothingFetched = consumer.poll(Duration.ZERO).isEmpty();
        return !nothingFetched;
    }

    /**
     * Supplies the failure message for the consume-on-destination wait.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup$3() {
        final String failureMessage = "Failed to consume records on the destination";
        return failureMessage;
    }

    /**
     * Reads the segment size from the given log's current config.
     *
     * @param abstractLog log whose config is inspected
     * @return the {@code segmentSize} value of {@code abstractLog.config()}
     */
    public static final /* synthetic */ int $anonfun$testSyncTopicsConfigsTaskState$1(AbstractLog abstractLog) {
        final int segmentBytes = abstractLog.config().segmentSize;
        return segmentBytes;
    }

    /**
     * Tests whether an observed segment size equals the value the test expects (999).
     *
     * @param i observed segment size
     * @return {@code true} if {@code i} is 999
     */
    public static final /* synthetic */ boolean $anonfun$testSyncTopicsConfigsTaskState$2(int i) {
        final long observed = i;
        final long expected = 999L;
        return observed == expected;
    }

    /**
     * Checks whether the mirror topic on the destination cluster reports
     * {@code delete.retention.ms} equal to {@code "80000000"}.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster,
     *                                            link prefix and topic name
     * @return {@code true} if the described config value equals {@code "80000000"}
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigSync$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest) {
        final ClusterLinkTestHarness destination = clusterLinkAsyncTaskIntegrationTest.destCluster();
        // Mirror topic name is the link prefix followed by the source topic name.
        final String mirrorTopic = clusterLinkAsyncTaskIntegrationTest.clusterLinkPrefix() + clusterLinkAsyncTaskIntegrationTest.topic();
        return destination.describeTopicConfig(mirrorTopic).get("delete.retention.ms").value().equals("80000000");
    }

    /**
     * Supplies the failure message for the topic-config propagation wait.
     *
     * @return the fixed failure message
     */
    public static final /* synthetic */ String $anonfun$testTopicConfigSync$2() {
        final String failureMessage = "Topic configs did not get propagated";
        return failureMessage;
    }

    /**
     * Describes the mirror topic on the destination cluster and returns how many partitions it has.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster,
     *                                            link prefix and topic name
     * @return size of the described topic's partition list
     */
    public static final /* synthetic */ int $anonfun$testAddPartitions$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest) {
        final ClusterLinkTestHarness destination = clusterLinkAsyncTaskIntegrationTest.destCluster();
        // Mirror topic name is the link prefix followed by the source topic name.
        final String mirrorTopic = clusterLinkAsyncTaskIntegrationTest.clusterLinkPrefix() + clusterLinkAsyncTaskIntegrationTest.topic();
        return destination.describeTopic(mirrorTopic).partitions().size();
    }

    /**
     * Compares a partition count against the test's {@code numPartitions()}.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the reference count
     * @param i                                   count to compare
     * @return {@code true} when {@code i} equals the test's {@code numPartitions()}
     */
    public static final /* synthetic */ boolean $anonfun$testAddPartitions$2(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest, int i) {
        boolean countMatches = i == clusterLinkAsyncTaskIntegrationTest.numPartitions();
        return countMatches;
    }

    /**
     * Returns how many partitions the destination cluster reports for the topic named by
     * concatenating the test's cluster link prefix and topic.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster,
     *                                            link prefix and topic
     * @return size of the described topic's partition collection
     */
    public static final /* synthetic */ int $anonfun$testAddPartitionsWithSourceLeaderEpochBump$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest) {
        // Fetch the harness first so the accessor call order matches the single-expression form.
        ClusterLinkTestHarness destination = clusterLinkAsyncTaskIntegrationTest.destCluster();
        String prefixedTopic = new StringBuilder(0)
                .append(clusterLinkAsyncTaskIntegrationTest.clusterLinkPrefix())
                .append(clusterLinkAsyncTaskIntegrationTest.topic())
                .toString();
        return destination.describeTopic(prefixedTopic).partitions().size();
    }

    /**
     * Compares a partition count against the test's {@code numPartitions()}.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the reference count
     * @param i                                   count to compare
     * @return {@code true} when {@code i} equals the test's {@code numPartitions()}
     */
    public static final /* synthetic */ boolean $anonfun$testAddPartitionsWithSourceLeaderEpochBump$2(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest, int i) {
        boolean countMatches = i == clusterLinkAsyncTaskIntegrationTest.numPartitions();
        return countMatches;
    }

    /**
     * Builds a {@link TopicPartition} for the test's topic and the given partition index.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the topic name
     * @param i                                   partition index
     * @return a new {@code TopicPartition} for that topic and index
     */
    public static final /* synthetic */ TopicPartition $anonfun$testAddPartitionsWithSourceLeaderEpochBump$3(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest, int i) {
        String topicName = clusterLinkAsyncTaskIntegrationTest.topic();
        return new TopicPartition(topicName, i);
    }

    /**
     * Calls {@code changeLeader} on the harness for every element of the sequence, repeating the
     * full pass {@code i} times. Does nothing when {@code i} is zero or negative.
     *
     * @param clusterLinkTestHarness harness on which {@code changeLeader} is invoked
     * @param i                      number of passes over the sequence
     * @param indexedSeq             elements handed to {@code changeLeader}, one per call
     */
    private static final void bumpLeaderEpochs$1(ClusterLinkTestHarness clusterLinkTestHarness, int i, IndexedSeq indexedSeq) {
        // A counted loop over [0, i) performs the same number of passes as the half-open Scala range.
        for (int pass = 0; pass < i; pass++) {
            indexedSeq.foreach(partition -> BoxesRunTime.boxToInteger(clusterLinkTestHarness.changeLeader(partition)));
        }
    }

    /**
     * Checks whether the leader epoch reported by the harness for the given partition has reached
     * the given value. A missing leader epoch is treated as 0.
     *
     * @param clusterLinkTestHarness harness queried via {@code replicaStatusWithPartitionResult}
     * @param topicPartition         partition whose replica status is looked up
     * @param i                      minimum leader epoch to reach
     * @return {@code true} when the reported (or defaulted) leader epoch is at least {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$testAddPartitionsWithSourceLeaderEpochBump$7(ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition, int i) {
        String topicName = topicPartition.topic();
        int partitionIndex = topicPartition.partition();
        return clusterLinkTestHarness.replicaStatusWithPartitionResult(topicName, partitionIndex).leaderEpoch().orElseGet(() -> 0) >= i;
    }

    /**
     * Supplies a fixed failure message about the leader epoch not being bumped.
     *
     * @return the constant failure message
     */
    public static final /* synthetic */ String $anonfun$testAddPartitionsWithSourceLeaderEpochBump$9() {
        final String failureMessage = "Leader epoch did not get bumped";
        return failureMessage;
    }

    /**
     * Blocks until the leader epoch of the given partition reaches {@code i}, re-checking every
     * 100 ms. If the condition is still false more than 15 seconds after the wait started, the
     * test is failed via {@link Assertions#fail(String)}.
     *
     * @param clusterLinkTestHarness harness whose replica status is polled
     * @param i                      leader epoch the partition must reach
     * @param topicPartition         partition being checked
     */
    public static final /* synthetic */ void $anonfun$testAddPartitionsWithSourceLeaderEpochBump$6(ClusterLinkTestHarness clusterLinkTestHarness, int i, TopicPartition topicPartition) {
        long waitStartedMs = System.currentTimeMillis();
        while (!$anonfun$testAddPartitionsWithSourceLeaderEpochBump$7(clusterLinkTestHarness, topicPartition, i)) {
            if (System.currentTimeMillis() > waitStartedMs + 15000) {
                Assertions.fail("Leader epoch did not get bumped");
            }
            // Pause between checks: the smaller of the 15000 ms wait budget and 100 ms, i.e. 100 ms.
            Thread.sleep(100L);
        }
    }

    /**
     * Runs the leader-epoch wait for every element of the sequence, in sequence order.
     *
     * @param clusterLinkTestHarness harness passed through to the per-partition wait
     * @param indexedSeq             partitions to check
     * @param i                      leader epoch each partition must reach
     */
    private static final void verifyLeaderEpochIsBumped$1(ClusterLinkTestHarness clusterLinkTestHarness, IndexedSeq indexedSeq, int i) {
        indexedSeq.foreach(partition -> {
            $anonfun$testAddPartitionsWithSourceLeaderEpochBump$6(clusterLinkTestHarness, i, partition);
            // Scala's foreach expects a value-returning function; Unit is the placeholder result.
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Checks whether the destination cluster reports {@code delete.retention.ms} as
     * {@code "80000000"} for the topic named by concatenating the test's cluster link prefix
     * and topic.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster,
     *                                            link prefix and topic
     * @return {@code true} when the described config value equals {@code "80000000"}
     */
    public static final /* synthetic */ boolean $anonfun$testIntervalChangeForPeriodicTasks$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest) {
        // Fetch the harness first so the accessor call order matches the single-expression form.
        ClusterLinkTestHarness destination = clusterLinkAsyncTaskIntegrationTest.destCluster();
        String prefixedTopic = new StringBuilder(0)
                .append(clusterLinkAsyncTaskIntegrationTest.clusterLinkPrefix())
                .append(clusterLinkAsyncTaskIntegrationTest.topic())
                .toString();
        return destination.describeTopicConfig(prefixedTopic).get("delete.retention.ms").value().equals("80000000");
    }

    /**
     * Supplies a fixed failure message about topic configs not being propagated.
     *
     * @return the constant failure message
     */
    public static final /* synthetic */ String $anonfun$testIntervalChangeForPeriodicTasks$2() {
        final String failureMessage = "Topic configs did not get propagated";
        return failureMessage;
    }

    /**
     * Tests whether a link description carries the same link name as the test instance.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the reference link name
     * @param clusterLinkDescription              description whose link name is checked
     * @return {@code true} when the description's link name equals the test's {@code linkName()}
     */
    public static final /* synthetic */ boolean $anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$1(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest, ClusterLinkDescription clusterLinkDescription) {
        String describedLinkName = clusterLinkDescription.linkName();
        return describedLinkName.equals(clusterLinkAsyncTaskIntegrationTest.linkName());
    }

    /**
     * Tests whether a link description carries the given link name.
     *
     * @param str                    link name to match
     * @param clusterLinkDescription description whose link name is checked
     * @return {@code true} when the description's link name equals {@code str}
     */
    public static final /* synthetic */ boolean $anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$2(String str, ClusterLinkDescription clusterLinkDescription) {
        String describedLinkName = clusterLinkDescription.linkName();
        return describedLinkName.equals(str);
    }

    /**
     * Asserts that exactly two link descriptions were returned and that both the link named by
     * this test's {@code linkName()} and the link named {@code str} report the same four task
     * descriptions, in this order: ConsumerOffsetSync (ACTIVE), AclSync (NOT_CONFIGURED),
     * TopicConfigsSync (ACTIVE or UNKNOWN depending on {@code z}) and AutoCreateMirror (ACTIVE),
     * each with an empty error list.
     *
     * @param seq link descriptions to verify
     * @param z   when {@code true} TopicConfigsSync is expected to be ACTIVE, otherwise UNKNOWN
     * @param str name of the second link to look up in {@code seq}
     */
    private final void runAsserts$1(Seq seq, boolean z, String str) {
        Assertions.assertEquals(2, seq.size());

        // TopicConfigsSync is the only task whose expected state depends on the flag.
        ClusterLinkTaskDescription.ClusterLinkTaskState topicConfigsSyncState;
        if (z) {
            topicConfigsSyncState = ClusterLinkTaskDescription.ClusterLinkTaskState.ACTIVE;
        } else {
            topicConfigsSyncState = ClusterLinkTaskDescription.ClusterLinkTaskState.UNKNOWN;
        }

        // Both links are held to the same expectation, so the expected list is built once.
        scala.collection.immutable.Seq expectedTasks = new $colon.colon(new ClusterLinkTaskDescription("ConsumerOffsetSync", ClusterLinkTaskDescription.ClusterLinkTaskState.ACTIVE, Collections.emptyList()), new $colon.colon(new ClusterLinkTaskDescription("AclSync", ClusterLinkTaskDescription.ClusterLinkTaskState.NOT_CONFIGURED, Collections.emptyList()), new $colon.colon(new ClusterLinkTaskDescription("TopicConfigsSync", topicConfigsSyncState, Collections.emptyList()), new $colon.colon(new ClusterLinkTaskDescription("AutoCreateMirror", ClusterLinkTaskDescription.ClusterLinkTaskState.ACTIVE, Collections.emptyList()), Nil$.MODULE$))));

        // First link: the one whose name matches this test's linkName().
        ClusterLinkDescription ownLink = (ClusterLinkDescription) ((IterableOps) seq.filter(candidate -> BoxesRunTime.boxToBoolean($anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$1(this, candidate)))).head();
        Assertions.assertNotNull(ownLink);
        scala.collection.immutable.Seq ownLinkTasks = CollectionConverters$.MODULE$.ListHasAsScala(ownLink.taskDescriptions()).asScala().toSeq();
        Assertions.assertEquals(expectedTasks, ownLinkTasks);

        // Second link: the one whose name matches str.
        ClusterLinkDescription otherLink = (ClusterLinkDescription) ((IterableOps) seq.filter(candidate -> BoxesRunTime.boxToBoolean($anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$2(str, candidate)))).head();
        Assertions.assertNotNull(otherLink);
        scala.collection.immutable.Seq otherLinkTasks = CollectionConverters$.MODULE$.ListHasAsScala(otherLink.taskDescriptions()).asScala().toSeq();
        Assertions.assertEquals(expectedTasks, otherLinkTasks);
    }

    /**
     * Describes the two named links (the test's own {@code linkName()} and {@code str}) on the
     * destination cluster, including their tasks, and runs the shared assertions on the result.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster
     *                                            and the first link name
     * @param str                                 name of the second link
     * @param z                                   flag forwarded to the assertions
     */
    public static final /* synthetic */ void $anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$3(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest, String str, boolean z) {
        ClusterLinkTestHarness destination = clusterLinkAsyncTaskIntegrationTest.destCluster();
        // Explicit filter naming both links; the second argument stays at the method's default.
        Some requestedLinks = new Some(new $colon.colon(clusterLinkAsyncTaskIntegrationTest.linkName(), new $colon.colon(str, Nil$.MODULE$)));
        Seq descriptions = destination.describeClusterLinksWithTasks(requestedLinks, destination.describeClusterLinksWithTasks$default$2());
        clusterLinkAsyncTaskIntegrationTest.runAsserts$1(descriptions, z, str);
    }

    /**
     * Describes cluster links on the destination cluster without a link-name filter
     * ({@code None}), including their tasks, and runs the shared assertions on the result.
     *
     * @param clusterLinkAsyncTaskIntegrationTest test instance supplying the destination cluster
     * @param z                                   flag forwarded to the assertions
     * @param str                                 name of the second link, forwarded to the assertions
     */
    public static final /* synthetic */ void $anonfun$testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators$4(ClusterLinkAsyncTaskIntegrationTest clusterLinkAsyncTaskIntegrationTest, boolean z, String str) {
        ClusterLinkTestHarness destination = clusterLinkAsyncTaskIntegrationTest.destCluster();
        // No name filter is passed; the second argument stays at the method's default.
        Seq descriptions = destination.describeClusterLinksWithTasks(None$.MODULE$, destination.describeClusterLinksWithTasks$default$2());
        clusterLinkAsyncTaskIntegrationTest.runAsserts$1(descriptions, z, str);
    }
}
