package kafka.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConvertToMirrorTopicTaskType$;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.InternalTaskErrorCode$;
import kafka.server.link.NoErrorCode$;
import kafka.server.link.TopicType$;
import kafka.utils.Implicits$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkTaskError;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: BidirectionalLinkIntegrationTest.scala */
@Tags({@Tag("integration"), @Tag("bazel:shard_count:10")})
@ScalaSignature(bytes = "\u0006\u0001\r%b\u0001B\u0011#\u0001\u001dBQ\u0001\f\u0001\u0005\u00025BQa\f\u0001\u0005BABQa\u000e\u0001\u0005\u0002aBQ\u0001\u001b\u0001\u0005\u0002%DQ\u0001\u001d\u0001\u0005\u0002EDQ\u0001\u001f\u0001\u0005\u0002eDq!!\u0001\u0001\t\u0003\t\u0019\u0001C\u0004\u0002\u0012\u0001!\t!a\u0005\t\u000f\u0005\u0005\u0002\u0001\"\u0001\u0002$!9\u0011\u0011\u0007\u0001\u0005\u0002\u0005M\u0002bBA!\u0001\u0011\u0005\u00111\t\u0005\b\u0003#\u0002A\u0011AA*\u0011\u001d\t\t\u0007\u0001C\u0001\u0003GBq!!\u001d\u0001\t\u0003\t\u0019\bC\u0004\u0002\u0002\u0002!\t!a!\t\u000f\u0005E\u0005\u0001\"\u0001\u0002\u0014\"9\u0011\u0011\u0015\u0001\u0005\u0002\u0005\r\u0006bBAY\u0001\u0011\u0005\u00111\u0017\u0005\b\u0003\u0003\u0004A\u0011AAb\u0011\u001d\t\t\u000e\u0001C\u0005\u0003'Dq!!<\u0001\t\u0003\ty\u000fC\u0004\u0003\u0002\u0001!IAa\u0001\t\u0013\t\u0005\u0004!%A\u0005\n\t\r\u0004\"\u0003B=\u0001E\u0005I\u0011\u0002B>\u0011\u001d\u0011y\b\u0001C\u0005\u0005\u0003CqA!$\u0001\t\u0003\u0011y\tC\u0005\u00034\u0002\t\n\u0011\"\u0001\u00036\"I!\u0011\u0018\u0001\u0012\u0002\u0013\u0005!1\r\u0005\n\u0005w\u0003\u0011\u0013!C\u0001\u0005wBqA!0\u0001\t\u0013\u0011y\fC\u0004\u0003\\\u0002!IA!8\t\u000f\r\u0005\u0001\u0001\"\u0005\u0004\u0004\t\u0001#)\u001b3je\u0016\u001cG/[8oC2d\u0015N\\6J]R,wM]1uS>tG+Z:u\u0015\t\u0019C%\u0001\u0003mS:\\'\"A\u0013\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u000b\t\u0003S)j\u0011AI\u0005\u0003W\t\u0012!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001/!\tI\u0003!A\rnCf\u0014W-V:f\u0005&$\u0017N]3di&|g.\u00197MS:\\G#A\u0019\u0011\u0005I*T\"A\u001a\u000b\u0003Q\nQa]2bY\u0006L!AN\u001a\u0003\tUs\u0017\u000e^\u0001-i\u0016\u001cHOQ5eSJ,7\r^5p]\u0006dG*\u001b8l/&$\bnT;uE>,h\u000eZ\"p]:,7\r^5p]N$B!M\u001dG\u0017\")!h\u0001a\u0001w\u00051\u0011/^8sk6\u0004\"\u0001P\"\u000f\u0005u\n\u0005C\u0001 4\u001b\u0005y$B\u0001!'\u0003\u0019a$o\\8u}%\u0011!iM\u0001\u0007!J,G-\u001a4\n\u0005\u0011+%AB*ue&twM\u0003\u0002Cg!)qi\u0001a\u0001\u0011\u0006Y1m\\8sI&t\u0017\r^8s!\t\u0011\u0014*\u0003\u0002Kg\t9!i\\8mK\u0006t\u0007\"\u0002'\u0004\u0001\u0004Y\u0014\u0001\u00057pG\u0006d'+\u001a9mS\u000e\fG/[8oQ\u0011\u0019a\nX/\u0011\u0005=SV\"\u0001)\u000b\u0005E\u0013\u0016\u0001\u00039s_ZLG-\u001a:\u000b\u0005M#\u0016A\u00029be\u0006l7O\u0003\u0002V-\u00069!.\u001e9ji\u0016\u0014(BA,Y\u0003\u0015QWO\\5u\u0015\u0005I\u0016aA8sO&\u00111\f\u0015\u0002\r\u001b\u0016$\bn\u001c3T_V\u00148-Z\u0001\u0006m\u0006dW/\u001a\u0017\u0002=\u0006\nq,\u0001\u0015rk>\u0014X/\\\"p_J$\u0017N\\1u_J\u0014V\r\u001d7jG\u0006$\u0018n\u001c8D_6\u0014\u0017N\\1uS>t7\u000f\u000b\u0003\u0004C\u00164\u0007C\u00012d\u001b\u0005\u0011\u0016B\u00013S\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-I\u0001h\u0003uZH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b' \u0018d_>\u0014H-\u001b8bi>\u0014Xh_\u0019~]1|7-\u00197SKBd\u0017nY1uS>tWh\u001f\u001a~\u0003=\"Xm\u001d;CS\u0012L'/Z2uS>t\u0017\r\u001c'j].<\u0016\u000e\u001e5P]\u0016\u001cuN\u001c8fGRLwN\\%oSRL\u0017\r^8s)\u0011\t$n\u001b7\t\u000bi\"\u0001\u0019A\u001e\t\u000b\u001d#\u0001\u0019\u0001%\t\u000b1#\u0001\u0019A\u001e)\t\u0011qEL\u001c\u0017\u0002=\"\"A!Y3g\u0003\u0019\"Xm\u001d;CS\u0012L'/Z2uS>t\u0017\r\u001c'j].<\u0016\u000e\u001e5BkR|W*\u001b:s_JLgn\u001a\u000b\u0005cI\u001cH\u000fC\u0003;\u000b\u0001\u00071\bC\u0003H\u000b\u0001\u0007\u0001\nC\u0003M\u000b\u0001\u00071\b\u000b\u0003\u0006\u001dr3H&\u00010)\t\u0015\tWMZ\u00013i\u0016\u001cHOQ5eSJ,7\r^5p]\u0006dG*\u001b8l/&$\bn\\;u\u0013:\u001cG.\u001e3j]\u001e\u0014V-\\8uK6K'O]8sgR!\u0011G_>}\u0011\u0015Qd\u00011\u0001<\u0011\u00159e\u00011\u0001I\u0011\u0015ae\u00011\u0001<Q\u00111a\n\u0018@-\u0003yCCAB1fM\u0006IB/Z:u%\u00164XM]:f\u0003:$7\u000b^1si6K'O]8s)\u001d\t\u0014QAA\u0004\u0003\u0013AQAO\u0004A\u0002mBQaR\u0004A\u0002!CQ\u0001T\u0004A\u0002mBSa\u0002(]\u0003\u001ba\u0013A\u0018\u0015\u0005\u000f\u0005,g-A\u0018uKN$(+\u001a<feN,\u0017I\u001c3Ti\u0006\u0014H/T5se>\u0014x+\u001b;i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00115b]\u001e,7\u000fF\u00042\u0003+\t9\"!\u0007\t\u000biB\u0001\u0019A\u001e\t\u000b\u001dC\u0001\u0019\u0001%\t\u000b1C\u0001\u0019A\u001e)\u000b!qE,!\b-\u0003yCC\u0001C1fM\u0006\u0001C/Z:u%\u00164XM]:f\u0003:$7\u000b^1si6K'O]8s/&$\b\u000eT1h)\u001d\t\u0014QEA\u0014\u0003SAQAO\u0005A\u0002mBQaR\u0005A\u0002!CQ\u0001T\u0005A\u0002mBS!\u0003(]\u0003[a\u0013A\u0018\u0015\u0005\u0013\u0005,g-\u0001\u001buKN$(+\u001a<feN,\u0017I\u001c3Ti\u0006\u0014H/T5se>\u0014\u0018N\\4XSRD'+Z7pi\u0016\u001cE.^:uKJ\u0014Vm\u001d;beR$r!MA\u001b\u0003o\tI\u0004C\u0003;\u0015\u0001\u00071\bC\u0003H\u0015\u0001\u0007\u0001\nC\u0003M\u0015\u0001\u00071\bK\u0003\u000b\u001dr\u000bi\u0004L\u0001_Q\u0011Q\u0011-\u001a4\u0002gQ,7\u000f\u001e*fm\u0016\u00148/Z!oIN#\u0018M\u001d;NSJ\u0014xN]5oO^KG\u000f\u001b'pG\u0006d7\t\\;ti\u0016\u0014(+Z:uCJ$HcB\u0019\u0002F\u0005\u001d\u0013\u0011\n\u0005\u0006u-\u0001\ra\u000f\u0005\u0006\u000f.\u0001\r\u0001\u0013\u0005\u0006\u0019.\u0001\ra\u000f\u0015\u0006\u00179c\u0016Q\n\u0017\u0002=\"\"1\"Y3g\u0003Q\"Xm\u001d;SKZ,'o]3B]\u0012\u001cF/\u0019:u\u001b&\u0014(o\u001c:j]\u001e<\u0016\u000e\u001e5QCV\u001cX-\u00118e+:\u0004\u0018-^:f\u0019&t7n\u001d\u000b\bc\u0005U\u0013qKA-\u0011\u0015QD\u00021\u0001<\u0011\u00159E\u00021\u0001I\u0011\u0015aE\u00021\u0001<Q\u0015aa\nXA/Y\u0005q\u0006\u0006\u0002\u0007bK\u001a\fq\b^3tiJ+g/\u001a:tK\u0006sGm\u0015;beRl\u0015N\u001d:pe&twmV5uQB\u000bWo]3B]\u0012,f\u000e]1vg\u0016dunY1m\u001b&\u0014(o\u001c:U_BL7\rF\u00042\u0003K\n9'!\u001b\t\u000bij\u0001\u0019A\u001e\t\u000b\u001dk\u0001\u0019\u0001%\t\u000b1k\u0001\u0019A\u001e)\u000b5qE,!\u001c-\u0003yCC!D1fM\u00061F/Z:u%\u00164XM]:f\u0003:$7\u000b^1si6K'O]8sS:<w+\u001b;i!\u0006,8/Z!oIVs\u0007/Y;tK2{7-\u00197NSJ\u0014xN\u001d+pa&\u001cw+\u001b;i\u0019>\u001c\u0017\r\\\"mkN$XM\u001d*fgR\f'\u000f\u001e\u000b\bc\u0005U\u0014qOA=\u0011\u0015Qd\u00021\u0001<\u0011\u00159e\u00021\u0001I\u0011\u0015ae\u00021\u0001<Q\u0015qa\nXA?Y\u0005q\u0006\u0006\u0002\bbK\u001a\f!\u0006^3tiJ+g/\u001a:tK\u0006sGm\u0015;beRl\u0015N\u001d:pe^KG\u000f\u001b#fY\u0016$X\r\u001a+pa&\u001c7\u000fF\u00042\u0003\u000b\u000b9)!#\t\u000biz\u0001\u0019A\u001e\t\u000b\u001d{\u0001\u0019\u0001%\t\u000b1{\u0001\u0019A\u001e)\u000b=qE,!$-\u0003yCCaD1fM\u0006iC/Z:u%>dGNY1dW6K'O]8s\rJ|W\u000eU3oI&twmU=oG\"\u0014xN\\5{KN#\u0018\r^3\u0015\u000fE\n)*a&\u0002\u001a\")!\b\u0005a\u0001w!)q\t\u0005a\u0001\u0011\")A\n\u0005a\u0001w!*\u0001C\u0014/\u0002\u001e2\na\f\u000b\u0003\u0011C\u00164\u0017A\n;fgR\u0014v\u000e\u001c7cC\u000e\\W*\u001b:s_J4\u0015-\u001b7t\u001f:\u0004VM\u001c3j]\u001el\u0015N\u001d:peR9\u0011'!*\u0002(\u0006%\u0006\"\u0002\u001e\u0012\u0001\u0004Y\u0004\"B$\u0012\u0001\u0004A\u0005\"\u0002'\u0012\u0001\u0004Y\u0004&B\tO9\u00065F&\u00010)\tE\tWMZ\u0001$i\u0016\u001cH\u000fU1vg\u0016l\u0015N\u001d:pe\u001a\u000b\u0017\u000e\\:P]B+g\u000eZ5oO6K'O]8s)\u001d\t\u0014QWA\\\u0003sCQA\u000f\nA\u0002mBQa\u0012\nA\u0002!CQ\u0001\u0014\nA\u0002mBSA\u0005(]\u0003{c\u0013A\u0018\u0015\u0005%\u0005,g-A\ruKN$(+\u001a<feN,\u0017I\u001c3QCV\u001cX-T5se>\u0014HcB\u0019\u0002F\u0006\u001d\u0017\u0011\u001a\u0005\u0006uM\u0001\ra\u000f\u0005\u0006\u000fN\u0001\r\u0001\u0013\u0005\u0006\u0019N\u0001\ra\u000f\u0015\u0006'9c\u0016Q\u001a\u0017\u0002=\"\"1#Y3g\u0003]\u0001(o\u001c3vG\u0016\u0014VmY8sIN$vn\u00117vgR,'\u000fF\u00042\u0003+\fy.a9\t\u000f\u0005]G\u00031\u0001\u0002Z\u000691\r\\;ti\u0016\u0014\bcA\u0015\u0002\\&\u0019\u0011Q\u001c\u0012\u0003-\rcWo\u001d;fe2Kgn\u001b+fgRD\u0015M\u001d8fgNDa!!9\u0015\u0001\u0004Y\u0014!\u0002;pa&\u001c\u0007bBAs)\u0001\u0007\u0011q]\u0001\u000b]Vl'+Z2pe\u0012\u001c\bc\u0001\u001a\u0002j&\u0019\u00111^\u001a\u0003\u0007%sG/\u0001\u0012uKN$X*\u001e7uSBdW-T5se>\u00148\u000b^1uKR\u0013\u0018M\\:ji&|gn\u001d\u000b\bc\u0005E\u00181_A{\u0011\u0015QT\u00031\u0001<\u0011\u00159U\u00031\u0001I\u0011\u0015aU\u00031\u0001<Q\u0015)b\nXA}Y\u0005q\u0006&B\u000bbK\u0006u\u0018EAA��\u0003!ZH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b' \u0018d_>\u0014H-\u001b8bi>\u0014Xh_\u0019~\u0003]1XM]5gs\nKG-\u001b:fGRLwN\\1m\u0019&t7\u000eF\u00052\u0005\u000b\u00119Ba\u0007\u0003R!9!q\u0001\fA\u0002\t%\u0011AF3bgRd\u0015N\\6D_:tWm\u0019;j_:lu\u000eZ3\u0011\t\t-!1C\u0007\u0003\u0005\u001bQ1a\tB\b\u0015\r\u0011\t\u0002J\u0001\u0007g\u0016\u0014h/\u001a:\n\t\tU!Q\u0002\u0002\u000f\u0007>tg.Z2uS>tWj\u001c3f\u0011\u001d\u0011IB\u0006a\u0001\u0005\u0013\tac^3ti2Kgn[\"p]:,7\r^5p]6{G-\u001a\u0005\n\u0005;1\u0002\u0013!a\u0001\u0005?\t!\u0002^8qS\u000e$\u0016\u0010]3t!\u0015\u0011$\u0011\u0005B\u0013\u0013\r\u0011\u0019c\r\u0002\u0007\u001fB$\u0018n\u001c8\u0011\r\t\u001d\"\u0011\u0007B\u001c\u001d\u0011\u0011IC!\f\u000f\u0007y\u0012Y#C\u00015\u0013\r\u0011ycM\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\u0011\u0019D!\u000e\u0003\u0007M+\u0017OC\u0002\u00030M\u0002BA!\u000f\u0003L9!!1\bB$\u001d\u0011\u0011iD!\u0012\u000f\t\t}\"1\t\b\u0004}\t\u0005\u0013\"A\u0013\n\u0007\tEA%C\u0002$\u0005\u001fIAA!\u0013\u0003\u000e\u0005IAk\u001c9jGRK\b/Z\u0005\u0005\u0005\u001b\u0012yEA\u0005U_BL7\rV=qK*!!\u0011\nB\u0007\u0011%\u0011\u0019F\u0006I\u0001\u0002\u0004\u0011)&A\bd_:4\u0017nZ(wKJ\u0014\u0018\u000eZ3t!\u0019\u00119F!\u0018<w5\u0011!\u0011\f\u0006\u0004\u00057\u001a\u0014AC2pY2,7\r^5p]&!!q\fB-\u0005\ri\u0015\r]\u0001\"m\u0016\u0014\u0018NZ=CS\u0012L'/Z2uS>t\u0017\r\u001c'j].$C-\u001a4bk2$HeM\u000b\u0003\u0005KRCAa\b\u0003h-\u0012!\u0011\u000e\t\u0005\u0005W\u0012)(\u0004\u0002\u0003n)!!q\u000eB9\u0003%)hn\u00195fG.,GMC\u0002\u0003tM\n!\"\u00198o_R\fG/[8o\u0013\u0011\u00119H!\u001c\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-A\u0011wKJLg-\u001f\"jI&\u0014Xm\u0019;j_:\fG\u000eT5oW\u0012\"WMZ1vYR$C'\u0006\u0002\u0003~)\"!Q\u000bB4\u0003\t\u001awN\u001c4jOV\u0014X-T5se>\u0014HK]1og&$\u0018n\u001c8CCR\u001c\u0007nU5{KR)\u0011Ga!\u0003\n\"9!QQ\rA\u0002\t\u001d\u0015\u0001C2mkN$XM]:\u0011\r\t\u001d\"\u0011GAm\u0011\u001d\u0011Y)\u0007a\u0001\u0003O\f\u0011BY1uG\"\u001c\u0016N_3\u0002\u00131Lgn\u001b)s_B\u001cH\u0003\u0004BI\u0005C\u0013)K!+\u00030\nE\u0006\u0003\u0002BJ\u0005;k!A!&\u000b\t\t]%\u0011T\u0001\u0005kRLGN\u0003\u0002\u0003\u001c\u0006!!.\u0019<b\u0013\u0011\u0011yJ!&\u0003\u0015A\u0013x\u000e]3si&,7\u000fC\u0004\u0003$j\u0001\rA!\u0003\u0002\u001d\r|gN\\3di&|g.T8eK\"9!q\u0015\u000eA\u0002\u0005e\u0017!\u0004:f[>$Xm\u00117vgR,'\u000fC\u0005\u0003,j\u0001\n\u00111\u0001\u0003.\u0006i1m\u001c8tk6,'o\u0012:pkB\u0004BA\rB\u0011w!I!Q\u0004\u000e\u0011\u0002\u0003\u0007!q\u0004\u0005\n\u0005'R\u0002\u0013!a\u0001\u0005+\n1\u0003\\5oWB\u0013x\u000e]:%I\u00164\u0017-\u001e7uIM*\"Aa.+\t\t5&qM\u0001\u0014Y&t7\u000e\u0015:paN$C-\u001a4bk2$H\u0005N\u0001\u0014Y&t7\u000e\u0015:paN$C-\u001a4bk2$H%N\u0001\rm\u0016\u0014\u0018NZ=NSJ\u0014xN\u001d\u000b\u0006c\t\u0005'1\u0019\u0005\b\u0003/t\u0002\u0019AAm\u0011\u001d\u0011)M\ba\u0001\u0005\u000f\f!\u0002]1si&$\u0018n\u001c8t!\u0019\u00119C!\r\u0003JB!!1\u001aBl\u001b\t\u0011iM\u0003\u0003\u0003P\nE\u0017AB2p[6|gNC\u0002&\u0005'T1A!6Y\u0003\u0019\t\u0007/Y2iK&!!\u0011\u001cBg\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\f\u0011C^3sS\u001aLH*\u001b8l\u001b\u0016$(/[2t)=\t$q\u001cBu\u0005[\u0014\tP!>\u0003z\nu\bb\u0002Bq?\u0001\u0007!1]\u0001\u0007Y&t7.\u00133\u0011\t\t-'Q]\u0005\u0005\u0005O\u0014iM\u0001\u0003Vk&$\u0007b\u0002Bv?\u0001\u0007\u0011\u0011\\\u0001\fK\u0006\u001cHo\u00117vgR,'\u000fC\u0004\u0003p~\u0001\r!!7\u0002\u0017],7\u000f^\"mkN$XM\u001d\u0005\b\u0005g|\u0002\u0019\u0001BI\u00035)\u0017m\u001d;MS:\\\u0007K]8qg\"9!q_\u0010A\u0002\tE\u0015!D<fgRd\u0015N\\6Qe>\u00048\u000f\u0003\u0004\u0003|~\u0001\raO\u0001\nK\u0006\u001cH\u000fV8qS\u000eDaAa@ \u0001\u0004Y\u0014!C<fgR$v\u000e]5d\u0003]\u0019'/Z1uK\nKG-\u001b:fGRLwN\\1m\u0019&t7\u000e\u0006\u0007\u0003d\u000e\u00151\u0011BB\u0006\u0007\u001b\u0019y\u0001\u0003\u0004\u0004\b\u0001\u0002\raO\u0001\tY&t7NT1nK\"9!1\u001e\u0011A\u0002\u0005e\u0007b\u0002BxA\u0001\u0007\u0011\u0011\u001c\u0005\b\u0005g\u0004\u0003\u0019\u0001BI\u0011\u001d\u00119\u0010\ta\u0001\u0005#Cc\u0001AB\n9\u000e}\u0001\u0003BB\u000b\u00077i!aa\u0006\u000b\u0007\reA+A\u0002ba&LAa!\b\u0004\u0018\t\u0019A+Y4\"\u0005\r\u0005\u0012\u0001\u00062bu\u0016d'h\u001d5be\u0012|6m\\;oij\n\u0004\u0007\u000b\u0004\u0001\u0007'a6QE\u0011\u0003\u0007O\t1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/BidirectionalLinkIntegrationTest.class */
public class BidirectionalLinkIntegrationTest extends AbstractClusterLinkIntegrationTest {
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public void maybeUseBidirectionalLink() {
        useBidirectionalLink_$eq(true);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithOutboundConnections(String str, boolean z, String str2) {
        verifyBidirectionalLink(ConnectionMode$Outbound$.MODULE$, ConnectionMode$Outbound$.MODULE$, verifyBidirectionalLink$default$3(), verifyBidirectionalLink$default$4());
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithOneConnectionInitiator(String str, boolean z, String str2) {
        useSourceInitiatedLink_$eq(true);
        verifyBidirectionalLink(ConnectionMode$Inbound$.MODULE$, ConnectionMode$Outbound$.MODULE$, verifyBidirectionalLink$default$3(), verifyBidirectionalLink$default$4());
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithAutoMirroring(String str, boolean z, String str2) {
        verifyBidirectionalLink(ConnectionMode$Outbound$.MODULE$, ConnectionMode$Outbound$.MODULE$, None$.MODULE$, (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "true"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()), includeAllTopicsFilter()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "500")})));
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithoutIncludingRemoteMirrors(String str, boolean z, String str2) {
        verifyBidirectionalLink(ConnectionMode$Outbound$.MODULE$, ConnectionMode$Outbound$.MODULE$, new Some(new $colon.colon(TopicType$.MODULE$.LOCAL_MIRROR(), Nil$.MODULE$)), verifyBidirectionalLink$default$4());
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirror(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirror$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, "topic", 20);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_synchronization", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(destCluster.linkCoordinator(linkName()), Nil$.MODULE$), destCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(sourceCluster.linkCoordinator(linkName()), Nil$.MODULE$), sourceCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
        sourceCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_synchronization", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(sourceCluster.linkCoordinator(linkName()), Nil$.MODULE$), sourceCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(destCluster.linkCoordinator(linkName()), Nil$.MODULE$), destCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirrorWithLeaderEpochChanges(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        IntRef create = IntRef.create(3);
        bumpLeaderEpochs$1(destCluster, create, indexedSeq);
        verifyLeaderEpochIsBumped$1(destCluster, indexedSeq, create);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        verifyLeaderEpochIsBumped$1(sourceCluster, indexedSeq, create);
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
        bumpLeaderEpochs$1(sourceCluster, create, indexedSeq);
        create.elem += create.elem;
        verifyLeaderEpochIsBumped$1(sourceCluster, indexedSeq, create);
        sourceCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        verifyLeaderEpochIsBumped$1(destCluster, indexedSeq, create);
        produceRecordsToCluster(sourceCluster, "topic", 20);
        waitForMirroring(destCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirrorWithLag(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirrorWithLag$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        destCluster.pauseTopic("topic", destCluster.pauseTopic$default$2());
        produceRecordsToCluster(sourceCluster, "topic", 10000);
        destCluster.pauseTopic("topic", false);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(destCluster, "topic", 10000);
        waitForMirroring(sourceCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithRemoteClusterRestart(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirroringWithRemoteClusterRestart$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, "topic", 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), 2);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        sourceCluster.killAllBrokers();
        sourceCluster.restartDeadBrokers(sourceCluster.restartDeadBrokers$default$1());
        sourceCluster.updateBootstrapServers();
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        MirrorTopicDescription describeMirrorTopic = destCluster.describeMirrorTopic("topic", true);
        Assertions.assertNotNull(describeMirrorTopic);
        Assertions.assertEquals(0, describeMirrorTopic.mirrorStateTransitionErrors().size());
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithLocalClusterRestart(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirroringWithLocalClusterRestart$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, "topic", 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), 2);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilPendingMirrorState("topic", linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        destCluster.killAllBrokers();
        destCluster.restartDeadBrokers(destCluster.restartDeadBrokers$default$1());
        destCluster.updateBootstrapServers();
        sourceCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), destCluster.bootstrapServers(destCluster.bootstrapServers$default$1()))})), sourceCluster.alterClusterLink$default$3(), sourceCluster.alterClusterLink$default$4(), sourceCluster.alterClusterLink$default$5());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithPauseAndUnpauseLinks(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLinks$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, "topic", 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilPendingMirrorState("topic", linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        destCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})), destCluster.alterClusterLink$default$3(), destCluster.alterClusterLink$default$4(), destCluster.alterClusterLink$default$5());
        sourceCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})), sourceCluster.alterClusterLink$default$3(), sourceCluster.alterClusterLink$default$4(), sourceCluster.alterClusterLink$default$5());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})), destCluster.alterClusterLink$default$3(), destCluster.alterClusterLink$default$4(), destCluster.alterClusterLink$default$5());
        sourceCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})), sourceCluster.alterClusterLink$default$3(), sourceCluster.alterClusterLink$default$4(), sourceCluster.alterClusterLink$default$5());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, "topic", 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilPendingMirrorState("topic", linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        destCluster.pauseTopic("topic", destCluster.pauseTopic$default$2());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.pauseTopic("topic", false);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, "topic", 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.pauseTopic("topic", destCluster.pauseTopic$default$2());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.killAllBrokers();
        destCluster.restartDeadBrokers(destCluster.restartDeadBrokers$default$1());
        destCluster.updateBootstrapServers();
        sourceCluster.alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), destCluster.bootstrapServers(destCluster.bootstrapServers$default$1()))})), sourceCluster.alterClusterLink$default$3(), sourceCluster.alterClusterLink$default$4(), sourceCluster.alterClusterLink$default$5());
        destCluster.pauseTopic("topic", false);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirrorWithDeletedTopics(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndStartMirrorWithDeletedTopics$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, topic(), 20);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        sourceCluster.deleteTopic(topic(), true);
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED, topic(), linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.deleteTopic(topic(), true);
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        destCluster.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        destCluster.deleteTopic(topic(), true);
        destCluster.createTopic(topic(), numPartitions(), replicationFactor(), destCluster.createTopic$default$4(), destCluster.createTopic$default$5(), destCluster.createTopic$default$6());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED, topic(), linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testRollbackMirrorFromPendingSynchronizeState(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testRollbackMirrorFromPendingSynchronizeState$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        KafkaProducer<byte[], byte[]> createProducer = sourceCluster.createProducer(sourceCluster.createProducer$default$1(), sourceCluster.createProducer$default$2(), sourceCluster.createProducer$default$3());
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecords(createProducer, topic(), 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        sourceCluster.killAllBrokers();
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, topic(), linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.alterMirrors(topic(), AlterMirrorOp.ROLLBACK);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topic(), linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.restartDeadBrokers(sourceCluster.restartDeadBrokers$default$1());
        sourceCluster.updateBootstrapServers();
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        sourceCluster.unlinkTopic(topic(), linkName(), sourceCluster.unlinkTopic$default$3(), false, sourceCluster.unlinkTopic$default$5(), sourceCluster.unlinkTopic$default$6());
        produceRecordsToCluster(sourceCluster, topic(), 20);
        waitForMirroring(destCluster, indexedSeq);
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testRollbackMirrorFailsOnPendingMirror(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testRollbackMirrorFailsOnPendingMirror$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, topic(), 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        destCluster.killAllBrokers();
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            sourceCluster.alterMirrors(this.topic(), AlterMirrorOp.ROLLBACK);
        });
        sourceCluster.unlinkTopic(topic(), linkName(), sourceCluster.unlinkTopic$default$3(), false, sourceCluster.unlinkTopic$default$5(), sourceCluster.unlinkTopic$default$6());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic(), linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testPauseMirrorFailsOnPendingMirror(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testPauseMirrorFailsOnPendingMirror$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, topic(), 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), 2);
        Uuid createBidirectionalLink = createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR);
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        destCluster.killAllBrokers();
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", new $colon.colon(InternalTaskErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(sourceCluster.linkCoordinator(linkName()), Nil$.MODULE$), sourceCluster.nonLinkCoordinators(linkName()), 120000L);
        if (z) {
            verifyCoordinatorChangeHandlesStoppingAndStartingTasks(sourceCluster, createBidirectionalLink, topic(), ClusterLinkConvertToMirrorTopicTaskType$.MODULE$, true, true);
        }
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        if (testUtils$ == null) {
            throw null;
        }
        LongRef create = LongRef.create(1L);
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testPauseMirrorFailsOnPendingMirror$2(this, sourceCluster);
                Assertions.assertThrows(InvalidRequestException.class, () -> {
                    sourceCluster.alterMirrors(this.topic(), AlterMirrorOp.PAUSE);
                });
                sourceCluster.unlinkTopic(topic(), linkName(), sourceCluster.unlinkTopic$default$3(), false, sourceCluster.unlinkTopic$default$5(), sourceCluster.unlinkTopic$default$6());
                sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic(), linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(testUtils$.msgWithLogIdent(TestUtils$.$anonfun$retry$1(create)));
                }
                Thread.sleep(create.elem);
                create.elem += package$.MODULE$.min(create.elem, 1000L);
            }
        }
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndPauseMirror(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str3 = "topic";
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testReverseAndPauseMirror$1(str3, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        sourceCluster.createTopic("topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, "topic", 20);
        createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic("topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, indexedSeq);
        destCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_synchronization", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(destCluster.linkCoordinator(linkName()), Nil$.MODULE$), destCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(sourceCluster.linkCoordinator(linkName()), Nil$.MODULE$), sourceCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        sourceCluster.pauseTopic("topic", false);
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(destCluster, "topic", 20);
        waitForMirroring(sourceCluster, indexedSeq);
        sourceCluster.alterMirrors("topic", AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR);
        sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, "topic", linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_synchronization", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(destCluster.linkCoordinator(linkName()), Nil$.MODULE$), destCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(destCluster.linkCoordinator(linkName()), Nil$.MODULE$), destCluster.nonLinkCoordinators(linkName()), assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        destCluster.pauseTopic("topic", false);
        destCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, "topic", linkName(), destCluster.waitUntilMirrorDescriptionState$default$4());
    }

    private void produceRecordsToCluster(ClusterLinkTestHarness clusterLinkTestHarness, String str, int i) {
        KafkaProducer<byte[], byte[]> createProducer = clusterLinkTestHarness.createProducer(clusterLinkTestHarness.createProducer$default$1(), clusterLinkTestHarness.createProducer$default$2(), clusterLinkTestHarness.createProducer$default$3());
        produceRecords(createProducer, str, i, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        createProducer.close();
    }

    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultipleMirrorStateTransitions(String str, boolean z, String str2) {
        AlterMirrorsOptions timeoutMs = new AlterMirrorsOptions().timeoutMs(Predef$.MODULE$.int2Integer(20000));
        ObjectRef create = ObjectRef.create(sourceCluster());
        ObjectRef create2 = ObjectRef.create(destCluster());
        Map$ map$ = Map$.MODULE$;
        Predef$ predef$ = Predef$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc((ClusterLinkTestHarness) create.elem);
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) create.elem;
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc((ClusterLinkTestHarness) create2.elem);
        ClusterLinkTestHarness clusterLinkTestHarness2 = (ClusterLinkTestHarness) create2.elem;
        Map apply = map$.apply(predef$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, clusterLinkTestHarness.createConfluentAdminClient(clusterLinkTestHarness.createConfluentAdminClient$default$1())), predef$ArrowAssoc$2.$minus$greater$extension(ArrowAssoc2, clusterLinkTestHarness2.createConfluentAdminClient(clusterLinkTestHarness2.createConfluentAdminClient$default$1()))}));
        ((ClusterLinkTestHarness) create.elem).producerConfig().setProperty("enable.idempotence", "false");
        ((ClusterLinkTestHarness) create2.elem).producerConfig().setProperty("enable.idempotence", "false");
        Map<String, String> map = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), consumerGroupFilter("*"))}));
        createBidirectionalLink(linkName(), (ClusterLinkTestHarness) create2.elem, (ClusterLinkTestHarness) create.elem, linkProps(ConnectionMode$Outbound$.MODULE$, (ClusterLinkTestHarness) create.elem, linkProps$default$3(), linkProps$default$4(), map), linkProps(ConnectionMode$Outbound$.MODULE$, (ClusterLinkTestHarness) create2.elem, linkProps$default$3(), linkProps$default$4(), map));
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 6).map(obj -> {
            return $anonfun$testMultipleMirrorStateTransitions$4(BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        indexedSeq.foreach(str3 -> {
            $anonfun$testMultipleMirrorStateTransitions$5(this, create, create2, str3);
            return BoxedUnit.UNIT;
        });
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 4).foreach$mVc$sp(i -> {
            this.reverseMirroring$1(indexedSeq, apply, create2, timeoutMs, create);
        });
        ClusterLinkTestHarness clusterLinkTestHarness3 = (ClusterLinkTestHarness) create.elem;
        Consumer createConsumer = clusterLinkTestHarness3.createConsumer(clusterLinkTestHarness3.createConsumer$default$1(), clusterLinkTestHarness3.createConsumer$default$2(), clusterLinkTestHarness3.createConsumer$default$3(), clusterLinkTestHarness3.createConsumer$default$4());
        ClusterLinkTestHarness clusterLinkTestHarness4 = (ClusterLinkTestHarness) create2.elem;
        Consumer createConsumer2 = clusterLinkTestHarness4.createConsumer(clusterLinkTestHarness4.createConsumer$default$1(), clusterLinkTestHarness4.createConsumer$default$2(), clusterLinkTestHarness4.createConsumer$default$3(), clusterLinkTestHarness4.createConsumer$default$4());
        createConsumerAndCommitOffsets$1(createConsumer, indexedSeq);
        createConsumerAndCommitOffsets$1(createConsumer2, indexedSeq);
        Tuple2 splitAt = indexedSeq.splitAt(3);
        if (splitAt == null) {
            throw new MatchError((Object) null);
        }
        IndexedSeq indexedSeq2 = (IndexedSeq) splitAt._1();
        IndexedSeq indexedSeq3 = (IndexedSeq) splitAt._2();
        ((ConfluentAdmin) apply.apply((ClusterLinkTestHarness) create2.elem)).alterMirrors((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) indexedSeq2.map(str4 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str4), AlterMirrorOp.PROMOTE);
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava(), timeoutMs);
        ((ConfluentAdmin) apply.apply((ClusterLinkTestHarness) create2.elem)).alterMirrors((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) indexedSeq3.map(str5 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str5), AlterMirrorOp.FAILOVER);
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava(), timeoutMs);
        indexedSeq2.foreach(str6 -> {
            $anonfun$testMultipleMirrorStateTransitions$11(this, create2, str6);
            return BoxedUnit.UNIT;
        });
        indexedSeq3.foreach(str7 -> {
            $anonfun$testMultipleMirrorStateTransitions$12(this, create2, str7);
            return BoxedUnit.UNIT;
        });
        createConsumer2.close();
        indexedSeq2.foreach(str8 -> {
            $anonfun$testMultipleMirrorStateTransitions$13(this, create2, str8);
            return BoxedUnit.UNIT;
        });
        Assertions.assertEquals(((TraversableOnce) ((TraversableLike) indexedSeq2.flatMap(str9 -> {
            return this.partitions$2(str9);
        }, IndexedSeq$.MODULE$.canBuildFrom())).map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(((ClusterLinkTestHarness) create2.elem).leaderLog(topicPartition).logEndOffset()));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()).$plus$plus(((TraversableOnce) ((TraversableLike) indexedSeq3.flatMap(str10 -> {
            return this.partitions$2(str10);
        }, IndexedSeq$.MODULE$.canBuildFrom())).map(topicPartition2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition2), BoxesRunTime.boxToLong(200L));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())), (scala.collection.mutable.Map) ((TraversableLike) CollectionConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map) ((java.util.Map) ((Admin) apply.apply((ClusterLinkTestHarness) create2.elem)).listConsumerGroupOffsets("group").all().get(15L, TimeUnit.SECONDS)).get("group")).asScala()).map(tuple2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple2._1()), BoxesRunTime.boxToLong(((OffsetAndMetadata) tuple2._2()).offset()));
        }, scala.collection.mutable.Map$.MODULE$.canBuildFrom()));
    }

    private void verifyBidirectionalLink(ConnectionMode connectionMode, ConnectionMode connectionMode2, Option<Seq<Enumeration.Value>> option, Map<String, String> map) {
        Assumptions.assumeTrue(clusterLinkPrefix().isEmpty());
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        String str = "east.topic";
        String str2 = "west.topic";
        KafkaProducer<byte[], byte[]> createProducer = destCluster.createProducer(destCluster.createProducer$default$1(), destCluster.createProducer$default$2(), destCluster.createProducer$default$3());
        KafkaProducer<byte[], byte[]> createProducer2 = sourceCluster.createProducer(sourceCluster.createProducer$default$1(), sourceCluster.createProducer$default$2(), sourceCluster.createProducer$default$3());
        sourceCluster.createTopic("west.topic", numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        destCluster.createTopic("east.topic", numPartitions(), replicationFactor(), destCluster.createTopic$default$4(), destCluster.createTopic$default$5(), destCluster.createTopic$default$6());
        produceRecords(createProducer2, "west.topic", 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        produceRecords(createProducer, "east.topic", 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        Properties linkProps = linkProps(connectionMode, sourceCluster, new Some("west.group"), option, map);
        Properties linkProps2 = linkProps(connectionMode2, destCluster, new Some("east.group"), option, map);
        Uuid createBidirectionalLink = createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps, linkProps2);
        if (!map.get(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()).contains("true")) {
            destCluster.linkTopic("west.topic", replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
            sourceCluster.linkTopic("east.topic", replicationFactor(), linkName(), sourceCluster.linkTopic$default$4(), sourceCluster.linkTopic$default$5());
        }
        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$verifyBidirectionalLink$1(str2, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        IndexedSeq indexedSeq2 = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj2 -> {
            return $anonfun$verifyBidirectionalLink$2(str, BoxesRunTime.unboxToInt(obj2));
        }, IndexedSeq$.MODULE$.canBuildFrom());
        waitForMirroring(destCluster, indexedSeq);
        waitForMirroring(sourceCluster, indexedSeq2);
        produceRecords(createProducer2, "west.topic", 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        produceRecords(createProducer, "east.topic", 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6());
        waitForMirroring(destCluster, indexedSeq);
        waitForMirroring(sourceCluster, indexedSeq2);
        long nextOffset = nextOffset(new TopicPartition("east.topic", 0));
        commitOffsets(destCluster, "east.topic", 0, nextOffset, "east.group");
        commitOffsets(destCluster, "west.topic", 0, nextOffset, "east.group");
        commitOffsets(sourceCluster, "west.topic", 0, nextOffset, "west.group");
        commitOffsets(sourceCluster, "east.topic", 0, nextOffset, "west.group");
        verifyOffsetMigration("west.topic", 0, nextOffset, "west.group", destCluster);
        boolean z = option.isDefined() && !((SeqLike) option.get()).contains(TopicType$.MODULE$.REMOTE_MIRROR());
        if (z) {
            verifyOffsetMigration("east.topic", 0, 0L, "west.group", destCluster);
        } else {
            verifyOffsetMigration("east.topic", 0, nextOffset, "west.group", destCluster);
        }
        verifyOffsetMigration("east.topic", 0, nextOffset, "east.group", sourceCluster);
        if (z) {
            verifyOffsetMigration("west.topic", 0, 0L, "east.group", sourceCluster);
        } else {
            verifyOffsetMigration("west.topic", 0, nextOffset, "east.group", sourceCluster);
        }
        verifyLinkMetrics(createBidirectionalLink, destCluster, sourceCluster, linkProps, linkProps2, "east.topic", "west.topic");
        destCluster.unlinkTopic("west.topic", linkName(), destCluster.unlinkTopic$default$3(), destCluster.unlinkTopic$default$4(), destCluster.unlinkTopic$default$5(), destCluster.unlinkTopic$default$6());
        sourceCluster.unlinkTopic("east.topic", linkName(), sourceCluster.unlinkTopic$default$3(), sourceCluster.unlinkTopic$default$4(), sourceCluster.unlinkTopic$default$5(), sourceCluster.unlinkTopic$default$6());
        verifyMirror(destCluster, indexedSeq);
        verifyMirror(sourceCluster, indexedSeq2);
        destCluster.deleteClusterLink(linkName(), destCluster.deleteClusterLink$default$2(), destCluster.deleteClusterLink$default$3());
        sourceCluster.deleteClusterLink(linkName(), sourceCluster.deleteClusterLink$default$2(), sourceCluster.deleteClusterLink$default$3());
    }

    private Option<Seq<Enumeration.Value>> verifyBidirectionalLink$default$3() {
        return None$.MODULE$;
    }

    private Map<String, String> verifyBidirectionalLink$default$4() {
        return Map$.MODULE$.empty();
    }

    private void configureMirrorTransitionBatchSize(Seq<ClusterLinkTestHarness> seq, int i) {
        seq.foreach(clusterLinkTestHarness -> {
            $anonfun$configureMirrorTransitionBatchSize$1(i, clusterLinkTestHarness);
            return BoxedUnit.UNIT;
        });
    }

    public Properties linkProps(ConnectionMode connectionMode, ClusterLinkTestHarness clusterLinkTestHarness, Option<String> option, Option<Seq<Enumeration.Value>> option2, Map<String, String> map) {
        Properties properties = new Properties();
        ConnectionMode$Outbound$ connectionMode$Outbound$ = ConnectionMode$Outbound$.MODULE$;
        if (connectionMode != null ? connectionMode.equals(connectionMode$Outbound$) : connectionMode$Outbound$ == null) {
            String createLinkCredentials = createLinkCredentials(linkName(), clusterLinkTestHarness, createLinkCredentials$default$3());
            properties.put("bootstrap.servers", clusterLinkTestHarness.bootstrapServers(clusterLinkTestHarness.bootstrapServers$default$1()));
            Implicits$.MODULE$.PropertiesOps(properties).$plus$plus$eq(clusterLinkTestHarness.clientSecurityProps(linkName()));
            properties.put("sasl.jaas.config", createLinkCredentials);
        }
        properties.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "10000");
        properties.put("metadata.max.age.ms", "2000");
        properties.put("reconnect.backoff.max.ms", "1000");
        properties.setProperty("request.timeout.ms", "1000");
        properties.setProperty("default.api.timeout.ms", "1000");
        properties.setProperty(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL.name());
        properties.setProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        option.foreach(str -> {
            String consumerGroupFilter;
            properties.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
            if (option2 instanceof Some) {
                consumerGroupFilter = this.consumerGroupFilter(str, (Seq) ((Some) option2).value());
            } else {
                if (!None$.MODULE$.equals(option2)) {
                    throw new MatchError(option2);
                }
                consumerGroupFilter = this.consumerGroupFilter(str);
            }
            properties.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter);
            return properties.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        });
        Implicits$.MODULE$.PropertiesOps(properties).$plus$plus$eq(map);
        return properties;
    }

    public Option<String> linkProps$default$3() {
        return None$.MODULE$;
    }

    public Option<Seq<Enumeration.Value>> linkProps$default$4() {
        return None$.MODULE$;
    }

    public Map<String, String> linkProps$default$5() {
        return Map$.MODULE$.empty();
    }

    private void verifyMirror(ClusterLinkTestHarness clusterLinkTestHarness, Seq<TopicPartition> seq) {
        Consumer<byte[], byte[]> createConsumer = clusterLinkTestHarness.createConsumer(clusterLinkTestHarness.createConsumer$default$1(), clusterLinkTestHarness.createConsumer$default$2(), clusterLinkTestHarness.createConsumer$default$3(), clusterLinkTestHarness.createConsumer$default$4());
        createConsumer.assign((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(seq).asJava());
        consumePartitionRecords(createConsumer, seq.toSet(), clusterLinkPrefix(), ((TopicPartition) seq.head()).topic(), consumePartitionRecords$default$5());
        createConsumer.close();
    }

    private void verifyLinkMetrics(Uuid uuid, ClusterLinkTestHarness clusterLinkTestHarness, ClusterLinkTestHarness clusterLinkTestHarness2, Properties properties, Properties properties2, String str, String str2) {
        new $colon.colon(clusterLinkTestHarness, new $colon.colon(clusterLinkTestHarness2, Nil$.MODULE$)).foreach(clusterLinkTestHarness3 -> {
            $anonfun$verifyLinkMetrics$1(this, clusterLinkTestHarness3);
            return BoxedUnit.UNIT;
        });
        destCluster_$eq(clusterLinkTestHarness);
        sourceCluster_$eq(clusterLinkTestHarness2);
        verifyDestinationLinkMetrics(uuid, properties, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, str2);
        sourceCluster_$eq(clusterLinkTestHarness);
        destCluster_$eq(clusterLinkTestHarness2);
        verifyDestinationLinkMetrics(uuid, properties2, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, str);
        ConnectionMode fromString = ConnectionMode$.MODULE$.fromString(properties.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
        ConnectionMode fromString2 = ConnectionMode$.MODULE$.fromString(properties2.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
        ConnectionMode$Inbound$ connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
        if (fromString != null ? !fromString.equals(connectionMode$Inbound$) : connectionMode$Inbound$ != null) {
            ConnectionMode$Inbound$ connectionMode$Inbound$2 = ConnectionMode$Inbound$.MODULE$;
            if (fromString2 == null) {
                if (connectionMode$Inbound$2 != null) {
                    return;
                }
            } else if (!fromString2.equals(connectionMode$Inbound$2)) {
                return;
            }
        }
        String linkName = linkName();
        ClusterLinkConfig.LinkMode linkMode = ClusterLinkConfig.LinkMode.BIDIRECTIONAL;
        ClusterLinkConfig.LinkMode linkMode2 = ClusterLinkConfig.LinkMode.BIDIRECTIONAL;
        ConnectionMode$Outbound$ connectionMode$Outbound$ = ConnectionMode$Outbound$.MODULE$;
        ClusterLinkTestHarness clusterLinkTestHarness4 = (fromString != null ? !fromString.equals(connectionMode$Outbound$) : connectionMode$Outbound$ != null) ? clusterLinkTestHarness2 : clusterLinkTestHarness;
        ConnectionMode$Inbound$ connectionMode$Inbound$3 = ConnectionMode$Inbound$.MODULE$;
        verifyReverseConnectionMetrics(linkName, linkMode, linkMode2, clusterLinkTestHarness4, (fromString != null ? !fromString.equals(connectionMode$Inbound$3) : connectionMode$Inbound$3 != null) ? clusterLinkTestHarness2 : clusterLinkTestHarness);
    }

    public Uuid createBidirectionalLink(String str, ClusterLinkTestHarness clusterLinkTestHarness, ClusterLinkTestHarness clusterLinkTestHarness2, Properties properties, Properties properties2) {
        Uuid createClusterLink = clusterLinkTestHarness.createClusterLink(str, properties, new Some(((KafkaBroker) clusterLinkTestHarness2.brokers().head()).clusterId()), true);
        Assertions.assertEquals(createClusterLink, clusterLinkTestHarness2.createClusterLinkWithAllOptions(str, properties2, new Some(((KafkaBroker) clusterLinkTestHarness.brokers().head()).clusterId()), true, new Some(createClusterLink), clusterLinkTestHarness2.createClusterLinkWithAllOptions$default$6()));
        return createClusterLink;
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirror$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    private static final void bumpLeaderEpochs$1(ClusterLinkTestHarness clusterLinkTestHarness, IntRef intRef, IndexedSeq indexedSeq) {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), intRef.elem).foreach$mVc$sp(i -> {
            indexedSeq.foreach(topicPartition -> {
                return BoxesRunTime.boxToInteger(clusterLinkTestHarness.changeLeader(topicPartition));
            });
        });
    }

    public static final /* synthetic */ boolean $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$5(ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition, IntRef intRef) {
        return clusterLinkTestHarness.replicaStatusWithPartitionResult(topicPartition.topic(), topicPartition.partition()).leaderEpoch().orElseGet(() -> {
            return 0;
        }) >= intRef.elem;
    }

    public static final /* synthetic */ String $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$7() {
        return "Leader epoch did not get bumped";
    }

    public static final /* synthetic */ void $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(ClusterLinkTestHarness clusterLinkTestHarness, IntRef intRef, TopicPartition topicPartition) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$5(clusterLinkTestHarness, topicPartition, intRef)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$7());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    private static final void verifyLeaderEpochIsBumped$1(ClusterLinkTestHarness clusterLinkTestHarness, IndexedSeq indexedSeq, IntRef intRef) {
        indexedSeq.foreach(topicPartition -> {
            $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(clusterLinkTestHarness, intRef, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithLag$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithRemoteClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithLocalClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLinks$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithDeletedTopics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testRollbackMirrorFromPendingSynchronizeState$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testRollbackMirrorFailsOnPendingMirror$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testPauseMirrorFailsOnPendingMirror$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    public static final /* synthetic */ void $anonfun$testPauseMirrorFailsOnPendingMirror$2(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness) {
        MirrorTopicDescription describeMirrorTopic = clusterLinkTestHarness.describeMirrorTopic(bidirectionalLinkIntegrationTest.topic(), true);
        Assertions.assertNotNull(describeMirrorTopic);
        List mirrorStateTransitionErrors = describeMirrorTopic.mirrorStateTransitionErrors();
        Assertions.assertEquals(1, mirrorStateTransitionErrors.size());
        Assertions.assertEquals(ClusterLinkTaskError.ClusterLinkTaskErrorCode.INTERNAL_ERROR, ((ClusterLinkTaskError) mirrorStateTransitionErrors.get(0)).errorCode());
        MirrorTopicDescription describeMirrorTopic2 = clusterLinkTestHarness.describeMirrorTopic(bidirectionalLinkIntegrationTest.topic(), clusterLinkTestHarness.describeMirrorTopic$default$2());
        Assertions.assertNotNull(describeMirrorTopic2);
        describeMirrorTopic2.mirrorStateTransitionErrors();
        Assertions.assertEquals(0, describeMirrorTopic2.mirrorStateTransitionErrors().size());
    }

    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndPauseMirror$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$testMultipleMirrorStateTransitions$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Seq partitions$2(String str) {
        return (Seq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testMultipleMirrorStateTransitions$1(str, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }

    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$3(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, ObjectRef objectRef2, String str) {
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) objectRef.elem;
        clusterLinkTestHarness.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, str, bidirectionalLinkIntegrationTest.linkName(), clusterLinkTestHarness.waitUntilMirrorDescriptionState$default$4());
        ClusterLinkTestHarness clusterLinkTestHarness2 = (ClusterLinkTestHarness) objectRef2.elem;
        clusterLinkTestHarness2.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, str, bidirectionalLinkIntegrationTest.linkName(), clusterLinkTestHarness2.waitUntilMirrorDescriptionState$default$4());
        ClusterLinkTestHarness clusterLinkTestHarness3 = (ClusterLinkTestHarness) objectRef.elem;
        bidirectionalLinkIntegrationTest.produceRecords(clusterLinkTestHarness3.createProducer(clusterLinkTestHarness3.createProducer$default$1(), clusterLinkTestHarness3.createProducer$default$2(), clusterLinkTestHarness3.createProducer$default$3()), str, 20, bidirectionalLinkIntegrationTest.produceRecords$default$4(), bidirectionalLinkIntegrationTest.produceRecords$default$5(), bidirectionalLinkIntegrationTest.produceRecords$default$6());
        bidirectionalLinkIntegrationTest.waitForMirroring((ClusterLinkTestHarness) objectRef2.elem, bidirectionalLinkIntegrationTest.partitions$2(str));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void reverseMirroring$1(Seq seq, Map map, ObjectRef objectRef, AlterMirrorsOptions alterMirrorsOptions, ObjectRef objectRef2) {
        ((ConfluentAdmin) map.apply((ClusterLinkTestHarness) objectRef.elem)).alterMirrors((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) seq.map(str -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR);
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava(), alterMirrorsOptions);
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) objectRef2.elem;
        objectRef2.elem = (ClusterLinkTestHarness) objectRef.elem;
        objectRef.elem = clusterLinkTestHarness;
        seq.foreach(str2 -> {
            $anonfun$testMultipleMirrorStateTransitions$3(this, objectRef2, objectRef, str2);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ String $anonfun$testMultipleMirrorStateTransitions$4(int i) {
        return new StringBuilder(5).append("topic").append(i).toString();
    }

    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$5(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, ObjectRef objectRef2, String str) {
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) objectRef.elem;
        clusterLinkTestHarness.createTopic(str, bidirectionalLinkIntegrationTest.numPartitions(), bidirectionalLinkIntegrationTest.replicationFactor(), clusterLinkTestHarness.createTopic$default$4(), clusterLinkTestHarness.createTopic$default$5(), clusterLinkTestHarness.createTopic$default$6());
        ClusterLinkTestHarness clusterLinkTestHarness2 = (ClusterLinkTestHarness) objectRef.elem;
        bidirectionalLinkIntegrationTest.produceRecords(clusterLinkTestHarness2.createProducer(clusterLinkTestHarness2.createProducer$default$1(), clusterLinkTestHarness2.createProducer$default$2(), clusterLinkTestHarness2.createProducer$default$3()), str, 20, bidirectionalLinkIntegrationTest.produceRecords$default$4(), bidirectionalLinkIntegrationTest.produceRecords$default$5(), bidirectionalLinkIntegrationTest.produceRecords$default$6());
        ClusterLinkTestHarness clusterLinkTestHarness3 = (ClusterLinkTestHarness) objectRef2.elem;
        clusterLinkTestHarness3.linkTopic(str, bidirectionalLinkIntegrationTest.replicationFactor(), bidirectionalLinkIntegrationTest.linkName(), clusterLinkTestHarness3.linkTopic$default$4(), clusterLinkTestHarness3.linkTopic$default$5());
        bidirectionalLinkIntegrationTest.waitForMirroring((ClusterLinkTestHarness) objectRef2.elem, bidirectionalLinkIntegrationTest.partitions$2(str));
    }

    private final void createConsumerAndCommitOffsets$1(Consumer consumer, IndexedSeq indexedSeq) {
        consumer.subscribe((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(indexedSeq).asJava());
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, 100, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
        consumer.commitSync((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) ((TraversableLike) indexedSeq.flatMap(str -> {
            return this.partitions$2(str);
        }, IndexedSeq$.MODULE$.canBuildFrom())).map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), new OffsetAndMetadata(200L));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava());
    }

    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$11(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, String str) {
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) objectRef.elem;
        clusterLinkTestHarness.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_STOPPED, str, bidirectionalLinkIntegrationTest.linkName(), clusterLinkTestHarness.waitUntilMirrorDescriptionState$default$4());
    }

    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$12(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, String str) {
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) objectRef.elem;
        clusterLinkTestHarness.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, str, bidirectionalLinkIntegrationTest.linkName(), clusterLinkTestHarness.waitUntilMirrorDescriptionState$default$4());
    }

    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$13(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, String str) {
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) objectRef.elem;
        clusterLinkTestHarness.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, str, bidirectionalLinkIntegrationTest.linkName(), clusterLinkTestHarness.waitUntilMirrorDescriptionState$default$4());
    }

    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$2(String str, int i) {
        return new TopicPartition(str, i);
    }

    public static final /* synthetic */ void $anonfun$configureMirrorTransitionBatchSize$1(int i, ClusterLinkTestHarness clusterLinkTestHarness) {
        clusterLinkTestHarness.alterBrokerConfig(None$.MODULE$, "confluent.cluster.link.mirror.transition.batch.size", Integer.toString(i));
    }

    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$3(MetricName metricName) {
        return metricName.group().contains("cluster-link") && metricName.tags().containsKey("mode");
    }

    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$4(MetricName metricName) {
        String name = metricName.name();
        return name == null || !name.equals("active-link-count");
    }

    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$5(MetricName metricName) {
        Object obj = metricName.tags().get("mode");
        return (obj == null || !obj.equals("bidirectional")) && metricName.tags().containsKey("link-id") && !metricName.name().startsWith("reverse-connection");
    }

    public static final /* synthetic */ void $anonfun$verifyLinkMetrics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness) {
        bidirectionalLinkIntegrationTest.verifyLinkCountMetric(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, "active", clusterLinkTestHarness);
        bidirectionalLinkIntegrationTest.verifyActiveLinkCountMetric(clusterLinkTestHarness, ClusterLinkConfig.LinkMode.BIDIRECTIONAL);
        bidirectionalLinkIntegrationTest.verifyActiveLinkCountMetric(clusterLinkTestHarness, ClusterLinkConfig.LinkMode.SOURCE);
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), ((TraversableOnce) clusterLinkTestHarness.aliveServers().flatMap(kafkaBroker -> {
            return ((TraversableOnce) ((SetLike) ((TraversableLike) ((TraversableLike) ((TraversableLike) CollectionConverters$.MODULE$.asScalaSetConverter(kafkaBroker.metrics().metrics().keySet()).asScala()).filter(metricName -> {
                return BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$3(metricName));
            })).filter(metricName2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$4(metricName2));
            })).filter(metricName3 -> {
                return BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$5(metricName3));
            })).map(metricName4 -> {
                return new StringBuilder(1).append(metricName4.name()).append(":").append(metricName4.tags()).toString();
            }, Set$.MODULE$.canBuildFrom())).toSet();
        }, Seq$.MODULE$.canBuildFrom())).toSet());
    }

    public BidirectionalLinkIntegrationTest() {
        useSourceInitiatedLink_$eq(false);
        sourceCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, new Some(SecurityProtocol.PLAINTEXT), 0, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
        destCluster_$eq(new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, new Some(SecurityProtocol.PLAINTEXT), 100, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
    }
}
