package kafka.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConvertToMirrorTopicTaskType$;
import kafka.server.link.ClusterLinkMetrics$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.InternalTaskErrorCode$;
import kafka.server.link.NoErrorCode$;
import kafka.server.link.TopicType$;
import kafka.utils.Implicits;
import kafka.utils.Implicits$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.AlterMirrorsResult;
import org.apache.kafka.clients.admin.ClusterLinkTaskError;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.TruncateAndRestoreResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.$less$colon$less$;
import scala.Enumeration;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IndexedSeqOps;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.SeqOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: BidirectionalLinkIntegrationTest.scala */
@Tags({@Tag("integration"), @Tag("bazel:shard_count:10"), @Tag("bazel:size:large")})
@ScalaSignature(bytes = "\u0006\u0005\u0011Eg\u0001B\u001c9\u0001uBQA\u0011\u0001\u0005\u0002\rCQ!\u0012\u0001\u0005B\u0019CQ!\u0014\u0001\u0005\u00029CQA \u0001\u0005\u0002}Dq!!\u0004\u0001\t\u0003\ty\u0001C\u0004\u0002\u001e\u0001!\t!a\b\t\u000f\u00055\u0002\u0001\"\u0001\u00020!9\u00111\t\u0001\u0005\u0002\u0005\u0015\u0003bBA*\u0001\u0011\u0005\u0011Q\u000b\u0005\b\u0003G\u0002A\u0011AA3\u0011\u001d\t\u0019\b\u0001C\u0001\u0003kBq!a!\u0001\t\u0003\t)\tC\u0004\u0002\u0014\u0002!\t!!&\t\u000f\u0005\r\u0006\u0001\"\u0001\u0002&\"9\u00111\u0017\u0001\u0005\u0002\u0005U\u0006bBAb\u0001\u0011\u0005\u0011Q\u0019\u0005\b\u0003'\u0004A\u0011AAk\u0011\u001d\t\u0019\u000f\u0001C\u0001\u0003KDq!a=\u0001\t\u0003\t)\u0010C\u0004\u0003\u0004\u0001!\tA!\u0002\t\u000f\tM\u0001\u0001\"\u0001\u0003\u0016!9!1\u0005\u0001\u0005\u0002\t\u0015\u0002b\u0002B\u001a\u0001\u0011\u0005!Q\u0007\u0005\b\u0005\u0007\u0002A\u0011\u0001B#\u0011\u001d\u0011\u0019\u0006\u0001C\u0001\u0005+BqAa\u0019\u0001\t\u0013\u0011)\u0007C\u0004\u0003��\u0001!\tA!!\t\u000f\t=\u0005\u0001\"\u0001\u0003\u0012\"9!q\u0014\u0001\u0005\u0002\t\u0005\u0006b\u0002BX\u0001\u0011\u0005!\u0011\u0017\u0005\b\u0005\u007f\u0003A\u0011\u0001Ba\u0011\u001d\u0011y\r\u0001C\u0001\u0005#DqAa8\u0001\t\u0003\u0011\t\u000fC\u0004\u0003p\u0002!\tA!=\t\u000f\t}\b\u0001\"\u0001\u0004\u0002!91q\u0002\u0001\u0005\u0002\rE\u0001bBB\u0010\u0001\u0011\u00051\u0011\u0005\u0005\b\u0007_\u0001A\u0011AB\u0019\u0011\u001d\u0019y\u0004\u0001C\u0001\u0007\u0003Bqaa\u0014\u0001\t\u0003\u0019\t\u0006C\u0004\u0004`\u0001!\ta!\u0019\t\u000f\r=\u0004\u0001\"\u0003\u0004r!I1q\u001a\u0001\u0012\u0002\u0013%1\u0011\u001b\u0005\n\u0007O\u0004\u0011\u0013!C\u0005\u0007SDqa!<\u0001\t\u0013\u0019y\u000fC\u0004\u0004|\u0002!\ta!@\t\u0013\u0011\u0005\u0002!%A\u0005\u0002\u0011\r\u0002\"\u0003C\u0014\u0001E\u0005I\u0011ABi\u0011%!I\u0003AI\u0001\n\u0003\u0019I\u000fC\u0004\u0005,\u0001!I\u0001\"\f\t\u000f\u0011%\u0003\u0001\"\u0003\u0005L!9Aq\u000e\u0001\u0005
\u0012\u0011E\u0004b\u0002C@\u0001\u0011%A\u0011\u0011\u0005\b\tK\u0003A\u0011\u0002CT\u0005\u0001\u0012\u0015\u000eZ5sK\u000e$\u0018n\u001c8bY2Kgn[%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u0005eR\u0014\u0001\u00027j].T\u0011aO\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001a\b\u0005\u0002@\u00016\t\u0001(\u0003\u0002Bq\t\u0011\u0013IY:ue\u0006\u001cGo\u00117vgR,'\u000fT5oW&sG/Z4sCRLwN\u001c+fgR\fa\u0001P5oSRtD#\u0001#\u0011\u0005}\u0002\u0011!G7bs\n,Wk]3CS\u0012L'/Z2uS>t\u0017\r\u001c'j].$\u0012a\u0012\t\u0003\u0011.k\u0011!\u0013\u0006\u0002\u0015\u0006)1oY1mC&\u0011A*\u0013\u0002\u0005+:LG/\u0001\u0017uKN$()\u001b3je\u0016\u001cG/[8oC2d\u0015N\\6XSRDw*\u001e;c_VtGmQ8o]\u0016\u001cG/[8ogR!qi\u0014/b\u0011\u0015\u00016\u00011\u0001R\u0003\u0019\tXo\u001c:v[B\u0011!+\u0017\b\u0003'^\u0003\"\u0001V%\u000e\u0003US!A\u0016\u001f\u0002\rq\u0012xn\u001c;?\u0013\tA\u0016*\u0001\u0004Qe\u0016$WMZ\u0005\u00035n\u0013aa\u0015;sS:<'B\u0001-J\u0011\u0015i6\u00011\u0001_\u0003-\u0019wn\u001c:eS:\fGo\u001c:\u0011\u0005!{\u0016B\u00011J\u0005\u001d\u0011un\u001c7fC:DQAY\u0002A\u0002E\u000b\u0001\u0003\\8dC2\u0014V\r\u001d7jG\u0006$\u0018n\u001c8)\t\r!\u0007/\u001d\t\u0003K:l\u0011A\u001a\u0006\u0003O\"\fa\u0001]1sC6\u001c(BA5k\u0003\u001dQW\u000f]5uKJT!a\u001b7\u0002\u000b),h.\u001b;\u000b\u00035\f1a\u001c:h\u0013\tygMA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fAA\\1nK\u0006\n!/A\u001f|I&\u001c\b\u000f\\1z\u001d\u0006lW- 
\u0018rk>\u0014X/\\\u001f|aut3m\\8sI&t\u0017\r^8s{m\fTP\f7pG\u0006d'+\u001a9mS\u000e\fG/[8o{m\u0014T\u0010\u000b\u0003\u0004ij\\\bCA;y\u001b\u00051(BA<g\u0003!\u0001(o\u001c<jI\u0016\u0014\u0018BA=w\u00051iU\r\u001e5pIN{WO]2f\u0003\u00151\u0018\r\\;fY\u0005a\u0018%A?\u0002QE,xN];n\u0007>|'\u000fZ5oCR|'OU3qY&\u001c\u0017\r^5p]\u000e{WNY5oCRLwN\\:\u0002_Q,7\u000f\u001e\"jI&\u0014Xm\u0019;j_:\fG\u000eT5oW^KG\u000f[(oK\u000e{gN\\3di&|g.\u00138ji&\fGo\u001c:\u0015\u000f\u001d\u000b\t!a\u0001\u0002\u0006!)\u0001\u000b\u0002a\u0001#\")Q\f\u0002a\u0001=\")!\r\u0002a\u0001#\"\"A\u0001\u001a9rQ\u0015!AO_A\u0006Y\u0005a\u0018A\n;fgR\u0014\u0015\u000eZ5sK\u000e$\u0018n\u001c8bY2Kgn[,ji\"\fU\u000f^8NSJ\u0014xN]5oOR9q)!\u0005\u0002\u0014\u0005U\u0001\"\u0002)\u0006\u0001\u0004\t\u0006\"B/\u0006\u0001\u0004q\u0006\"\u00022\u0006\u0001\u0004\t\u0006\u0006B\u0003eaFDS!\u0002;{\u00037a\u0013\u0001`\u00013i\u0016\u001cHOQ5eSJ,7\r^5p]\u0006dG*\u001b8l/&$\bn\\;u\u0013:\u001cG.\u001e3j]\u001e\u0014V-\\8uK6K'O]8sgR9q)!\t\u0002$\u0005\u0015\u0002\"\u0002)\u0007\u0001\u0004\t\u0006\"B/\u0007\u0001\u0004q\u0006\"\u00022\u0007\u0001\u0004\t\u0006\u0006\u0002\u0004eaFDSA\u0002;{\u0003Wa\u0013\u0001`\u00011i\u0016\u001cH/T5se>\u0014Hk\u001c9jG\u000e\u0013X-\u0019;j_:<\u0016\u000e\u001e5Ti>\u0004\b/\u001a3TKF,XM\\2f\u001dVl'-\u001a:\u0015\u000b\u001d\u000b\t$a\r\t\u000bA;\u0001\u0019A)\t\u000bu;\u0001\u0019\u00010)\u000b\u001d!\u0007/a\u000e\"\u0005\u0005e\u0012\u0001K>eSN\u0004H.Y=OC6,WPL9v_J,X.P>1{:\u001awn\u001c:eS:\fGo\u001c:>wFj\b&B\u0004uu\u0006uBFAA 
C\t\t\t%A\tle\u00064GoQ8nE&t\u0017\r^5p]N\f\u0011\u0004^3tiJ+g/\u001a:tK\u0006sGm\u0015;beRl\u0015N\u001d:peR9q)a\u0012\u0002J\u0005-\u0003\"\u0002)\t\u0001\u0004\t\u0006\"B/\t\u0001\u0004q\u0006\"\u00022\t\u0001\u0004\t\u0006\u0006\u0002\u0005eaFDS\u0001\u0003;{\u0003#b\u0013\u0001`\u00010i\u0016\u001cHOU3wKJ\u001cX-\u00118e'R\f'\u000f^'jeJ|'oV5uQ2+\u0017\rZ3s\u000bB|7\r[\"iC:<Wm\u001d\u000b\b\u000f\u0006]\u0013\u0011LA.\u0011\u0015\u0001\u0016\u00021\u0001R\u0011\u0015i\u0016\u00021\u0001_\u0011\u0015\u0011\u0017\u00021\u0001RQ\u0011IA\r]9)\u000b%!(0!\u0019-\u0003q\f\u0001\u0005^3tiJ+g/\u001a:tK\u0006sGm\u0015;beRl\u0015N\u001d:pe^KG\u000f\u001b'bOR9q)a\u001a\u0002j\u0005-\u0004\"\u0002)\u000b\u0001\u0004\t\u0006\"B/\u000b\u0001\u0004q\u0006\"\u00022\u000b\u0001\u0004\t\u0006\u0006\u0002\u0006eaFDSA\u0003;{\u0003cb\u0013\u0001`\u00015i\u0016\u001cHOU3wKJ\u001cX-\u00118e'R\f'\u000f^'jeJ|'/\u001b8h/&$\bNU3n_R,7\t\\;ti\u0016\u0014(+Z:uCJ$HcB$\u0002x\u0005e\u00141\u0010\u0005\u0006!.\u0001\r!\u0015\u0005\u0006;.\u0001\rA\u0018\u0005\u0006E.\u0001\r!\u0015\u0015\u0005\u0017\u0011\u0004\u0018\u000fK\u0003\fij\f\t\tL\u0001}\u0003M\"Xm\u001d;SKZ,'o]3B]\u0012\u001cF/\u0019:u\u001b&\u0014(o\u001c:j]\u001e<\u0016\u000e\u001e5M_\u000e\fGn\u00117vgR,'OU3ti\u0006\u0014H\u000fF\u0004H\u0003\u000f\u000bI)a#\t\u000bAc\u0001\u0019A)\t\u000buc\u0001\u0019\u00010\t\u000b\td\u0001\u0019A))\t1!\u0007/\u001d\u0015\u0006\u0019QT\u0018\u0011\u0013\u0017\u0002y\u0006!D/Z:u%\u00164XM]:f\u0003:$7\u000b^1si6K'O]8sS:<w+\u001b;i!\u0006,8/Z!oIVs\u0007/Y;tK2Kgn[:\u0015\u000f\u001d\u000b9*!'\u0002\u001c\")\u0001+\u0004a\u0001#\")Q,\u0004a\u0001=\")!-\u0004a\u0001#\"\"Q\u0002\u001a9rQ\u0015iAO_AQY\u0005a\u0018a\u0010;fgR\u0014VM^3sg\u0016\fe\u000eZ*uCJ$X*\u001b:s_JLgnZ,ji\"\u0004\u0016-^:f\u0003:$WK\u001c9bkN,Gj\\2bY6K'O]8s)>\u0004\u0018n\u0019\u000b\b\u000f\u0006\u001d\u0016\u0011VAV\u0011\u0015\u0001f\u00021\u0001R\u0011\u0015if\u00021\u0001_\u0011\u0015\u0011g\u00021\u0001RQ\u0011qA\r]9)\u000b9!
(0!--\u0003q\fa\u000b^3tiJ+g/\u001a:tK\u0006sGm\u0015;beRl\u0015N\u001d:pe&twmV5uQB\u000bWo]3B]\u0012,f\u000e]1vg\u0016dunY1m\u001b&\u0014(o\u001c:U_BL7mV5uQ2{7-\u00197DYV\u001cH/\u001a:SKN$\u0018M\u001d;\u0015\u000f\u001d\u000b9,!/\u0002<\")\u0001k\u0004a\u0001#\")Ql\u0004a\u0001=\")!m\u0004a\u0001#\"\"q\u0002\u001a9rQ\u0015yAO_AaY\u0005a\u0018A\u000e;fgR\u0014VM^3sg\u0016\fe\u000eZ*uCJ$X*\u001b:s_J<\u0016\u000e\u001e5EK2,G/\u001a3B]\u0012\u0014Vm\u0019:fCR,G\rV8qS\u000e\u001cHcB$\u0002H\u0006%\u00171\u001a\u0005\u0006!B\u0001\r!\u0015\u0005\u0006;B\u0001\rA\u0018\u0005\u0006EB\u0001\r!\u0015\u0015\u0005!\u0011\u0004\u0018\u000fK\u0003\u0011ij\f\t\u000eL\u0001}\u0003)\"Xm\u001d;SKZ,'o]3B]\u0012\u001cF/\u0019:u\u001b&\u0014(o\u001c:XSRDG)\u001a7fi\u0016$Gk\u001c9jGN$raRAl\u00033\fY\u000eC\u0003Q#\u0001\u0007\u0011\u000bC\u0003^#\u0001\u0007a\fC\u0003c#\u0001\u0007\u0011\u000b\u000b\u0003\u0012IB\f\b&B\tuu\u0006\u0005HFAA \u0003\u0019\"Xm\u001d;S_2d'-Y2l\u001b&\u0014(o\u001c:GC&d7o\u00148QK:$\u0017N\\4NSJ\u0014xN\u001d\u000b\b\u000f\u0006\u001d\u0018\u0011^Av\u0011\u0015\u0001&\u00031\u0001R\u0011\u0015i&\u00031\u0001_\u0011\u0015\u0011'\u00031\u0001RQ\u0011\u0011B\r]9)\u000bI!(0!=-\u0003q\f1\u0005^3tiB\u000bWo]3NSJ\u0014xN\u001d$bS2\u001cxJ\u001c)f]\u0012LgnZ'jeJ|'\u000fF\u0004H\u0003o\fI0a?\t\u000bA\u001b\u0002\u0019A)\t\u000bu\u001b\u0002\u0019\u00010\t\u000b\t\u001c\u0002\u0019A))\tM!\u0007/\u001d\u0015\u0006'QT(\u0011\u0001\u0017\u0002y\u0006IB/Z:u%\u00164XM]:f\u0003:$\u0007+Y;tK6K'O]8s)\u001d9%q\u0001B\u0005\u0005\u0017AQ\u0001\u0015\u000bA\u0002ECQ!\u0018\u000bA\u0002yCQA\u0019\u000bA\u0002ECC\u0001\u00063qc\"*A\u0003\u001e>\u0003\u00121\nA0\u0001\u0016uKN$HK];oG\u0006$X-\u00118e%\u0016\u001cHo\u001c:f%\u0016lw\u000e^3DYV\u001cH/\u001a:SKN$\u0018M\u001d;\u0015\u000f\u001d\u00139B!\u0007\u0003\u001c!)\u0001+\u0006a\u0001#\")Q,\u0006a\u0001=\")!-\u0006a\u0001#\"\"Q\u0003\u001a9rQ\u0015)BO\u001fB\u0011Y\t\ty$A\u0015uKN$HK];oG\u0006$X-\u00118e%\u0016\u001cHo\u001c
:f\u0019>\u001c\u0017\r\\\"mkN$XM\u001d*fgR\f'\u000f\u001e\u000b\b\u000f\n\u001d\"\u0011\u0006B\u0016\u0011\u0015\u0001f\u00031\u0001R\u0011\u0015if\u00031\u0001_\u0011\u0015\u0011g\u00031\u0001RQ\u00111B\r]9)\u000bY!(P!\r-\u0005\u0005}\u0012a\r;fgR$&/\u001e8dCR,\u0017I\u001c3SKN$xN]3XSRDG)\u001a7fi\u0016$\u0017I\u001c3SK\u000e\u0014X-\u0019;fIR{\u0007/[2t)\u001d9%q\u0007B\u001d\u0005wAQ\u0001U\fA\u0002ECQ!X\fA\u0002yCQAY\fA\u0002ECCa\u00063qc\"*q\u0003\u001e>\u0003B1\u0012\u0011qH\u00018i\u0016\u001cH\u000f\u0016:v]\u000e\fG/Z!oIJ+7\u000f^8sK6K'O]8sS:<w+\u001b;i!\u0006,8/Z!oIVs\u0007/Y;tK2Kgn[:\u0015\u000f\u001d\u00139E!\u0013\u0003L!)\u0001\u000b\u0007a\u0001#\")Q\f\u0007a\u0001=\")!\r\u0007a\u0001#\"\"\u0001\u0004\u001a9rQ\u0015ABO\u001fB)Y\t\ty$\u0001\u001auKN$HK];oG\u0006$X-\u00118e%\u0016\u001cHo\u001c:f\u001b&\u0014(o\u001c:XSRDG*Z1eKJ,\u0005o\\2i\u0007\"\fgnZ3t)\u001d9%q\u000bB-\u00057BQ\u0001U\rA\u0002ECQ!X\rA\u0002yCQAY\rA\u0002ECC!\u00073qc\"*\u0011\u0004\u001e>\u0003b1\u0012\u0011qH\u0001\u0018aJ|G-^2f%\u0016\u001cwN\u001d3t)>\u001cE.^:uKJ$ra\u0012B4\u0005c\u0012)\bC\u0004\u0003ji\u0001\rAa\u001b\u0002\u000f\rdWo\u001d;feB\u0019qH!\u001c\n\u0007\t=\u0004H\u0001\fDYV\u001cH/\u001a:MS:\\G+Z:u\u0011\u0006\u0014h.Z:t\u0011\u0019\u0011\u0019H\u0007a\u0001#\u0006)Ao\u001c9jG\"9!q\u000f\u000eA\u0002\te\u0014A\u00038v[J+7m\u001c:egB\u0019\u0001Ja\u001f\n\u0007\tu\u0014JA\u0002J]R\f!\u0005^3ti6+H\u000e^5qY\u0016l\u0015N\u001d:peN#\u0018\r^3Ue\u0006t7/\u001b;j_:\u001cHcB$\u0003\u0004\n\u0015%q\u0011\u0005\u0006!n\u0001\r!\u0015\u0005\u0006;n\u0001\rA\u0018\u0005\u0006En\u0001\r!\u0015\u0015\u00067\u0011\u0004\u0018q\u0007\u0015\u00067QT(Q\u0012\u0017\u0002y\u00069C/Z:u)J,hnY1uK\u0006sGMU3ti>\u0014XmV5uQ\u0012+G.\u001a;fIR{\u0007/[2t)\u001d9%1\u0013BK\u0005/CQ\u0001\u0015\u000fA\u0002ECQ!\u0018\u000fA\u0002yCQA\u0019\u000fA\u0002ECC\u0001\b3qc\"*A\u0004\u001e>\u0003\u001e2\u0012\u0011qH\u0001\u0017i\u0016\u001cH\u000f\u0016:v]\u000e\fG/Z!oIJ+7\u000f^8sKR9qIa)\u0003
&\n\u001d\u0006\"\u0002)\u001e\u0001\u0004\t\u0006\"B/\u001e\u0001\u0004q\u0006\"\u00022\u001e\u0001\u0004\t\u0006\u0006B\u000feaFDS!\b;{\u0005[c#!a\u0010\u0002qQ,7\u000f\u001e+sk:\u001c\u0017\r^3B]\u0012\u0014Vm\u001d;pe\u0016<\u0016\u000e\u001e5OK^dU-\u00193fe\u0016\u0003xn\u00195t\u001f:\u0014V-\\8uK\u000ecWo\u001d;feR9qIa-\u00036\n]\u0006\"\u0002)\u001f\u0001\u0004\t\u0006\"B/\u001f\u0001\u0004q\u0006\"\u00022\u001f\u0001\u0004\t\u0006\u0006\u0002\u0010eaFDSA\b;{\u0005{c#!a\u0010\u0002oQ,7\u000f\u001e+sk:\u001c\u0017\r^3B]\u0012\u0014Vm\u001d;pe\u0016<\u0016\u000e\u001e5OK^dU-\u00193fe\u0016\u0003xn\u00195t\u001f:dunY1m\u00072,8\u000f^3s)\u001d9%1\u0019Bc\u0005\u000fDQ\u0001U\u0010A\u0002ECQ!X\u0010A\u0002yCQAY\u0010A\u0002ECCa\b3qc\"*q\u0004\u001e>\u0003N2\u0012\u0011qH\u00018i\u0016\u001cH\u000f\u0016:v]\u000e\fG/Z!oIJ+7\u000f^8sK^KG\u000f\u001b#jm\u0016\u0014x-\u001a8u%\u0016\u001cwN\u001d3t\u0003\u001a$XM\u001d$bS2|g/\u001a:\u0015\u000f\u001d\u0013\u0019N!6\u0003X\")\u0001\u000b\ta\u0001#\")Q\f\ta\u0001=\")!\r\ta\u0001#\"\"\u0001\u0005\u001a9rQ\u0015\u0001CO\u001fBoY\t\ty$\u0001\u0018uKN$HK];oG\u0006$X-\u00118e%\u0016\u001cHo\u001c:f/&$\b\u000e\u0015:pIV\u001cW-\u00114uKJ4\u0015-\u001b7pm\u0016\u0014HcB$\u0003d\n\u0015(q\u001d\u0005\u0006!\u0006\u0002\r!\u0015\u0005\u0006;\u0006\u0002\rA\u0018\u0005\u0006E\u0006\u0002\r!\u0015\u0015\u0005C\u0011\u0004\u0018\u000fK\u0003\"ij\u0014i\u000f\f\u0002\u0002@\u0005\tE/Z:u)J,hnY1uK\u0006sGMU3ti>\u0014XmV5uQ\u0012Kg/\u001a:hK:$(+Z2pe\u0012\u001c\u0018I\u001c3Qe>$WoY3BMR,'OR1jY>4XM\u001d\u000b\b\u000f\nM(Q\u001fB|\u0011\u0015\u0001&\u00051\u0001R\u0011\u0015i&\u00051\u0001_\u0011\u0015\u0011'\u00051\u0001RQ\u0011\u0011C\r]9)\u000b\t\"(P!@-\u0005\u0005}\u0012A\t;fgR4\u0015-\u001b7pm\u0016\u0014H\u000b[3o%\u0016\u001cHo\u001c:f)\",gNU3wKJ\u001cX\rF\u0004H\u0007\u0007\u0019)aa\u0002\t\u000bA\u001b\u0003\u0019A)\t\u000bu\u001b\u0003\u0019\u00010\t\u000b\t\u001c\u0003\u0019A))\t\r\"\u0007/\u001d\u0015\u0006GQT8Q\u0002\u0017
\u0003\u0003\u007f\t!\u0005^3tiJ+g/\u001a:tKRCWM\u001c$bS2|g/\u001a:UQ\u0016t'+Z:u_J,GcB$\u0004\u0014\rU1q\u0003\u0005\u0006!\u0012\u0002\r!\u0015\u0005\u0006;\u0012\u0002\rA\u0018\u0005\u0006E\u0012\u0002\r!\u0015\u0015\u0005I\u0011\u0004\u0018\u000fK\u0003%ij\u001ci\u0002\f\u0002\u0002@\u00059C/Z:u\r\u0006LGn\u001c<feRCWM\u001c*fgR|'/\u001a+iK:\u0014VM^3sg\u0016$v/[2f)\u001d951EB\u0013\u0007OAQ\u0001U\u0013A\u0002ECQ!X\u0013A\u0002yCQAY\u0013A\u0002ECC!\n3qc\"*Q\u0005\u001e>\u0004.1\u0012\u0011qH\u0001(i\u0016\u001cHOU3wKJ\u001cX\rV<jG\u0016$\u0006.\u001a8GC&dwN^3s)\",gNU3ti>\u0014X\rF\u0004H\u0007g\u0019)da\u000e\t\u000bA3\u0003\u0019A)\t\u000bu3\u0003\u0019\u00010\t\u000b\t4\u0003\u0019A))\t\u0019\"\u0007/\u001d\u0015\u0006MQT8Q\b\u0017\u0003\u0003\u007f\tA\u0007^3tiR\u0013XO\\2bi\u0016\fe\u000e\u001a*fgR|'/\u001a$bS2\u001cxJ\\,s_:<7\u000b^8qa\u0016$W*\u001b:s_J$v\u000e]5d)\u001d951IB#\u0007\u000fBQ\u0001U\u0014A\u0002ECQ!X\u0014A\u0002yCQAY\u0014A\u0002ECCa\n3qc\"*q\u0005\u001e>\u0004N1\u0012\u0011qH\u00010i\u0016\u001cH\u000f\u0016:v]\u000e\fG/Z!oIJ+7\u000f^8sK\u000e{WNY5oCRLwN\\(g\u001b&\u0014(o\u001c:Ti\u0006$Xm\u001d\u000b\b\u000f\u000eM3QKB,\u0011\u0015\u0001\u0006\u00061\u0001R\u0011\u0015i\u0006\u00061\u0001_\u0011\u0015\u0011\u0007\u00061\u0001RQ\u0011AC\r]9)\u000b!\"(p!\u0018-\u0005\u0005}\u0012A\n;fgR$&/\u001e8dCR,\u0017I\u001c3SKN$xN]3O_J+7m\u001c:eg&sGk\u001c9jGR9qia\u0019\u0004f\r\u001d\u0004\"\u0002)*\u0001\u0004\t\u0006\"B/*\u0001\u0004q\u0006\"\u00022*\u0001\u0004\t\u0006\u0006B\u0015eaFDS!\u000b;{\u0007[b#!a\u0010\u0002/Y,'/\u001b4z\u0005&$\u0017N]3di&|g.\u00197MS:\\G#C$\u0004t\r\u00155\u0011RB`\u0011\u001d\u0019)H\u000ba\u0001\u0007o\na#Z1ti2Kgn[\"p]:,7\r^5p]6{G-\u001a\t\u0005\u0007s\u001a\t)\u0004\u0002\u0004|)\u0019\u0011h! 
\u000b\u0007\r}$(\u0001\u0004tKJ4XM]\u0005\u0005\u0007\u0007\u001bYH\u0001\bD_:tWm\u0019;j_:lu\u000eZ3\t\u000f\r\u001d%\u00061\u0001\u0004x\u00051r/Z:u\u0019&t7nQ8o]\u0016\u001cG/[8o\u001b>$W\rC\u0005\u0004\f*\u0002\n\u00111\u0001\u0004\u000e\u0006QAo\u001c9jGRK\b/Z:\u0011\u000b!\u001byia%\n\u0007\rE\u0015J\u0001\u0004PaRLwN\u001c\t\u0007\u0007+\u001byj!*\u000f\t\r]51\u0014\b\u0004)\u000ee\u0015\"\u0001&\n\u0007\ru\u0015*A\u0004qC\u000e\\\u0017mZ3\n\t\r\u000561\u0015\u0002\u0004'\u0016\f(bABO\u0013B!1qUB]\u001d\u0011\u0019Ik!.\u000f\t\r-61\u0017\b\u0005\u0007[\u001b\tLD\u0002U\u0007_K\u0011aO\u0005\u0004\u0007\u007fR\u0014bA\u001d\u0004~%!1qWB>\u0003%!v\u000e]5d)f\u0004X-\u0003\u0003\u0004<\u000eu&!\u0003+pa&\u001cG+\u001f9f\u0015\u0011\u00199la\u001f\t\u0013\r\u0005'\u0006%AA\u0002\r\r\u0017aD2p]\u001aLwm\u0014<feJLG-Z:\u0011\r\r\u001571Z)R\u001b\t\u00199MC\u0002\u0004J&\u000b!bY8mY\u0016\u001cG/[8o\u0013\u0011\u0019ima2\u0003\u00075\u000b\u0007/A\u0011wKJLg-\u001f\"jI&\u0014Xm\u0019;j_:\fG\u000eT5oW\u0012\"WMZ1vYR$3'\u0006\u0002\u0004T*\"1QRBkW\t\u00199\u000e\u0005\u0003\u0004Z\u000e\rXBABn\u0015\u0011\u0019ina8\u0002\u0013Ut7\r[3dW\u0016$'bABq\u0013\u0006Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\r\u001581\u001c\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017!\t<fe&4\u0017PQ5eSJ,7\r^5p]\u0006dG*\u001b8lI\u0011,g-Y;mi\u0012\"TCABvU\u0011\u0019\u0019m!6\u0002E\r|gNZ5hkJ,W*\u001b:s_J$&/\u00198tSRLwN\u001c\"bi\u000eD7+\u001b>f)\u001595\u0011_B|\u0011\u001d\u0019\u00190\fa\u0001\u0007k\f\u0001b\u00197vgR,'o\u001d\t\u0007\u0007+\u001byJa\u001b\t\u000f\reX\u00061\u0001\u0003z\u0005I!-\u0019;dQNK'0Z\u0001\nY&t7\u000e\u0015:paN$Bba@\u0005\u0010\u0011MAq\u0003C\u000f\t?\u0001B\u0001\"\u0001\u0005\f5\u0011A1\u0001\u0006\u0005\t\u000b!9!\u0001\u0003vi&d'B\u0001C\u0005\u0003\u0011Q\u0017M^1\n\t\u00115A1\u0001\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\bb\u0002C\t]\u0001\u00071qO\u0001\u000fG>tg.Z2uS>tWj\u001c3f\u0011\u001d!)B\fa\u0001\u0005W\nQB]3n_R,7\t\\;ti\u0
016\u0014\b\"\u0003C\r]A\u0005\t\u0019\u0001C\u000e\u00035\u0019wN\\:v[\u0016\u0014xI]8vaB!\u0001ja$R\u0011%\u0019YI\fI\u0001\u0002\u0004\u0019i\tC\u0005\u0004B:\u0002\n\u00111\u0001\u0004D\u0006\u0019B.\u001b8l!J|\u0007o\u001d\u0013eK\u001a\fW\u000f\u001c;%gU\u0011AQ\u0005\u0016\u0005\t7\u0019).A\nmS:\\\u0007K]8qg\u0012\"WMZ1vYR$C'A\nmS:\\\u0007K]8qg\u0012\"WMZ1vYR$S'\u0001\u0007wKJLg-_'jeJ|'\u000fF\u0003H\t_!\t\u0004C\u0004\u0003jI\u0002\rAa\u001b\t\u000f\u0011M\"\u00071\u0001\u00056\u0005Q\u0001/\u0019:uSRLwN\\:\u0011\r\rU5q\u0014C\u001c!\u0011!I\u0004\"\u0012\u000e\u0005\u0011m\"\u0002\u0002C\u001f\t\u007f\taaY8n[>t'bA\u001e\u0005B)\u0019A1\t7\u0002\r\u0005\u0004\u0018m\u00195f\u0013\u0011!9\u0005b\u000f\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u0006\tb/\u001a:jMfd\u0015N\\6NKR\u0014\u0018nY:\u0015\u001f\u001d#i\u0005b\u0016\u0005\\\u0011}C1\rC4\tWBq\u0001b\u00144\u0001\u0004!\t&\u0001\u0004mS:\\\u0017\n\u001a\t\u0005\ts!\u0019&\u0003\u0003\u0005V\u0011m\"\u0001B+vS\u0012Dq\u0001\"\u00174\u0001\u0004\u0011Y'A\u0006fCN$8\t\\;ti\u0016\u0014\bb\u0002C/g\u0001\u0007!1N\u0001\fo\u0016\u001cHo\u00117vgR,'\u000fC\u0004\u0005bM\u0002\raa@\u0002\u001b\u0015\f7\u000f\u001e'j].\u0004&o\u001c9t\u0011\u001d!)g\ra\u0001\u0007\u007f\fQb^3ti2Kgn\u001b)s_B\u001c\bB\u0002C5g\u0001\u0007\u0011+A\u0005fCN$Hk\u001c9jG\"1AQN\u001aA\u0002E\u000b\u0011b^3tiR{\u0007/[2\u0002/\r\u0014X-\u0019;f\u0005&$\u0017N]3di&|g.\u00197MS:\\G\u0003\u0004C)\tg\"9\b\"\u001f\u0005|\u0011u\u0004B\u0002C;i\u0001\u0007\u0011+\u0001\u0005mS:\\g*Y7f\u0011\u001d!I\u0006\u000ea\u0001\u0005WBq\u0001\"\u00185\u0001\u0004\u0011Y\u0007C\u0004\u0005bQ\u0002\raa@\t\u000f\u0011\u0015D\u00071\u0001\u0004��\u0006aR.\u0019;dQ\u0016\u001cHK];oG\u0006$X\rZ'fgN\fw-Z\"pk:$H\u0003\u0002CB\t7\u0003b\u0001\u0013CC\t\u0013s\u0016b\u0001CD\u0013\nIa)\u001e8di&|g.\r\t\b\u0007\u000b\u001cY-\u0015CF!\u0011!i\tb&\u000e\u0005\u0011=%\u0002\u0002CI\t'\u000bQ!\u00193nS:TA\u0001\"&\u0005@\u000591\r\\5f]R\u001c\u0018\u0002\u0002CM\t\u0
01f\u0013\u0001\u0004\u0016:v]\u000e\fG/Z!oIJ+7\u000f^8sKJ+7/\u001e7u\u0011\u001d!i*\u000ea\u0001\t?\u000bQ\"\u001a=qK\u000e$X\r\u001a,bYV,\u0007c\u0001%\u0005\"&\u0019A1U%\u0003\t1{gnZ\u0001$[\u0006$8\r[3t!\u0006\u0014H/\u001b;j_:dUM^3m)J,hnY1uS>tG)\u0019;b)\u0011!\u0019\t\"+\t\u000f\u0011-f\u00071\u0001\u0005.\u0006aQ\r\u001f9fGR,G\rR1uCB11Q\u0013CX\t?KA\u0001\"-\u0004$\n!A*[:uQ\u0019\u0001AQ\u0017>\u0005BB!Aq\u0017C_\u001b\t!ILC\u0002\u0005<\"\f1!\u00199j\u0013\u0011!y\f\"/\u0003\u0007Q\u000bw-\t\u0002\u0005D\u0006Y\u0011N\u001c;fOJ\fG/[8oQ\u0019\u0001AQ\u0017>\u0005H\u0006\u0012A\u0011Z\u0001\u0015E\u0006TX\r\u001c\u001etQ\u0006\u0014HmX2pk:$((\r\u0019)\r\u0001!)L\u001fCgC\t!y-\u0001\tcCj,GNO:ju\u0016TD.\u0019:hK\u0002")
/* loaded from: input_file:kafka/link/BidirectionalLinkIntegrationTest.class */
public class BidirectionalLinkIntegrationTest extends AbstractClusterLinkIntegrationTest {
    /**
     * Turns on bidirectional-link mode for this test class by passing {@code true} to the
     * {@code useBidirectionalLink} setter.
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public void maybeUseBidirectionalLink() {
        final boolean bidirectional = true;
        useBidirectionalLink_$eq(bidirectional);
    }

    /**
     * Calls {@code verifyBidirectionalLink} with the outbound connection mode for both
     * connection-mode arguments, {@code None} for the topic types and an empty map of
     * config overrides.
     *
     * <p>The method parameters are not read by the body; presumably they are consumed by the
     * parameterized-test setup (confirm against the base class).
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithOutboundConnections(String str, boolean z, String str2) {
        ConnectionMode$Outbound$ outbound = ConnectionMode$Outbound$.MODULE$;
        None$ noTopicTypes = None$.MODULE$;
        Map noOverrides = (Map) Map$.MODULE$.empty();
        verifyBidirectionalLink(outbound, outbound, noTopicTypes, noOverrides);
    }

    /**
     * Sets the {@code useSourceInitiatedLink} flag to {@code true} and then calls
     * {@code verifyBidirectionalLink} with the inbound mode as the first connection mode and
     * the outbound mode as the second, {@code None} for the topic types and an empty map of
     * config overrides.
     *
     * <p>The method parameters are not read by the body; presumably they are consumed by the
     * parameterized-test setup (confirm against the base class).
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithOneConnectionInitiator(String str, boolean z, String str2) {
        // The flag has to be set before the link is verified.
        useSourceInitiatedLink_$eq(true);

        ConnectionMode$Inbound$ inbound = ConnectionMode$Inbound$.MODULE$;
        ConnectionMode$Outbound$ outbound = ConnectionMode$Outbound$.MODULE$;
        None$ noTopicTypes = None$.MODULE$;
        Map noOverrides = (Map) Map$.MODULE$.empty();
        verifyBidirectionalLink(inbound, outbound, noTopicTypes, noOverrides);
    }

    /**
     * Calls {@code verifyBidirectionalLink} with the outbound connection mode for both
     * connection-mode arguments, {@code None} for the topic types, and a config-override map
     * holding three entries:
     * <ul>
     *   <li>the auto-mirroring enable property mapped to {@code "true"},</li>
     *   <li>the topic-filters property mapped to the value of {@code includeAllTopicsFilter()},</li>
     *   <li>{@code "metadata.max.age.ms"} mapped to {@code "500"}.</li>
     * </ul>
     *
     * <p>The method parameters are not read by the body; presumably they are consumed by the
     * parameterized-test setup (confirm against the base class).
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithAutoMirroring(String str, boolean z, String str2) {
        ConnectionMode$Outbound$ outbound = ConnectionMode$Outbound$.MODULE$;
        None$ noTopicTypes = None$.MODULE$;

        // Build each key -> value pair separately, in the same order they go into the map.
        Tuple2 autoMirroringEnabled = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()),
                "true");
        Tuple2 topicFilters = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicFiltersProp()),
                includeAllTopicsFilter());
        Tuple2 metadataMaxAge = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"),
                "500");

        Map overrides = (Map) Map$.MODULE$.apply(
                ScalaRunTime$.MODULE$.wrapRefArray(
                        new Tuple2[]{autoMirroringEnabled, topicFilters, metadataMaxAge}));

        verifyBidirectionalLink(outbound, outbound, noTopicTypes, overrides);
    }

    /**
     * Calls {@code verifyBidirectionalLink} with the outbound connection mode for both
     * connection-mode arguments, a {@code Some} wrapping a single-element list that contains
     * only {@code TopicType.LOCAL_MIRROR}, and an empty map of config overrides.
     *
     * <p>The method parameters are not read by the body; presumably they are consumed by the
     * parameterized-test setup (confirm against the base class).
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testBidirectionalLinkWithoutIncludingRemoteMirrors(String str, boolean z, String str2) {
        ConnectionMode$Outbound$ outbound = ConnectionMode$Outbound$.MODULE$;
        // Single-element Scala list: LOCAL_MIRROR :: Nil, wrapped in Some.
        Some localMirrorOnly = new Some(new $colon.colon(TopicType$.MODULE$.LOCAL_MIRROR(), Nil$.MODULE$));
        Map noOverrides = (Map) Map$.MODULE$.empty();
        verifyBidirectionalLink(outbound, outbound, localMirrorOnly, noOverrides);
    }

    /**
     * Mirrors a topic in one direction, unlinks it, deletes the original and then mirrors it
     * back in the opposite direction, finally checking the stopped sequence number.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Create {@code "topic"} on the source cluster and produce 20 records to it.</li>
     *   <li>Create the bidirectional link between the destination and source clusters.</li>
     *   <li>Link the topic on the destination cluster and wait for mirroring there.</li>
     *   <li>Unlink the topic on the destination cluster and delete it on the source cluster.</li>
     *   <li>Link the topic on the source cluster and wait for mirroring there.</li>
     *   <li>Assert that the mirror topic description on the source cluster reports a
     *       {@code stoppedSequenceNumber} of 1.</li>
     * </ol>
     *
     * <p>The method parameters are not read by the body; presumably they are consumed by the
     * parameterized-test setup (confirm against the base class).
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMirrorTopicCreationWithStoppedSequenceNumber(String str, boolean z) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        String topic = "topic";

        // One entry per partition index in [0, numPartitions()), built by the synthetic helper.
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testMirrorTopicCreationWithStoppedSequenceNumber$1(topic, BoxesRunTime.unboxToInt(idx)));

        // Seed the source cluster.
        source.createTopic(
                topic,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topic, 20);

        // Link the two clusters: inbound props point at the source, outbound props at the destination.
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));

        // Mirror onto the destination cluster first.
        dest.linkTopic(topic, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Unlink on the destination and remove the original topic from the source.
        dest.unlinkTopic(
                topic,
                linkName(),
                dest.unlinkTopic$default$3(),
                dest.unlinkTopic$default$4(),
                dest.unlinkTopic$default$5(),
                dest.unlinkTopic$default$6());
        source.deleteTopic(topic, true);

        // Mirror back in the opposite direction.
        source.linkTopic(topic, replicationFactor(), linkName(), source.linkTopic$default$4(), source.linkTopic$default$5());
        waitForMirroring(source, partitions);

        Assertions.assertEquals(
                1L,
                source.describeMirrorTopic(topic, source.describeMirrorTopic$default$2()).stoppedSequenceNumber());
    }

    /**
     * Exercises {@code REVERSE_AND_START_REMOTE_MIRROR} twice, once from each cluster.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Create {@code "topic"} on the source cluster, produce 20 records, create the
     *       bidirectional link, link the topic on the destination cluster and wait for
     *       mirroring there.</li>
     *   <li>Issue the reverse-and-start operation on the destination cluster; wait until the
     *       destination mirror is {@code STOPPED} and the source mirror is {@code ACTIVE};
     *       check the {@code pending_synchronization} transition metric against the
     *       destination cluster's brokers and {@code pending_mirror} against the source
     *       cluster's brokers.</li>
     *   <li>Produce 20 records to the destination cluster and wait for mirroring on the
     *       source cluster.</li>
     *   <li>Issue the reverse-and-start operation on the source cluster; wait until the source
     *       mirror is {@code STOPPED} and the destination mirror is {@code ACTIVE}; check the
     *       same two metrics with the clusters swapped.</li>
     * </ol>
     *
     * <p>The method parameters are not read by the body; presumably they are consumed by the
     * parameterized-test setup (confirm against the base class).
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirror(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        String topic = "topic";

        // One entry per partition index in [0, numPartitions()), built by the synthetic helper.
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirror$1(topic, BoxesRunTime.unboxToInt(idx)));

        // Initial direction: source -> destination.
        source.createTopic(
                topic,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topic, 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topic, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // First reversal, issued on the destination cluster.
        dest.alterMirrors(topic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topic,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topic,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_synchronization",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(dest.linkCoordinator(linkName()), Nil$.MODULE$),
                dest.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_mirror",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(source.linkCoordinator(linkName()), Nil$.MODULE$),
                source.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());

        // Reversed direction: destination -> source.
        produceRecordsToCluster(dest, topic, 20);
        waitForMirroring(source, partitions);

        // Second reversal, issued on the source cluster.
        source.alterMirrors(topic, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topic,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topic,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_synchronization",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(source.linkCoordinator(linkName()), Nil$.MODULE$),
                source.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_mirror",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(dest.linkCoordinator(linkName()), Nil$.MODULE$),
                dest.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
    }

    /**
     * Reverses a mirror topic across a bidirectional link twice, forcing leader changes on the
     * cluster that currently hosts the mirror before each reversal.
     *
     * <p>Flow: create the topic on the source cluster, create the bidirectional link, mirror the
     * topic on the dest cluster, change the leader of every partition on dest three times, then
     * issue {@code REVERSE_AND_START_REMOTE_MIRROR} from dest. Once dest reports STOPPED and source
     * reports ACTIVE, records produced to dest must be mirrored to source. The same procedure is
     * then repeated in the opposite direction, with the tracked counter doubled.
     *
     * <p>NOTE(review): the per-partition helper {@code $anonfun$...$4} is defined outside this
     * block; it presumably checks the partition's leader epoch against the counter - confirm.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirrorWithLeaderEpochChanges(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Initial setup: topic on source, bidirectional link, mirror on dest.
        source.createTopic(topicName, numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        // The counter doubles as the number of leader-change rounds and as the value handed to the helper.
        final IntRef leaderChanges = IntRef.create(3);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), leaderChanges.elem).foreach$mVc$sp(round -> {
            partitions.foreach(tp -> BoxesRunTime.boxToInteger(dest.changeLeader(tp)));
        });
        partitions.foreach(tp -> {
            $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(dest, leaderChanges, tp);
            return BoxedUnit.UNIT;
        });

        // First reversal: dest -> source becomes source <- dest.
        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        partitions.foreach(tp -> {
            $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(source, leaderChanges, tp);
            return BoxedUnit.UNIT;
        });
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);

        // Second round of leader changes, this time on the source cluster.
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), leaderChanges.elem).foreach$mVc$sp(round -> {
            partitions.foreach(tp -> BoxesRunTime.boxToInteger(source.changeLeader(tp)));
        });
        leaderChanges.elem = leaderChanges.elem + leaderChanges.elem;
        partitions.foreach(tp -> {
            $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(source, leaderChanges, tp);
            return BoxedUnit.UNIT;
        });

        // Second reversal: back to the original direction.
        source.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        partitions.foreach(tp -> {
            $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(dest, leaderChanges, tp);
            return BoxedUnit.UNIT;
        });
        produceRecordsToCluster(source, topicName, 20);
        waitForMirroring(dest, partitions);
    }

    /**
     * Reverses a mirror topic after 10000 records were produced to the source while the mirror
     * topic on dest was paused.
     *
     * <p>The mirror is paused on dest, records are produced to source, the mirror is unpaused and
     * {@code REVERSE_AND_START_REMOTE_MIRROR} is issued right away. The test then waits for dest
     * to report STOPPED and source to report ACTIVE, produces another 10000 records to dest and
     * waits for them to be mirrored to source.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirrorWithLag(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirrorWithLag$1(topicName, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topicName, numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());

        // Produce to source while the mirror on dest is paused, then unpause.
        dest.pauseTopic(topicName, dest.pauseTopic$default$2());
        produceRecordsToCluster(source, topicName, 10000);
        dest.pauseTopic(topicName, false);

        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // After the reversal, data flows dest -> source.
        produceRecordsToCluster(dest, topicName, 10000);
        waitForMirroring(source, partitions);
    }

    /**
     * Issues {@code REVERSE_AND_START_REMOTE_MIRROR} from dest and immediately kills and restarts
     * every broker of the source (remote) cluster.
     *
     * <p>After the restart the test waits for dest to report STOPPED and source to report ACTIVE,
     * asserts that the mirror description on dest lists no state-transition errors, and finally
     * checks that records produced to dest are mirrored to source.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithRemoteClusterRestart(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirroringWithRemoteClusterRestart$1(topicName, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topicName, numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)), 2);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Start the reversal, then bounce the whole remote (source) cluster.
        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        source.killAllBrokers();
        source.restartDeadBrokers(source.restartDeadBrokers$default$1());
        source.updateBootstrapServers();

        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        final MirrorTopicDescription description = dest.describeMirrorTopic(topicName, true);
        Assertions.assertNotNull(description);
        Assertions.assertEquals(0, description.mirrorStateTransitionErrors().size());

        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);
    }

    /**
     * Restarts the dest (local) cluster while a {@code REVERSE_AND_START_REMOTE_MIRROR} transition
     * is still in progress.
     *
     * <p>The test waits until dest reports PENDING_SYNCHRONIZE and source reaches the pending
     * mirror state, then kills and restarts all dest brokers and updates the
     * {@code bootstrap.servers} config of the link on source with dest's current bootstrap
     * servers. The reversal is expected to finish (dest STOPPED, source ACTIVE) and records
     * produced to dest must be mirrored to source.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithLocalClusterRestart(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirroringWithLocalClusterRestart$1(topicName, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topicName, numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)), 2);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Start the reversal and wait until both sides are mid-transition.
        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilPendingMirrorState(topicName, linkName(), source.waitUntilPendingMirrorState$default$3());

        // Bounce the local (dest) cluster and hand its current bootstrap servers to the link on source.
        dest.killAllBrokers();
        dest.restartDeadBrokers(dest.restartDeadBrokers$default$1());
        dest.updateBootstrapServers();
        source.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), dest.bootstrapServers(dest.bootstrapServers$default$1()))})),
                source.alterClusterLink$default$3(), source.alterClusterLink$default$4(), source.alterClusterLink$default$5());

        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);
    }

    /**
     * Pauses and then unpauses the cluster link on both clusters while a
     * {@code REVERSE_AND_START_REMOTE_MIRROR} transition is pending.
     *
     * <p>Once dest reports PENDING_SYNCHRONIZE and source reaches the pending mirror state, the
     * link-paused property is set to {@code "true"} on dest and then on source, and both sides are
     * expected to report LINK_PAUSED. The property is then set back to {@code "false"} in the same
     * order, after which the reversal must finish (dest STOPPED, source ACTIVE) and records
     * produced to dest must be mirrored to source.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithPauseAndUnpauseLinks(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLinks$1(topicName, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topicName, numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilPendingMirrorState(topicName, linkName(), source.waitUntilPendingMirrorState$default$3());

        // Pause the link on both sides (dest first, then source).
        dest.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})),
                dest.alterClusterLink$default$3(), dest.alterClusterLink$default$4(), dest.alterClusterLink$default$5());
        source.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})),
                source.alterClusterLink$default$3(), source.alterClusterLink$default$4(), source.alterClusterLink$default$5());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // Unpause in the same order.
        dest.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})),
                dest.alterClusterLink$default$3(), dest.alterClusterLink$default$4(), dest.alterClusterLink$default$5());
        source.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})),
                source.alterClusterLink$default$3(), source.alterClusterLink$default$4(), source.alterClusterLink$default$5());

        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);
    }

    /**
     * Pauses and unpauses the mirror topic on dest while a
     * {@code REVERSE_AND_START_REMOTE_MIRROR} transition is pending.
     *
     * <p>After dest reports PENDING_SYNCHRONIZE and source reaches the pending mirror state, the
     * topic is paused on dest (expected state PAUSED) and then unpaused. The reversal must then
     * finish (dest STOPPED, source ACTIVE) and records produced to dest must be mirrored to source.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic$1(topicName, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topicName, numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilPendingMirrorState(topicName, linkName(), source.waitUntilPendingMirrorState$default$3());

        // Pause the local mirror topic mid-transition, then resume it.
        dest.pauseTopic(topicName, dest.pauseTopic$default$2());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        dest.pauseTopic(topicName, false);

        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);
    }

    /**
     * Pauses the mirror topic on dest during a pending {@code REVERSE_AND_START_REMOTE_MIRROR}
     * transition, restarts the dest cluster while the topic is paused, and then unpauses it.
     *
     * <p>After the restart the link on source is given dest's current bootstrap servers via the
     * {@code bootstrap.servers} config. Once the topic is unpaused the reversal must finish
     * (dest STOPPED, source ACTIVE) and records produced to dest must be mirrored to source.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart$1(topicName, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topicName, numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PENDING_SYNCHRONIZE, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Pause the local mirror topic, then bounce the local cluster while it is paused.
        dest.pauseTopic(topicName, dest.pauseTopic$default$2());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        dest.killAllBrokers();
        dest.restartDeadBrokers(dest.restartDeadBrokers$default$1());
        dest.updateBootstrapServers();
        source.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), dest.bootstrapServers(dest.bootstrapServers$default$1()))})),
                source.alterClusterLink$default$3(), source.alterClusterLink$default$4(), source.alterClusterLink$default$5());

        // Resume the topic; the reversal is expected to complete.
        dest.pauseTopic(topicName, false);
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);
    }

    /**
     * Checks that a {@code REVERSE_AND_START_REMOTE_MIRROR} transition ends in FAILED when the
     * topic on the other side is deleted and recreated during the transition.
     *
     * <p>Two scenarios run back to back:
     * <ol>
     *   <li>After the reversal is requested on dest, the topic is deleted and recreated on
     *       source; dest is expected to report FAILED.</li>
     *   <li>The mirror is deleted and re-linked on dest and reversed again; once source reaches
     *       the pending mirror state, the topic is deleted and recreated on dest; source is
     *       expected to report FAILED.</li>
     * </ol>
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirrorWithDeletedAndRecreatedTopics(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirrorWithDeletedAndRecreatedTopics$1(this, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topic(), numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topic(), 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Scenario 1: the remote (source) topic is replaced during the reversal.
        dest.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        source.deleteTopic(topic(), true);
        source.createTopic(topic(), numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED, topic(), linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Scenario 2: re-link on dest, reverse again, then replace the dest topic.
        dest.deleteTopic(topic(), true);
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        dest.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        source.waitUntilPendingMirrorState(topic(), linkName(), source.waitUntilPendingMirrorState$default$3());
        dest.deleteTopic(topic(), true);
        dest.createTopic(topic(), numPartitions(), replicationFactor(),
                dest.createTopic$default$4(), dest.createTopic$default$5(), dest.createTopic$default$6());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED, topic(), linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Checks that the mirror on source ends in FAILED when the topic on dest is deleted while
     * source is in the pending mirror state of a {@code REVERSE_AND_START_REMOTE_MIRROR}
     * transition.
     *
     * <p>Parameterized by the {@code kraftCombinations} method source.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndStartMirrorWithDeletedTopics(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndStartMirrorWithDeletedTopics$1(this, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topic(), numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topic(), 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        source.waitUntilPendingMirrorState(topic(), linkName(), source.waitUntilPendingMirrorState$default$3());

        // Remove the dest topic while source is still pending.
        dest.deleteTopic(topic(), true);
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.FAILED, topic(), linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Checks that {@code ROLLBACK} is rejected with {@link InvalidRequestException} for a topic
     * on source that is in the pending mirror state.
     *
     * <p>The reversal is requested on dest; once source reaches the pending mirror state all dest
     * brokers are killed and the pending state is re-checked. The rollback attempt on source must
     * throw, after which the topic is unlinked on source and is expected to report STOPPED.
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testRollbackMirrorFailsOnPendingMirror(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testRollbackMirrorFailsOnPendingMirror$1(this, BoxesRunTime.unboxToInt(idx)));

        source.createTopic(topic(), numPartitions(), replicationFactor(),
                source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topic(), 20);
        configureMirrorTransitionBatchSize(new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)), Integer.MAX_VALUE);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topic(), replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        source.waitUntilPendingMirrorState(topic(), linkName(), source.waitUntilPendingMirrorState$default$3());

        // Take dest down and re-check that source is in the pending mirror state.
        dest.killAllBrokers();
        source.waitUntilPendingMirrorState(topic(), linkName(), source.waitUntilPendingMirrorState$default$3());

        Assertions.assertThrows(InvalidRequestException.class, () -> {
            source.alterMirrors(topic(), AlterMirrorOp.ROLLBACK, source.alterMirrors$default$3());
        });

        // Unlinking is still expected to work and to leave the topic STOPPED.
        source.unlinkTopic(topic(), linkName(), source.unlinkTopic$default$3(), false,
                source.unlinkTopic$default$5(), source.unlinkTopic$default$6());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic(), linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Checks that {@code PAUSE} is rejected with {@link InvalidRequestException} for a topic on
     * source that is in the pending mirror state after a {@code REVERSE_AND_PAUSE_REMOTE_MIRROR}
     * request on dest.
     *
     * <p>After source reaches the pending mirror state, all dest brokers are killed and the
     * {@code pending_mirror} transition metric is asserted with {@code InternalTaskErrorCode$}.
     * When {@code z} is true, {@code verifyCoordinatorChangeHandlesStoppingAndStartingTasks} is
     * also run. The final verification runs in a retry loop that re-attempts on
     * {@link AssertionError} until more than 15000 ms have elapsed.
     *
     * @param str first argument from the method source ({@code quorum={0}} in the display name); unused in this body
     * @param z second argument from the method source ({@code coordinator={1}} in the display name)
     * @param str2 third argument from the method source ({@code localReplication={2}} in the display name); unused in this body
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testPauseMirrorFailsOnPendingMirror(String str, boolean z, String str2) {
        ClusterLinkTestHarness destCluster = destCluster();
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        // One element per partition index in [0, numPartitions()); built by a helper defined outside this block.
        IndexedSeq map = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions()).map(obj -> {
            return $anonfun$testPauseMirrorFailsOnPendingMirror$1(this, BoxesRunTime.unboxToInt(obj));
        });
        sourceCluster.createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster.createTopic$default$4(), sourceCluster.createTopic$default$5(), sourceCluster.createTopic$default$6());
        produceRecordsToCluster(sourceCluster, topic(), 20);
        configureMirrorTransitionBatchSize(new $colon.colon(destCluster, new $colon.colon(sourceCluster, Nil$.MODULE$)), 2);
        // The returned link id is only needed for the coordinator-change verification below.
        Uuid createBidirectionalLink = createBidirectionalLink(linkName(), destCluster, sourceCluster, linkProps(ConnectionMode$Inbound$.MODULE$, sourceCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()), linkProps(ConnectionMode$Outbound$.MODULE$, destCluster, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        destCluster.linkTopic(topic(), replicationFactor(), linkName(), destCluster.linkTopic$default$4(), destCluster.linkTopic$default$5());
        waitForMirroring(destCluster, map);
        destCluster.alterMirrors(topic(), AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR, destCluster.alterMirrors$default$3());
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        // Take dest down and re-check that source is in the pending mirror state.
        destCluster.killAllBrokers();
        sourceCluster.waitUntilPendingMirrorState(topic(), linkName(), sourceCluster.waitUntilPendingMirrorState$default$3());
        // Last argument (120000L) replaces the helper's default 7th argument used elsewhere in this file.
        assertClusterLinkMirrorTransitionMetricMaxVal(ClusterLinkMetrics$.MODULE$.metricsGroup(), "pending_mirror", new $colon.colon(InternalTaskErrorCode$.MODULE$, Nil$.MODULE$), 1.0d, new $colon.colon(sourceCluster.linkCoordinator(linkName()), Nil$.MODULE$), sourceCluster.nonLinkCoordinators(linkName()), 120000L);
        if (z) {
            verifyCoordinatorChangeHandlesStoppingAndStartingTasks(sourceCluster, createBidirectionalLink, topic(), ClusterLinkConvertToMirrorTopicTaskType$.MODULE$, true, true);
        }
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        // Retry state: j is the current sleep in ms (starts at 1); currentTimeMillis marks the start of the retry window.
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                // NOTE(review): helper body is outside this block; presumably a precondition check that may throw AssertionError - confirm.
                $anonfun$testPauseMirrorFailsOnPendingMirror$2(this, sourceCluster);
                Assertions.assertThrows(InvalidRequestException.class, () -> {
                    sourceCluster.alterMirrors(this.topic(), AlterMirrorOp.PAUSE, sourceCluster.alterMirrors$default$3());
                });
                sourceCluster.unlinkTopic(topic(), linkName(), sourceCluster.unlinkTopic$default$3(), false, sourceCluster.unlinkTopic$default$5(), sourceCluster.unlinkTopic$default$6());
                sourceCluster.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topic(), linkName(), sourceCluster.waitUntilMirrorDescriptionState$default$4());
                // Success: leave the retry loop and the test.
                return;
            } catch (AssertionError e) {
                // Give up and surface the failure once more than 15000 ms have passed since the loop started.
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                // Backoff: the sleep doubles while it is below 1000 ms, then grows by 1000 ms per attempt.
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    /**
     * Exercises {@code REVERSE_AND_PAUSE_REMOTE_MIRROR} in both directions over a bidirectional link.
     *
     * <p>Round one: the topic is created and populated on the source cluster, mirrored to the
     * destination, and then reversed from the destination. The test waits for the destination
     * mirror to become STOPPED and the source mirror to become PAUSED, checks the transition
     * metrics, calls {@code pauseTopic(topic, false)} on the source and waits for ACTIVE.
     *
     * <p>Round two repeats the same sequence with the roles of the two clusters swapped.
     *
     * @param str  first value supplied by {@code quorumCoordinatorReplicationCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseAndPauseMirror(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        String topicName = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseAndPauseMirror$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic, link the clusters and start mirroring source -> dest.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(
                topicName,
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
        waitForMirroring(dest, topicPartitions);

        // Round one: reverse from the destination side.
        dest.alterMirrors(
                topicName,
                AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR,
                dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.PAUSED,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_synchronization",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(dest.linkCoordinator(linkName()), Nil$.MODULE$),
                dest.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_mirror",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(source.linkCoordinator(linkName()), Nil$.MODULE$),
                source.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        // pauseTopic(..., false) presumably un-pauses the mirror; the test then expects ACTIVE.
        source.pauseTopic(topicName, false);
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // Round two: data now flows dest -> source, then reverse from the source side.
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, topicPartitions);
        source.alterMirrors(
                topicName,
                AlterMirrorOp.REVERSE_AND_PAUSE_REMOTE_MIRROR,
                source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.PAUSED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        // NOTE(review): in round one "pending_synchronization" was checked on the cluster that
        // issued alterMirrors; here it is still checked on the destination although the source
        // issued the request. Looks like it may have been meant to target the source - confirm.
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_synchronization",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(dest.linkCoordinator(linkName()), Nil$.MODULE$),
                dest.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        assertClusterLinkMirrorTransitionMetricMaxVal(
                ClusterLinkMetrics$.MODULE$.metricsGroup(),
                "pending_mirror",
                new $colon.colon(NoErrorCode$.MODULE$, Nil$.MODULE$),
                1.0d,
                new $colon.colon(dest.linkCoordinator(linkName()), Nil$.MODULE$),
                dest.nonLinkCoordinators(linkName()),
                assertClusterLinkMirrorTransitionMetricMaxVal$default$7());
        dest.pauseTopic(topicName, false);
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Runs {@code TRUNCATE_AND_RESTORE} on the source cluster and restarts all source brokers
     * before the restore is confirmed complete.
     *
     * <p>After the restart the test waits for the source mirror to become ACTIVE, asserts that the
     * destination's mirror description reports no state-transition errors, and finally verifies
     * that records produced on the destination are mirrored back to the source.
     *
     * @param str  first value supplied by {@code kraftCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreRemoteClusterRestart(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        String topicName = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreRemoteClusterRestart$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source, configure a transition batch size of 2 on both clusters, and mirror to dest.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(
                new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)),
                2);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(
                topicName,
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
        waitForMirroring(dest, topicPartitions);

        // Fail over on the destination, then let the source diverge with 20 more records.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, 20);

        // First call passes explicit (true, true) flags and checks per-partition truncation data;
        // second call uses the helper's defaults and checks the total truncated message count.
        // NOTE(review): the meaning of the two boolean flags is not visible here - confirm.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(20),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());

        // Bounce every broker of the source cluster.
        source.killAllBrokers();
        source.restartDeadBrokers(source.restartDeadBrokers$default$1());
        source.updateBootstrapServers();
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        MirrorTopicDescription destDescription = dest.describeMirrorTopic(topicName, true);
        Assertions.assertNotNull(destDescription);
        Assertions.assertEquals(0, destDescription.mirrorStateTransitionErrors().size());

        // Mirroring must now work in the reverse direction (dest -> source).
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, topicPartitions);
    }

    /**
     * Runs {@code TRUNCATE_AND_RESTORE} on the source cluster and restarts all destination brokers
     * once the source mirror has reached PENDING_SETUP_FOR_RESTORE.
     *
     * <p>After the restart the source link's {@code bootstrap.servers} is updated to the
     * destination's current bootstrap servers. The test then waits for the source mirror to become
     * ACTIVE, asserts that the destination reports no state-transition errors, and verifies that
     * records produced on the destination are mirrored to the source.
     *
     * @param str  first value supplied by {@code kraftCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreLocalClusterRestart(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        String topicName = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreLocalClusterRestart$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source, configure a transition batch size of 2 on both clusters, and mirror to dest.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(
                new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)),
                2);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(
                topicName,
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
        waitForMirroring(dest, topicPartitions);

        // Fail over on the destination, then let the source diverge with 20 more records.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, 20);

        // First call passes explicit (true, true) flags; second call uses the helper's defaults.
        // NOTE(review): the meaning of the two boolean flags is not visible here - confirm.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(20),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.PENDING_SETUP_FOR_RESTORE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // Bounce every broker of the destination cluster while the restore is pending.
        dest.killAllBrokers();
        dest.restartDeadBrokers(dest.restartDeadBrokers$default$1());
        dest.updateBootstrapServers();

        // Point the source-side link at the destination's current bootstrap servers.
        source.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc("bootstrap.servers"),
                                dest.bootstrapServers(dest.bootstrapServers$default$1()))})),
                source.alterClusterLink$default$3(),
                source.alterClusterLink$default$4(),
                source.alterClusterLink$default$5());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        MirrorTopicDescription destDescription = dest.describeMirrorTopic(topicName, true);
        Assertions.assertNotNull(destDescription);
        Assertions.assertEquals(0, destDescription.mirrorStateTransitionErrors().size());

        // Mirroring must now work in the reverse direction (dest -> source).
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, topicPartitions);
    }

    /**
     * Verifies that a {@code TRUNCATE_AND_RESTORE} on the source ends in the FAILED mirror state
     * when the destination topic is deleted and re-created after the restore has been requested.
     *
     * @param str  first value supplied by {@code kraftCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreWithDeletedAndRecreatedTopics(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq topicPartitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreWithDeletedAndRecreatedTopics$1(this, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic, link the clusters and mirror source -> dest.
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topic(), 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
        waitForMirroring(dest, topicPartitions);

        // Fail over on the destination, then let the source diverge with 20 more records.
        dest.alterMirrors(topic(), AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topic(),
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topic(), 20);

        // First call passes explicit (true, true) flags; second call uses the helper's defaults.
        // NOTE(review): the meaning of the two boolean flags is not visible here - confirm.
        source.alterMirrorsAssertCondition(
                topic(),
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topic(),
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(20),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());

        // Replace the destination topic with a freshly created one of the same name.
        dest.deleteTopic(topic(), true);
        dest.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                dest.createTopic$default$4(),
                dest.createTopic$default$5(),
                dest.createTopic$default$6());

        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.FAILED,
                topic(),
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Runs {@code TRUNCATE_AND_RESTORE} on the source, then pauses and un-pauses the cluster link
     * on both clusters while the source mirror is in PENDING_SETUP_FOR_RESTORE.
     *
     * <p>The test expects the source mirror to report LINK_PAUSED while the link is paused and
     * ACTIVE after it is un-paused, and finally checks that records produced on the destination
     * are mirrored to the source.
     *
     * @param str  first value supplied by {@code kraftCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreMirroringWithPauseAndUnpauseLinks(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        String topicName = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreMirroringWithPauseAndUnpauseLinks$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source, set the transition batch size to Integer.MAX_VALUE, and mirror to dest.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        configureMirrorTransitionBatchSize(
                new $colon.colon(dest, new $colon.colon(source, Nil$.MODULE$)),
                Integer.MAX_VALUE);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(
                topicName,
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
        waitForMirroring(dest, topicPartitions);

        // Fail over on the destination, then let the source diverge with 20 more records.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, 20);

        // First call passes explicit (true, true) flags; second call uses the helper's defaults.
        // NOTE(review): the meaning of the two boolean flags is not visible here - confirm.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(20),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.PENDING_SETUP_FOR_RESTORE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // Pause the link on both sides (destination first, then source).
        dest.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()),
                                "true")})),
                dest.alterClusterLink$default$3(),
                dest.alterClusterLink$default$4(),
                dest.alterClusterLink$default$5());
        source.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()),
                                "true")})),
                source.alterClusterLink$default$3(),
                source.alterClusterLink$default$4(),
                source.alterClusterLink$default$5());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.LINK_PAUSED,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // Un-pause the link on both sides in the same order.
        dest.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()),
                                "false")})),
                dest.alterClusterLink$default$3(),
                dest.alterClusterLink$default$4(),
                dest.alterClusterLink$default$5());
        source.alterClusterLink(
                linkName(),
                (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()),
                                "false")})),
                source.alterClusterLink$default$3(),
                source.alterClusterLink$default$4(),
                source.alterClusterLink$default$5());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // Mirroring must now work in the reverse direction (dest -> source).
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, topicPartitions);
    }

    /**
     * Runs {@code TRUNCATE_AND_RESTORE} after the leader-epoch helpers have been applied to both
     * clusters: to the destination before failover and to the source after it.
     *
     * <p>Once the restore completes the test expects the destination mirror to remain STOPPED and
     * the source mirror to become ACTIVE, re-runs the leader-epoch verification on the source, and
     * checks that records produced on the destination are mirrored to the source.
     *
     * @param str  first value supplied by {@code kraftCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreMirrorWithLeaderEpochChanges(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        String topicName = "topic";
        IndexedSeq topicPartitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreMirrorWithLeaderEpochChanges$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Unlike the sibling tests, records are produced only after the mirror topic is linked.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(
                topicName,
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
        produceRecordsToCluster(source, topicName, 20);
        waitForMirroring(dest, topicPartitions);

        // Apply and verify the leader-epoch helper on the destination, then fail over.
        bumpLeaderEpochs$2(dest, 3, topicPartitions);
        verifyLeaderEpochIsBumped$2(dest, topicPartitions, 3);
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Same on the source, followed by 20 more records so the source diverges.
        bumpLeaderEpochs$2(source, 3, topicPartitions);
        verifyLeaderEpochIsBumped$2(source, topicPartitions, 3);
        produceRecordsToCluster(source, topicName, 20);

        // First call passes explicit (true, true) flags; second call uses the helper's defaults.
        // NOTE(review): the meaning of the two boolean flags is not visible here - confirm.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(20),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());

        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        verifyLeaderEpochIsBumped$2(source, topicPartitions, 3);

        // Mirroring must now work in the reverse direction (dest -> source).
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, topicPartitions);
    }

    /**
     * Produces records to a topic on the given cluster using a short-lived producer.
     *
     * <p>The producer is created from the harness with its default arguments and is always closed
     * when this method returns, including when {@code produceRecords} throws (for example on a
     * failed assertion), so a failing test does not leak the producer.
     *
     * @param clusterLinkTestHarness cluster whose harness creates the producer
     * @param str                    name of the topic to produce to
     * @param i                      record count passed through to {@code produceRecords}
     */
    private void produceRecordsToCluster(ClusterLinkTestHarness clusterLinkTestHarness, String str, int i) {
        try (KafkaProducer<byte[], byte[]> producer = clusterLinkTestHarness.createProducer(
                clusterLinkTestHarness.createProducer$default$1(),
                clusterLinkTestHarness.createProducer$default$2(),
                clusterLinkTestHarness.createProducer$default$3())) {
            produceRecords(
                    producer,
                    str,
                    i,
                    produceRecords$default$4(),
                    produceRecords$default$5(),
                    produceRecords$default$6(),
                    produceRecords$default$7());
        }
    }

    /**
     * Drives a set of six mirror topics through repeated state transitions over a bidirectional
     * link and finally checks the committed offsets of consumer group {@code "group"}.
     *
     * <p>Outline, as visible in this method:
     * <ol>
     *   <li>create an admin client per cluster and a bidirectional link with consumer offset sync
     *       enabled;</li>
     *   <li>run a per-element setup helper over the six generated names;</li>
     *   <li>call {@code reverseMirroring$1} four times;</li>
     *   <li>create a consumer on each cluster and hand it to
     *       {@code createConsumerAndCommitOffsets$1};</li>
     *   <li>PROMOTE the first three names and FAILOVER the remaining three;</li>
     *   <li>compare the group's committed offsets against the expected per-partition values.</li>
     * </ol>
     *
     * <p>NOTE(review): the display name omits a {@code {2}} placeholder although the method takes
     * three arguments and sibling tests print {@code localReplication={2}} - confirm whether that
     * is intentional.
     *
     * @param str  first value supplied by {@code quorumCoordinatorReplicationCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"quorumCoordinatorReplicationCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testMultipleMirrorStateTransitions(String str, boolean z, String str2) {
        // All alterMirrors calls below use a 20000 ms timeout.
        AlterMirrorsOptions timeoutMs = new AlterMirrorsOptions().timeoutMs(Predef$.MODULE$.int2Integer(20000));
        // The clusters are held in mutable ObjectRef cells because they are passed to the
        // lambdas/helpers below; presumably reverseMirroring$1 swaps them - TODO confirm.
        ObjectRef create = ObjectRef.create(sourceCluster());
        ObjectRef create2 = ObjectRef.create(destCluster());
        // Build a Scala Map from each harness to a Confluent admin client created for it.
        Map$ map$ = Map$.MODULE$;
        ScalaRunTime$ scalaRunTime$ = ScalaRunTime$.MODULE$;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc((ClusterLinkTestHarness) create.elem);
        ClusterLinkTestHarness clusterLinkTestHarness = (ClusterLinkTestHarness) create.elem;
        Predef$ArrowAssoc$ predef$ArrowAssoc$2 = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc2 = Predef$.MODULE$.ArrowAssoc((ClusterLinkTestHarness) create2.elem);
        ClusterLinkTestHarness clusterLinkTestHarness2 = (ClusterLinkTestHarness) create2.elem;
        Map map = (Map) map$.apply(scalaRunTime$.wrapRefArray(new Tuple2[]{predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, clusterLinkTestHarness.createConfluentAdminClient(clusterLinkTestHarness.createConfluentAdminClient$default$1())), predef$ArrowAssoc$2.$minus$greater$extension(ArrowAssoc2, clusterLinkTestHarness2.createConfluentAdminClient(clusterLinkTestHarness2.createConfluentAdminClient$default$1()))}));
        // Disable producer idempotence on both clusters.
        ((ClusterLinkTestHarness) create.elem).producerConfig().setProperty("enable.idempotence", "false");
        ((ClusterLinkTestHarness) create2.elem).producerConfig().setProperty("enable.idempotence", "false");
        // Extra link configs: enable consumer offset sync with the filter built by consumerGroupFilter("*").
        Map<String, String> map2 = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), consumerGroupFilter("*"))}));
        // Unlike the sibling tests, both sides of the link are created in Outbound connection mode.
        createBidirectionalLink(linkName(), (ClusterLinkTestHarness) create2.elem, (ClusterLinkTestHarness) create.elem, linkProps(ConnectionMode$Outbound$.MODULE$, (ClusterLinkTestHarness) create.elem, linkProps$default$3(), linkProps$default$4(), map2), linkProps(ConnectionMode$Outbound$.MODULE$, (ClusterLinkTestHarness) create2.elem, linkProps$default$3(), linkProps$default$4(), map2));
        // Six generated strings (indices 0..5); they are used as topic names in alterMirrors below.
        IndexedSeq map3 = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 6).map(obj -> {
            return $anonfun$testMultipleMirrorStateTransitions$4(BoxesRunTime.unboxToInt(obj));
        });
        // Per-topic setup helper (body not visible here; presumably creates and links each topic).
        map3.foreach(str3 -> {
            $anonfun$testMultipleMirrorStateTransitions$5(this, create, create2, str3);
            return BoxedUnit.UNIT;
        });
        // Run the reverse-mirroring helper four times.
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 4).foreach$mVc$sp(i -> {
            this.reverseMirroring$1(map3, map, create2, timeoutMs, create);
        });
        // Create one consumer per cluster (using whatever the cells currently hold) and pass each
        // to createConsumerAndCommitOffsets$1.
        // NOTE(review): createConsumer is never closed in this method, only createConsumer2 is -
        // confirm the harness closes it on teardown.
        ClusterLinkTestHarness clusterLinkTestHarness3 = (ClusterLinkTestHarness) create.elem;
        Consumer createConsumer = clusterLinkTestHarness3.createConsumer(clusterLinkTestHarness3.createConsumer$default$1(), clusterLinkTestHarness3.createConsumer$default$2(), clusterLinkTestHarness3.createConsumer$default$3(), clusterLinkTestHarness3.createConsumer$default$4());
        ClusterLinkTestHarness clusterLinkTestHarness4 = (ClusterLinkTestHarness) create2.elem;
        Consumer createConsumer2 = clusterLinkTestHarness4.createConsumer(clusterLinkTestHarness4.createConsumer$default$1(), clusterLinkTestHarness4.createConsumer$default$2(), clusterLinkTestHarness4.createConsumer$default$3(), clusterLinkTestHarness4.createConsumer$default$4());
        createConsumerAndCommitOffsets$1(createConsumer, map3);
        createConsumerAndCommitOffsets$1(createConsumer2, map3);
        // Split the six names into the first three and the remaining three.
        Tuple2 splitAt = map3.splitAt(3);
        // Null check emitted for the Scala tuple pattern match.
        if (splitAt == null) {
            throw new MatchError((Object) null);
        }
        IndexedSeq indexedSeq = (IndexedSeq) splitAt._1();
        IndexedSeq indexedSeq2 = (IndexedSeq) splitAt._2();
        // PROMOTE the first group via the admin client of the cluster held in create2.
        ((ConfluentAdmin) map.apply((ClusterLinkTestHarness) create2.elem)).alterMirrors(CollectionConverters$.MODULE$.MapHasAsJava(((IterableOnceOps) indexedSeq.map(str4 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str4), AlterMirrorOp.PROMOTE);
        })).toMap($less$colon$less$.MODULE$.refl())).asJava(), timeoutMs);
        // FAILOVER the second group via the same admin client.
        ((ConfluentAdmin) map.apply((ClusterLinkTestHarness) create2.elem)).alterMirrors(CollectionConverters$.MODULE$.MapHasAsJava(((IterableOnceOps) indexedSeq2.map(str5 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str5), AlterMirrorOp.FAILOVER);
        })).toMap($less$colon$less$.MODULE$.refl())).asJava(), timeoutMs);
        // Per-topic follow-up helpers for each group (bodies not visible here; presumably they
        // wait for the resulting mirror state - TODO confirm).
        indexedSeq.foreach(str6 -> {
            $anonfun$testMultipleMirrorStateTransitions$11(this, create2, str6);
            return BoxedUnit.UNIT;
        });
        indexedSeq2.foreach(str7 -> {
            $anonfun$testMultipleMirrorStateTransitions$12(this, create2, str7);
            return BoxedUnit.UNIT;
        });
        createConsumer2.close();
        // A further per-topic helper for the promoted group, run after the consumer is closed.
        indexedSeq.foreach(str8 -> {
            $anonfun$testMultipleMirrorStateTransitions$13(this, create2, str8);
            return BoxedUnit.UNIT;
        });
        // Expected committed offsets: for partitions of the promoted topics, the leader log's end
        // offset on the cluster held in create2; for partitions of the failed-over topics, the
        // fixed value 200. Actual: offsets of group "group" fetched through that cluster's admin
        // client, waiting up to 15 seconds.
        Assertions.assertEquals(((IterableOnceOps) ((IndexedSeqOps) indexedSeq.flatMap(str9 -> {
            return this.partitions$3(str9);
        })).map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(((ClusterLinkTestHarness) create2.elem).leaderLog(topicPartition).logEndOffset()));
        })).toMap($less$colon$less$.MODULE$.refl()).$plus$plus(((IterableOnceOps) ((IndexedSeqOps) indexedSeq2.flatMap(str10 -> {
            return this.partitions$3(str10);
        })).map(topicPartition2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicPartition2), BoxesRunTime.boxToLong(200L));
        })).toMap($less$colon$less$.MODULE$.refl())), CollectionConverters$.MODULE$.MapHasAsScala((java.util.Map) ((java.util.Map) ((Admin) map.apply((ClusterLinkTestHarness) create2.elem)).listConsumerGroupOffsets("group").all().get(15L, TimeUnit.SECONDS)).get("group")).asScala().map(tuple2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple2._1()), BoxesRunTime.boxToLong(((OffsetAndMetadata) tuple2._2()).offset()));
        }));
    }

    /**
     * Verifies that a {@code TRUNCATE_AND_RESTORE} on the source ends in the FAILED mirror state
     * when the destination topic is deleted after the restore has been requested.
     *
     * @param str  first value supplied by {@code kraftCombinations} (unused here)
     * @param z    second supplied value (unused here)
     * @param str2 third supplied value (unused here)
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreWithDeletedTopics(String str, boolean z, String str2) {
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq topicPartitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreWithDeletedTopics$1(this, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic, link the clusters and mirror source -> dest.
        source.createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topic(), 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                dest.linkTopic$default$4(),
                dest.linkTopic$default$5());
        waitForMirroring(dest, topicPartitions);

        // Fail over on the destination, then let the source diverge with 20 more records.
        dest.alterMirrors(topic(), AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topic(),
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topic(), 20);

        // First call passes explicit (true, true) flags; second call uses the helper's defaults.
        // NOTE(review): the meaning of the two boolean flags is not visible here - confirm.
        source.alterMirrorsAssertCondition(
                topic(),
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topic(),
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(20),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());

        // Remove the destination topic while the restore is in flight.
        dest.deleteTopic(topic(), true);

        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.FAILED,
                topic(),
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Exercises TRUNCATE_AND_RESTORE on the original source topic after the destination
     * mirror has been failed over, with no extra records written after the failover.
     *
     * <p>Flow: create and fill the topic on the source cluster, create a bidirectional link,
     * mirror the topic onto the destination, fail the mirror over, then restore the source
     * topic as a mirror. The restore is expected to report zero truncated messages, after
     * which records produced on the destination must be mirrored back to the source.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestore(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestore$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        Uuid linkId = createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{topicName})),
                source.listRemoteTopics(linkName(), source.listRemoteTopics$default$2()));

        // Fail the destination mirror over; the remote-topic listing (second argument forced
        // to true) must still contain the topic once the mirror is stopped.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        Assertions.assertEquals(
                Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{topicName})),
                source.listRemoteTopics(linkName(), true));

        // First call passes true/true for the two trailing flags (presumably a validate-only
        // request that returns partition-level data -- confirm against the harness); the
        // second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 0L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(0L),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        // The restored mirror must point at the destination topic through the same link.
        MirrorTopicDescription restoredMirror = source.describeMirrorTopic(topicName, source.describeMirrorTopic$default$2());
        Assertions.assertEquals(linkId, restoredMirror.clusterLinkId());
        Assertions.assertEquals(dest.describeTopic(topicName).topicId(), restoredMirror.sourceTopicId());
        Assertions.assertEquals(topicName, restoredMirror.sourceTopic());

        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);
    }

    /**
     * Exercises TRUNCATE_AND_RESTORE when, after the failover, every partition leader on the
     * original source cluster is changed and ten further records are produced there.
     *
     * <p>The expected per-partition truncation counts are the hard-coded list
     * {@code [3, 3, 2, 2]}, so the expectation only lines up with a four-partition topic
     * (NOTE(review): assumes {@code numPartitions()} is 4 -- confirm). The restore itself is
     * expected to report ten truncated messages in total.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreWithNewLeaderEpochsOnRemoteCluster(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreWithNewLeaderEpochsOnRemoteCluster$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Change the leader of every partition on the source cluster, then write half a
        // batch of records that the destination never saw.
        partitions.foreach(partition -> BoxesRunTime.boxToInteger(source.changeLeader(partition)));
        produceRecordsToCluster(source, topicName, numRecords / 2);

        // First call passes true/true for the two trailing flags (presumably a validate-only
        // request that returns partition-level data -- confirm against the harness); the
        // second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapLongArray(new long[]{3L, 3L, 2L, 2L}))),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(numRecords / 2),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);
    }

    /**
     * Exercises TRUNCATE_AND_RESTORE when, after the failover, every partition leader on the
     * destination cluster is changed and ten further records are produced there.
     *
     * <p>Nothing extra is written to the original source cluster, so both the partition-level
     * check and the restore itself expect zero truncated messages. Afterwards records
     * produced on the destination must be mirrored back to the source.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreWithNewLeaderEpochsOnLocalCluster(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreWithNewLeaderEpochsOnLocalCluster$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Change the leader of every partition on the destination cluster, then write half
        // a batch of new records there.
        partitions.foreach(partition -> BoxesRunTime.boxToInteger(dest.changeLeader(partition)));
        produceRecordsToCluster(dest, topicName, numRecords / 2);

        // First call passes true/true for the two trailing flags (presumably a validate-only
        // request that returns partition-level data -- confirm against the harness); the
        // second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 0L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(0L),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);
    }

    /**
     * Exercises TRUNCATE_AND_RESTORE when the original source topic received twenty extra
     * (divergent) records after the destination mirror was failed over.
     *
     * <p>Unlike the sibling tests, the two restore calls each set only one trailing flag:
     * the first sets the fifth argument to {@code true} and checks the total truncated
     * message count (20); the second sets the sixth argument to {@code true} and checks the
     * partition-level data (5 per partition). NOTE(review): the fifth argument is presumably
     * "validate only" and the sixth "include partition-level data" -- confirm against
     * {@code ClusterLinkTestHarness.alterMirrorsAssertCondition}.
     *
     * <p>After the restore the test waits for the mirror start offsets to be cleared and for
     * the source mirror to become ACTIVE, then verifies that records produced on the
     * destination are mirrored back to the source.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreWithDivergentRecordsAfterFailover(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreWithDivergentRecordsAfterFailover$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Records written to the source after the failover never reached the destination.
        produceRecordsToCluster(source, topicName, numRecords);

        // Fifth argument true, sixth left at the harness default: check the total count.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(numRecords),
                linkName(),
                true,
                source.alterMirrorsAssertCondition$default$6());
        // Fifth argument at the harness default, sixth true: check per-partition counts.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                true);

        waitUntilMirrorStartOffsetsAreCleared(topicName, waitUntilMirrorStartOffsetsAreCleared$default$2());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());

        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);
    }

    /**
     * Exercises TRUNCATE_AND_RESTORE when new records were produced only on the destination
     * cluster after the failover.
     *
     * <p>The original source topic has no divergent records, so the partition-level check
     * and the restore both expect zero truncated messages. After the mirror start offsets are
     * cleared and the source mirror is ACTIVE, the test waits for the destination's
     * post-failover records to be mirrored, then produces another batch on the destination
     * and waits for that to be mirrored as well.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreWithProduceAfterFailover(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreWithProduceAfterFailover$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Only the destination moves forward after the failover.
        produceRecordsToCluster(dest, topicName, numRecords);

        // First call passes true/true for the two trailing flags (presumably a validate-only
        // request that returns partition-level data -- confirm against the harness); the
        // second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 0L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(0L),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());

        waitUntilMirrorStartOffsetsAreCleared(topicName, waitUntilMirrorStartOffsetsAreCleared$default$2());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        waitForMirroring(source, partitions);

        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);
    }

    /**
     * Exercises TRUNCATE_AND_RESTORE when, after the failover, both clusters received twenty
     * new records: divergent ones on the original source and fresh ones on the destination.
     *
     * <p>The partition-level check expects 5 truncated messages per partition and the restore
     * expects 20 in total. After the mirror start offsets are cleared and the source mirror
     * is ACTIVE, the test waits for mirroring to catch up, produces another batch on the
     * destination and waits for that to be mirrored as well.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreWithDivergentRecordsAndProduceAfterFailover(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testTruncateAndRestoreWithDivergentRecordsAndProduceAfterFailover$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());

        // Both sides move forward independently after the failover.
        produceRecordsToCluster(source, topicName, numRecords);
        produceRecordsToCluster(dest, topicName, numRecords);

        // First call passes true/true for the two trailing flags (presumably a validate-only
        // request that returns partition-level data -- confirm against the harness); the
        // second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(numRecords),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());

        waitUntilMirrorStartOffsetsAreCleared(topicName, waitUntilMirrorStartOffsetsAreCleared$default$2());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        waitForMirroring(source, partitions);

        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);
    }

    /**
     * Chains three mirror operations: FAILOVER on the destination, TRUNCATE_AND_RESTORE on
     * the original source, then REVERSE_AND_START_REMOTE_MIRROR issued on the source.
     *
     * <p>After the reverse, the source mirror is expected to be STOPPED and the destination
     * mirror ACTIVE, and records produced on the source must be mirrored to the destination.
     * Finally both clusters' mirror descriptions must report a stopped sequence number of 2.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testFailoverThenRestoreThenReverse(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testFailoverThenRestoreThenReverse$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Step 1: fail over on the destination, then let both sides diverge.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, numRecords);
        produceRecordsToCluster(dest, topicName, numRecords);

        // Step 2: truncate-and-restore on the source. The first call passes true/true for the
        // two trailing flags (presumably validate-only with partition-level data -- confirm
        // against the harness); the second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(numRecords),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);

        // Step 3: reverse from the source so the destination becomes the mirror again.
        source.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, numRecords);
        waitForMirroring(dest, partitions);

        MirrorTopicDescription destMirror = dest.describeMirrorTopic(topicName, dest.describeMirrorTopic$default$2());
        MirrorTopicDescription sourceMirror = source.describeMirrorTopic(topicName, source.describeMirrorTopic$default$2());
        Assertions.assertEquals(2L, destMirror.stoppedSequenceNumber());
        Assertions.assertEquals(2L, sourceMirror.stoppedSequenceNumber());
    }

    /**
     * Chains three mirror operations: REVERSE_AND_START_REMOTE_MIRROR issued on the
     * destination, FAILOVER on the (now mirroring) source, then TRUNCATE_AND_RESTORE issued
     * on the destination.
     *
     * <p>After the restore the destination mirror is expected to be ACTIVE and records
     * produced on the source must be mirrored to it. Finally both clusters' mirror
     * descriptions must report a stopped sequence number of 2.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseThenFailoverThenRestore(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseThenFailoverThenRestore$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Step 1: reverse from the destination so the source becomes the mirror.
        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);

        // Step 2: fail over on the source, then let both sides diverge.
        source.alterMirrors(topicName, AlterMirrorOp.FAILOVER, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, numRecords);
        produceRecordsToCluster(source, topicName, numRecords);

        // Step 3: truncate-and-restore on the destination. The first call passes true/true
        // for the two trailing flags (presumably validate-only with partition-level data --
        // confirm against the harness); the second call uses the harness defaults.
        dest.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        dest.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(numRecords),
                linkName(),
                dest.alterMirrorsAssertCondition$default$5(),
                dest.alterMirrorsAssertCondition$default$6());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, numRecords);
        waitForMirroring(dest, partitions);

        MirrorTopicDescription destMirror = dest.describeMirrorTopic(topicName, dest.describeMirrorTopic$default$2());
        MirrorTopicDescription sourceMirror = source.describeMirrorTopic(topicName, source.describeMirrorTopic$default$2());
        Assertions.assertEquals(2L, destMirror.stoppedSequenceNumber());
        Assertions.assertEquals(2L, sourceMirror.stoppedSequenceNumber());
    }

    /**
     * Chains four mirror operations: FAILOVER on the destination, TRUNCATE_AND_RESTORE on the
     * original source, REVERSE_AND_START_REMOTE_MIRROR issued on the source, and a second
     * REVERSE_AND_START_REMOTE_MIRROR issued on the destination.
     *
     * <p>After the second reverse the destination mirror is expected to be STOPPED and the
     * source mirror ACTIVE. Finally both clusters' mirror descriptions must report a stopped
     * sequence number of 3.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testFailoverThenRestoreThenReverseTwice(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testFailoverThenRestoreThenReverseTwice$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Step 1: fail over on the destination, then let both sides diverge.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, numRecords);
        produceRecordsToCluster(dest, topicName, numRecords);

        // Step 2: truncate-and-restore on the source. The first call passes true/true for the
        // two trailing flags (presumably validate-only with partition-level data -- confirm
        // against the harness); the second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(numRecords),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);

        // Step 3: first reverse, issued on the source.
        source.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, numRecords);
        waitForMirroring(dest, partitions);

        // Step 4: second reverse, issued on the destination.
        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        // NOTE(review): unlike the other steps, this produce is not followed by
        // waitForMirroring(source, ...) -- confirm whether the omission is intentional.
        produceRecordsToCluster(dest, topicName, numRecords);

        MirrorTopicDescription destMirror = dest.describeMirrorTopic(topicName, dest.describeMirrorTopic$default$2());
        MirrorTopicDescription sourceMirror = source.describeMirrorTopic(topicName, source.describeMirrorTopic$default$2());
        Assertions.assertEquals(3L, destMirror.stoppedSequenceNumber());
        Assertions.assertEquals(3L, sourceMirror.stoppedSequenceNumber());
    }

    /**
     * Chains four mirror operations: REVERSE_AND_START_REMOTE_MIRROR issued on the
     * destination, a second reverse issued on the source, FAILOVER on the destination, and
     * TRUNCATE_AND_RESTORE issued on the source.
     *
     * <p>After the restore the source mirror is expected to be ACTIVE and records produced on
     * the destination must be mirrored to it. Finally both clusters' mirror descriptions must
     * report a stopped sequence number of 3.
     *
     * <p>Runs once per argument set supplied by {@code kraftCombinations}.
     *
     * @param str  quorum value (per the display-name template); not read by the test body
     * @param z    coordinator flag (per the display-name template); not read by the test body
     * @param str2 local-replication value (per the display-name template); not read by the test body
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testReverseTwiceThenFailoverThenRestore(String str, boolean z, String str2) {
        final String topicName = "topic";
        final int numRecords = 20;
        ClusterLinkTestHarness dest = destCluster();
        ClusterLinkTestHarness source = sourceCluster();
        IndexedSeq partitions = RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(idx -> $anonfun$testReverseTwiceThenFailoverThenRestore$1(topicName, BoxesRunTime.unboxToInt(idx)));

        // Seed the source topic and mirror it onto the destination over a bidirectional link.
        source.createTopic(
                topicName,
                numPartitions(),
                replicationFactor(),
                source.createTopic$default$4(),
                source.createTopic$default$5(),
                source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, numRecords);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // Step 1: first reverse, issued on the destination.
        dest.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);

        // Step 2: second reverse, issued on the source.
        source.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, numRecords);
        waitForMirroring(dest, partitions);

        // Step 3: fail over on the destination, then let both sides diverge.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.STOPPED,
                topicName,
                linkName(),
                dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, numRecords);
        produceRecordsToCluster(source, topicName, numRecords);

        // Step 4: truncate-and-restore on the source. The first call passes true/true for the
        // two trailing flags (presumably validate-only with partition-level data -- confirm
        // against the harness); the second call uses the harness defaults.
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)),
                linkName(),
                true,
                true);
        source.alterMirrorsAssertCondition(
                topicName,
                AlterMirrorOp.TRUNCATE_AND_RESTORE,
                matchesTruncatedMessageCount(numRecords),
                linkName(),
                source.alterMirrorsAssertCondition$default$5(),
                source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                topicName,
                linkName(),
                source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, numRecords);
        waitForMirroring(source, partitions);

        MirrorTopicDescription destMirror = dest.describeMirrorTopic(topicName, dest.describeMirrorTopic$default$2());
        MirrorTopicDescription sourceMirror = source.describeMirrorTopic(topicName, source.describeMirrorTopic$default$2());
        Assertions.assertEquals(3L, destMirror.stoppedSequenceNumber());
        Assertions.assertEquals(3L, sourceMirror.stoppedSequenceNumber());
    }

    /**
     * Exercises TRUNCATE_AND_RESTORE after the mirror direction has been flipped twice.
     * After the final failover on the destination cluster, the operation issued on the destination is
     * expected to throw {@link InvalidRequestException}, while the same operation issued on the source
     * is expected to report zero truncated messages. Both clusters must end with a stopped sequence
     * number of 3.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreFailsOnWrongStoppedMirrorTopic(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        IndexedSeq partitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(index -> $anonfun$testTruncateAndRestoreFailsOnWrongStoppedMirrorTopic$1(topicName, BoxesRunTime.unboxToInt(index)));

        // Source -> destination mirroring over a bidirectional link.
        source.createTopic(topicName, numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicName, 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        // First failover on the destination, then write to both clusters.
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(), dest.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(source, topicName, 20);
        produceRecordsToCluster(dest, topicName, 20);

        // Truncate-and-restore on the source: 5 per partition, 20 in total.
        source.alterMirrorsAssertCondition(topicName, AlterMirrorOp.TRUNCATE_AND_RESTORE, matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 5L)), linkName(), true, true);
        source.alterMirrorsAssertCondition(topicName, AlterMirrorOp.TRUNCATE_AND_RESTORE, matchesTruncatedMessageCount(20), linkName(), source.alterMirrorsAssertCondition$default$5(), source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(), source.waitUntilMirrorDescriptionState$default$4());
        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);

        // Reverse the direction again, then fail over on the destination a second time.
        source.alterMirrors(topicName, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(), source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(), dest.waitUntilMirrorDescriptionState$default$4());
        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(), dest.waitUntilMirrorDescriptionState$default$4());

        // The destination must reject the operation; the source accepts it and truncates nothing.
        Assertions.assertThrows(InvalidRequestException.class, () -> dest.alterMirrors(topicName, AlterMirrorOp.TRUNCATE_AND_RESTORE, this.linkName()));
        source.alterMirrorsAssertCondition(topicName, AlterMirrorOp.TRUNCATE_AND_RESTORE, matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 0L)), linkName(), true, true);
        source.alterMirrorsAssertCondition(topicName, AlterMirrorOp.TRUNCATE_AND_RESTORE, matchesTruncatedMessageCount(0L), linkName(), source.alterMirrorsAssertCondition$default$5(), source.alterMirrorsAssertCondition$default$6());

        MirrorTopicDescription destDescription = dest.describeMirrorTopic(topicName, dest.describeMirrorTopic$default$2());
        MirrorTopicDescription sourceDescription = source.describeMirrorTopic(topicName, source.describeMirrorTopic$default$2());
        Assertions.assertEquals(3L, destDescription.stoppedSequenceNumber());
        Assertions.assertEquals(3L, sourceDescription.stoppedSequenceNumber());
    }

    /**
     * Sends one alter-mirrors request with TRUNCATE_AND_RESTORE for two topics in different states.
     * "topic1" has been reversed and then failed over on the source; "topic2" is still an active mirror
     * on the destination. The result for "topic1" must report zero truncated messages, and every other
     * entry must fail with an {@link ExecutionException} caused by {@link InvalidRequestException}.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreCombinationOfMirrorStates(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicOne = "topic1";
        final String topicTwo = "topic2";
        IndexedSeq partitionsOne = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(index -> $anonfun$testTruncateAndRestoreCombinationOfMirrorStates$1(topicOne, BoxesRunTime.unboxToInt(index)));
        IndexedSeq partitionsTwo = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(index -> $anonfun$testTruncateAndRestoreCombinationOfMirrorStates$2(topicTwo, BoxesRunTime.unboxToInt(index)));

        // Create and mirror both topics from source to destination.
        source.createTopic(topicOne, numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicOne, 20);
        source.createTopic(topicTwo, numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        produceRecordsToCluster(source, topicTwo, 20);
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicOne, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        dest.linkTopic(topicTwo, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitionsOne);
        waitForMirroring(dest, partitionsTwo);

        // Reverse topic1, then fail it over on the source; topic2 stays an active mirror on the destination.
        dest.alterMirrors(topicOne, AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicOne, linkName(), dest.waitUntilMirrorDescriptionState$default$4());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicOne, linkName(), source.waitUntilMirrorDescriptionState$default$4());
        source.alterMirrors(topicOne, AlterMirrorOp.FAILOVER, source.alterMirrors$default$3());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicOne, linkName(), source.waitUntilMirrorDescriptionState$default$4());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicTwo, linkName(), dest.waitUntilMirrorDescriptionState$default$4());

        // One request carrying TRUNCATE_AND_RESTORE for both topics.
        Tuple2[] requestedOps = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), AlterMirrorOp.TRUNCATE_AND_RESTORE),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic2"), AlterMirrorOp.TRUNCATE_AND_RESTORE)
        };
        Map opsByTopic = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(requestedOps));
        AlterMirrorsResult result = dest.alterMirrorsRequest(CollectionConverters$.MODULE$.MapHasAsJava(opsByTopic).asJava(), linkName());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicOne, linkName(), dest.waitUntilMirrorDescriptionState$default$4());

        result.truncateAndRestoreValues().forEach((topicKey, future) -> {
            if (topicOne.equals(topicKey)) {
                Assertions.assertEquals(0L, ((TruncateAndRestoreResult) future.get()).messagesTruncated());
            } else {
                Assertions.assertEquals(InvalidRequestException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
                    future.get();
                })).getCause().getClass());
            }
        });
    }

    /**
     * Runs TRUNCATE_AND_RESTORE on a topic that never received records before the failover.
     * The operation must report zero truncated messages, per partition and in total, and the source
     * must then mirror records produced on the destination.
     */
    @MethodSource({"kraftCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}.localReplication={2}")
    public void testTruncateAndRestoreNoRecordsInTopic(String str, boolean z, String str2) {
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String topicName = "topic";
        IndexedSeq partitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(index -> $anonfun$testTruncateAndRestoreNoRecordsInTopic$1(topicName, BoxesRunTime.unboxToInt(index)));

        // Empty topic mirrored from source to destination.
        source.createTopic(topicName, numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        createBidirectionalLink(
                linkName(),
                dest,
                source,
                linkProps(ConnectionMode$Inbound$.MODULE$, source, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()),
                linkProps(ConnectionMode$Outbound$.MODULE$, dest, linkProps$default$3(), linkProps$default$4(), linkProps$default$5()));
        dest.linkTopic(topicName, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
        waitForMirroring(dest, partitions);

        dest.alterMirrors(topicName, AlterMirrorOp.FAILOVER, dest.alterMirrors$default$3());
        dest.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED, topicName, linkName(), dest.waitUntilMirrorDescriptionState$default$4());

        // Nothing was produced, so nothing may be truncated.
        source.alterMirrorsAssertCondition(topicName, AlterMirrorOp.TRUNCATE_AND_RESTORE, matchesPartitionLevelTruncationData((List) scala.package$.MODULE$.List().fill(numPartitions(), () -> 0L)), linkName(), true, true);
        source.alterMirrorsAssertCondition(topicName, AlterMirrorOp.TRUNCATE_AND_RESTORE, matchesTruncatedMessageCount(0L), linkName(), source.alterMirrorsAssertCondition$default$5(), source.alterMirrorsAssertCondition$default$6());
        source.waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE, topicName, linkName(), source.waitUntilMirrorDescriptionState$default$4());

        produceRecordsToCluster(dest, topicName, 20);
        waitForMirroring(source, partitions);
    }

    /**
     * End-to-end check of a bidirectional link between the destination and source clusters.
     * Creates "east.topic" on the destination and "west.topic" on the source, mirrors each to the other
     * cluster, verifies consumer-offset migration and link metrics, then unlinks the topics and deletes
     * the link on both sides. The test is skipped when a cluster-link prefix is configured.
     *
     * @param connectionMode  connection mode for the link created on the destination cluster
     * @param connectionMode2 connection mode for the link created on the source cluster
     * @param option          optional topic types forwarded to {@code linkProps} for the offset filter
     * @param map             extra link configs; if it sets auto-mirroring to "true" the topics are not linked explicitly
     */
    private void verifyBidirectionalLink(ConnectionMode connectionMode, ConnectionMode connectionMode2, Option<Seq<Enumeration.Value>> option, Map<String, String> map) {
        Assumptions.assumeTrue(clusterLinkPrefix().isEmpty());
        final ClusterLinkTestHarness dest = destCluster();
        final ClusterLinkTestHarness source = sourceCluster();
        final String eastTopic = "east.topic";
        final String westTopic = "west.topic";
        final String eastGroup = "east.group";
        final String westGroup = "west.group";

        KafkaProducer<byte[], byte[]> eastProducer = dest.createProducer(dest.createProducer$default$1(), dest.createProducer$default$2(), dest.createProducer$default$3());
        KafkaProducer<byte[], byte[]> westProducer = source.createProducer(source.createProducer$default$1(), source.createProducer$default$2(), source.createProducer$default$3());
        source.createTopic(westTopic, numPartitions(), replicationFactor(), source.createTopic$default$4(), source.createTopic$default$5(), source.createTopic$default$6());
        dest.createTopic(eastTopic, numPartitions(), replicationFactor(), dest.createTopic$default$4(), dest.createTopic$default$5(), dest.createTopic$default$6());
        produceRecords(westProducer, westTopic, 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());
        produceRecords(eastProducer, eastTopic, 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());

        Properties destLinkProps = linkProps(connectionMode, source, new Some(westGroup), option, map);
        Properties sourceLinkProps = linkProps(connectionMode2, dest, new Some(eastGroup), option, map);
        Uuid linkId = createBidirectionalLink(linkName(), dest, source, destLinkProps, sourceLinkProps);

        // Link the topics explicitly unless auto-mirroring is enabled in the extra configs.
        boolean autoMirroring = map.get(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()).contains("true");
        if (!autoMirroring) {
            dest.linkTopic(westTopic, replicationFactor(), linkName(), dest.linkTopic$default$4(), dest.linkTopic$default$5());
            source.linkTopic(eastTopic, replicationFactor(), linkName(), source.linkTopic$default$4(), source.linkTopic$default$5());
        }

        IndexedSeq westPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(index -> $anonfun$verifyBidirectionalLink$1(westTopic, BoxesRunTime.unboxToInt(index)));
        IndexedSeq eastPartitions = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(index -> $anonfun$verifyBidirectionalLink$2(eastTopic, BoxesRunTime.unboxToInt(index)));
        waitForMirroring(dest, westPartitions);
        waitForMirroring(source, eastPartitions);
        produceRecords(westProducer, westTopic, 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());
        produceRecords(eastProducer, eastTopic, 20, produceRecords$default$4(), produceRecords$default$5(), produceRecords$default$6(), produceRecords$default$7());
        waitForMirroring(dest, westPartitions);
        waitForMirroring(source, eastPartitions);

        // Commit the same offset for both topics and both groups on each cluster.
        long committedOffset = nextOffset(new TopicPartition(eastTopic, 0));
        commitOffsets(dest, eastTopic, 0, committedOffset, eastGroup);
        commitOffsets(dest, westTopic, 0, committedOffset, eastGroup);
        commitOffsets(source, westTopic, 0, committedOffset, westGroup);
        commitOffsets(source, eastTopic, 0, committedOffset, westGroup);

        verifyOffsetMigration(westTopic, 0, committedOffset, westGroup, dest);
        // When topic types are given and REMOTE_MIRROR is not among them, offset 0 is expected
        // for the topic that originates on the cluster being checked.
        boolean remoteMirrorExcluded = option.isDefined() && !((SeqOps) option.get()).contains(TopicType$.MODULE$.REMOTE_MIRROR());
        long expectedRemoteMirrorOffset = remoteMirrorExcluded ? 0L : committedOffset;
        verifyOffsetMigration(eastTopic, 0, expectedRemoteMirrorOffset, westGroup, dest);
        verifyOffsetMigration(eastTopic, 0, committedOffset, eastGroup, source);
        verifyOffsetMigration(westTopic, 0, expectedRemoteMirrorOffset, eastGroup, source);

        verifyLinkMetrics(linkId, dest, source, destLinkProps, sourceLinkProps, eastTopic, westTopic);

        // Tear down: unlink both mirrors, confirm their contents, then delete the link on each side.
        dest.unlinkTopic(westTopic, linkName(), dest.unlinkTopic$default$3(), dest.unlinkTopic$default$4(), dest.unlinkTopic$default$5(), dest.unlinkTopic$default$6());
        source.unlinkTopic(eastTopic, linkName(), source.unlinkTopic$default$3(), source.unlinkTopic$default$4(), source.unlinkTopic$default$5(), source.unlinkTopic$default$6());
        verifyMirror(dest, westPartitions);
        verifyMirror(source, eastPartitions);
        dest.deleteClusterLink(linkName(), dest.deleteClusterLink$default$2(), dest.deleteClusterLink$default$3());
        source.deleteClusterLink(linkName(), source.deleteClusterLink$default$2(), source.deleteClusterLink$default$3());
    }

    /**
     * Scala-style default-argument accessor; presumably the default for the third parameter of
     * {@code verifyBidirectionalLink}. Returns the empty option ({@code None}).
     */
    private Option<Seq<Enumeration.Value>> verifyBidirectionalLink$default$3() {
        return None$.MODULE$;
    }

    /**
     * Scala-style default-argument accessor; presumably the default for the fourth parameter of
     * {@code verifyBidirectionalLink}. Returns an empty map.
     */
    private Map<String, String> verifyBidirectionalLink$default$4() {
        return (Map) Map$.MODULE$.empty();
    }

    /**
     * Applies {@code $anonfun$configureMirrorTransitionBatchSize$1} with batch size {@code i} to every
     * harness in {@code seq}.
     */
    private void configureMirrorTransitionBatchSize(Seq<ClusterLinkTestHarness> seq, int i) {
        seq.foreach(harness -> {
            $anonfun$configureMirrorTransitionBatchSize$1(i, harness);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds the configuration for one side of a bidirectional cluster link.
     *
     * <p>For {@code OUTBOUND} connection mode the properties also carry the remote cluster's bootstrap
     * servers, its client security properties and the link's JAAS credentials. Short client timeouts
     * are always set, followed by the bidirectional link mode and the given connection mode. When a
     * consumer group is supplied, consumer offset sync is enabled with a group filter. Entries of
     * {@code map} are applied last, so they override anything set earlier.
     *
     * @param connectionMode         connection mode of the link being configured
     * @param clusterLinkTestHarness remote cluster the link connects to (read only in outbound mode)
     * @param option                 consumer group to sync offsets for; {@code None} leaves offset sync unconfigured
     * @param option2                optional topic types passed to the consumer group filter
     * @param map                    extra link configs merged on top of the generated ones
     * @return the assembled link properties
     */
    public Properties linkProps(ConnectionMode connectionMode, ClusterLinkTestHarness clusterLinkTestHarness, Option<String> option, Option<Seq<Enumeration.Value>> option2, Map<String, String> map) {
        Properties properties = new Properties();
        if (connectionMode != null && connectionMode.equals(ConnectionMode$Outbound$.MODULE$)) {
            // Credentials are created before the connection settings, as in the original ordering.
            String createLinkCredentials = createLinkCredentials(linkName(), clusterLinkTestHarness, createLinkCredentials$default$3());
            properties.setProperty("bootstrap.servers", clusterLinkTestHarness.bootstrapServers(clusterLinkTestHarness.bootstrapServers$default$1()));
            new Implicits.PropertiesOps(properties).$plus$plus$eq(clusterLinkTestHarness.clientSecurityProps(linkName()));
            // Set after the security props so the link credentials take precedence over any JAAS config they carry.
            properties.setProperty("sasl.jaas.config", createLinkCredentials);
        }
        properties.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "10000");
        properties.setProperty("metadata.max.age.ms", "2000");
        properties.setProperty("reconnect.backoff.max.ms", "1000");
        properties.setProperty("request.timeout.ms", "1000");
        properties.setProperty("default.api.timeout.ms", "1000");
        properties.setProperty(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL.name());
        properties.setProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        option.foreach(str -> {
            String consumerGroupFilter;
            properties.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
            if (option2 instanceof Some) {
                consumerGroupFilter = this.consumerGroupFilter(str, (Seq) ((Some) option2).value());
            } else {
                if (!None$.MODULE$.equals(option2)) {
                    throw new MatchError(option2);
                }
                consumerGroupFilter = this.consumerGroupFilter(str);
            }
            properties.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter);
            return properties.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        });
        // Caller-supplied overrides go last.
        new Implicits.PropertiesOps(properties).$plus$plus$eq(map);
        return properties;
    }

    /** Default for the third argument of {@code linkProps} (the consumer group): the empty option. */
    public Option<String> linkProps$default$3() {
        return None$.MODULE$;
    }

    /** Default for the fourth argument of {@code linkProps} (the topic types): the empty option. */
    public Option<Seq<Enumeration.Value>> linkProps$default$4() {
        return None$.MODULE$;
    }

    /** Default for the fifth argument of {@code linkProps} (extra link configs): an empty map. */
    public Map<String, String> linkProps$default$5() {
        return (Map) Map$.MODULE$.empty();
    }

    /**
     * Consumes the given partitions from {@code clusterLinkTestHarness} and checks their records via
     * {@code consumePartitionRecords}.
     *
     * <p>The consumer is closed even when assignment or verification throws, so a failing check does
     * not leak it.
     *
     * @param clusterLinkTestHarness cluster to consume from
     * @param seq                    partitions to verify; must be non-empty because the topic name is taken from its head
     */
    private void verifyMirror(ClusterLinkTestHarness clusterLinkTestHarness, Seq<TopicPartition> seq) {
        try (Consumer<byte[], byte[]> consumer = clusterLinkTestHarness.createConsumer(clusterLinkTestHarness.createConsumer$default$1(), clusterLinkTestHarness.createConsumer$default$2(), clusterLinkTestHarness.createConsumer$default$3(), clusterLinkTestHarness.createConsumer$default$4())) {
            consumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(seq).asJava());
            consumePartitionRecords(consumer, seq.toSet(), clusterLinkPrefix(), ((TopicPartition) seq.head()).topic(), consumePartitionRecords$default$5());
        }
    }

    /**
     * Verifies link metrics for both directions of a bidirectional link.
     *
     * <p>Side effect: the {@code destCluster}/{@code sourceCluster} fields are reassigned while
     * checking each direction. On return, {@code sourceCluster} is the first harness and
     * {@code destCluster} is the second.
     *
     * <p>Reverse-connection metrics are checked only when at least one side is configured with
     * {@code INBOUND} connection mode.
     *
     * @param uuid                    id of the link
     * @param clusterLinkTestHarness  first cluster of the link
     * @param clusterLinkTestHarness2 second cluster of the link
     * @param properties              link properties used on the first cluster
     * @param properties2             link properties used on the second cluster
     * @param str                     topic passed when verifying with {@code properties2}
     * @param str2                    topic passed when verifying with {@code properties}
     */
    private void verifyLinkMetrics(Uuid uuid, ClusterLinkTestHarness clusterLinkTestHarness, ClusterLinkTestHarness clusterLinkTestHarness2, Properties properties, Properties properties2, String str, String str2) {
        new $colon.colon(clusterLinkTestHarness, new $colon.colon(clusterLinkTestHarness2, Nil$.MODULE$)).foreach(harness -> {
            $anonfun$verifyLinkMetrics$1(this, harness);
            return BoxedUnit.UNIT;
        });

        // First direction: first harness acts as destination.
        destCluster_$eq(clusterLinkTestHarness);
        sourceCluster_$eq(clusterLinkTestHarness2);
        verifyDestinationLinkMetrics(uuid, properties, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, str2);

        // Second direction: roles swapped.
        sourceCluster_$eq(clusterLinkTestHarness);
        destCluster_$eq(clusterLinkTestHarness2);
        verifyDestinationLinkMetrics(uuid, properties2, true, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, str);

        ConnectionMode firstMode = ConnectionMode$.MODULE$.fromString(properties.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
        ConnectionMode secondMode = ConnectionMode$.MODULE$.fromString(properties2.getProperty(ClusterLinkConfig$.MODULE$.ConnectionModeProp()));
        boolean firstInbound = firstMode != null && firstMode.equals(ConnectionMode$Inbound$.MODULE$);
        boolean anyInbound = firstInbound || (secondMode != null && secondMode.equals(ConnectionMode$Inbound$.MODULE$));
        if (!anyInbound) {
            return;
        }

        boolean firstOutbound = firstMode != null && firstMode.equals(ConnectionMode$Outbound$.MODULE$);
        ClusterLinkTestHarness harnessForOutbound = firstOutbound ? clusterLinkTestHarness : clusterLinkTestHarness2;
        ClusterLinkTestHarness harnessForInbound = firstInbound ? clusterLinkTestHarness : clusterLinkTestHarness2;
        verifyReverseConnectionMetrics(linkName(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL, ClusterLinkConfig.LinkMode.BIDIRECTIONAL, harnessForOutbound, harnessForInbound);
    }

    /**
     * Creates the same named link on both clusters and asserts that both sides share one link id.
     * The link is created first on {@code clusterLinkTestHarness}; the id it returns is then supplied
     * when creating the link on {@code clusterLinkTestHarness2}.
     *
     * @param str                     link name
     * @param clusterLinkTestHarness  cluster on which the link is created first
     * @param clusterLinkTestHarness2 cluster on which the link is created second, with the first id
     * @param properties              link properties for the first cluster
     * @param properties2             link properties for the second cluster
     * @return the link id returned by the first cluster
     */
    public Uuid createBidirectionalLink(String str, ClusterLinkTestHarness clusterLinkTestHarness, ClusterLinkTestHarness clusterLinkTestHarness2, Properties properties, Properties properties2) {
        String secondClusterId = ((KafkaBroker) clusterLinkTestHarness2.brokers().head()).clusterId();
        Uuid linkId = clusterLinkTestHarness.createClusterLink(str, properties, new Some(secondClusterId), true);

        String firstClusterId = ((KafkaBroker) clusterLinkTestHarness.brokers().head()).clusterId();
        Assertions.assertEquals(
                linkId,
                clusterLinkTestHarness2.createClusterLinkWithAllOptions(str, properties2, new Some(firstClusterId), true, new Some(linkId), clusterLinkTestHarness2.createClusterLinkWithAllOptions$default$6()));
        return linkId;
    }

    /**
     * Returns a predicate over truncate-and-restore results that delegates to
     * {@code $anonfun$matchesTruncatedMessageCount$1} with the expected count {@code j}.
     */
    private Function1<Map<String, TruncateAndRestoreResult>, Object> matchesTruncatedMessageCount(long j) {
        return results -> BoxesRunTime.boxToBoolean($anonfun$matchesTruncatedMessageCount$1(j, results));
    }

    /**
     * Returns a predicate over truncate-and-restore results that delegates to
     * {@code $anonfun$matchesPartitionLevelTruncationData$1} with the expected values in {@code list}.
     */
    private Function1<Map<String, TruncateAndRestoreResult>, Object> matchesPartitionLevelTruncationData(List<Object> list) {
        return results -> BoxesRunTime.boxToBoolean($anonfun$matchesPartitionLevelTruncationData$1(list, results));
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testMirrorTopicCreationWithStoppedSequenceNumber$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirror$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Reports whether the partition's current leader epoch, read from the harness's replica status,
     * has reached {@code intRef.elem}. A missing epoch counts as 0.
     */
    public static final /* synthetic */ boolean $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$5(ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition, IntRef intRef) {
        String topic = topicPartition.topic();
        int partition = topicPartition.partition();
        return clusterLinkTestHarness.replicaStatusWithPartitionResult(topic, partition).leaderEpoch().orElseGet(() -> 0) >= intRef.elem;
    }

    /** Compiler-generated lambda body: supplies the failure message for the leader-epoch wait. */
    public static final /* synthetic */ String $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$7() {
        return "Leader epoch did not get bumped";
    }

    /**
     * Blocks until the leader epoch of {@code topicPartition} on the given cluster reaches
     * {@code intRef.elem}, polling every 100 ms. Fails the test with "Leader epoch did not get bumped"
     * once more than 15 seconds have passed.
     *
     * @param clusterLinkTestHarness cluster whose replica status is polled
     * @param intRef                 holder of the minimum expected leader epoch; re-read on every poll
     * @param topicPartition         partition to check
     */
    public static final /* synthetic */ void $anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$4(ClusterLinkTestHarness clusterLinkTestHarness, IntRef intRef, TopicPartition topicPartition) {
        final long maxWaitMs = 15000L;
        final long pauseMs = 100L;
        final long startMs = System.currentTimeMillis();
        // Reuse the sibling predicate instead of an inlined copy of the same check.
        while (!$anonfun$testReverseAndStartMirrorWithLeaderEpochChanges$5(clusterLinkTestHarness, topicPartition, intRef)) {
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                Assertions.fail("Leader epoch did not get bumped");
            }
            // Never sleep longer than the overall wait budget.
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithLag$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithRemoteClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithLocalClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLinks$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopic$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirroringWithPauseAndUnpauseLocalMirrorTopicWithLocalClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * the topic returned by the test instance's {@code topic()}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithDeletedAndRecreatedTopics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * the topic returned by the test instance's {@code topic()}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndStartMirrorWithDeletedTopics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * the topic returned by the test instance's {@code topic()}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testRollbackMirrorFailsOnPendingMirror$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * the topic returned by the test instance's {@code topic()}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testPauseMirrorFailsOnPendingMirror$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    /**
     * Checks the mirror state transition errors reported for the test topic on one cluster.
     * Describing the topic with the second argument set to {@code true} must yield exactly one error
     * with code {@code INTERNAL_ERROR}. Describing it with the default second argument must yield no
     * errors.
     *
     * @param bidirectionalLinkIntegrationTest test instance supplying the topic name
     * @param clusterLinkTestHarness           cluster whose mirror topic description is inspected
     */
    public static final /* synthetic */ void $anonfun$testPauseMirrorFailsOnPendingMirror$2(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness) {
        MirrorTopicDescription describeMirrorTopic = clusterLinkTestHarness.describeMirrorTopic(bidirectionalLinkIntegrationTest.topic(), true);
        Assertions.assertNotNull(describeMirrorTopic);
        java.util.List<?> mirrorStateTransitionErrors = describeMirrorTopic.mirrorStateTransitionErrors();
        Assertions.assertEquals(1, mirrorStateTransitionErrors.size());
        Assertions.assertEquals(ClusterLinkTaskError.ClusterLinkTaskErrorCode.INTERNAL_ERROR, ((ClusterLinkTaskError) mirrorStateTransitionErrors.get(0)).errorCode());
        MirrorTopicDescription describeMirrorTopic2 = clusterLinkTestHarness.describeMirrorTopic(bidirectionalLinkIntegrationTest.topic(), clusterLinkTestHarness.describeMirrorTopic$default$2());
        Assertions.assertNotNull(describeMirrorTopic2);
        Assertions.assertEquals(0, describeMirrorTopic2.mirrorStateTransitionErrors().size());
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseAndPauseMirror$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreRemoteClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreLocalClusterRestart$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * the topic returned by the test instance's {@code topic()}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreWithDeletedAndRecreatedTopics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        return new TopicPartition(bidirectionalLinkIntegrationTest.topic(), i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreMirroringWithPauseAndUnpauseLinks$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Compiler-generated lambda body: builds the {@link TopicPartition} for partition {@code i} of
     * topic {@code str}.
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreMirrorWithLeaderEpochChanges$1(String str, int i) {
        return new TopicPartition(str, i);
    }

    /**
     * Calls {@code changeLeader} on every partition in {@code indexedSeq}, repeating the whole pass
     * {@code i} times.
     */
    private static final void bumpLeaderEpochs$2(ClusterLinkTestHarness clusterLinkTestHarness, int i, IndexedSeq indexedSeq) {
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), i).foreach$mVc$sp(round -> {
            indexedSeq.foreach(partition -> BoxesRunTime.boxToInteger(clusterLinkTestHarness.changeLeader(partition)));
        });
    }

    /**
     * Reports whether the partition's current leader epoch, read from the harness's replica status,
     * has reached {@code i}. A missing epoch counts as 0.
     */
    public static final /* synthetic */ boolean $anonfun$testTruncateAndRestoreMirrorWithLeaderEpochChanges$5(ClusterLinkTestHarness clusterLinkTestHarness, TopicPartition topicPartition, int i) {
        String topic = topicPartition.topic();
        int partition = topicPartition.partition();
        return clusterLinkTestHarness.replicaStatusWithPartitionResult(topic, partition).leaderEpoch().orElseGet(() -> 0) >= i;
    }

    /** Compiler-generated lambda body: supplies the failure message for the leader-epoch wait. */
    public static final /* synthetic */ String $anonfun$testTruncateAndRestoreMirrorWithLeaderEpochChanges$7() {
        return "Leader epoch did not get bumped";
    }

    /**
     * Blocks until the leader epoch of {@code topicPartition} on the given cluster reaches {@code i},
     * polling every 100 ms. Fails the test with "Leader epoch did not get bumped" once more than
     * 15 seconds have passed.
     *
     * @param clusterLinkTestHarness cluster whose replica status is polled
     * @param i                      minimum expected leader epoch
     * @param topicPartition         partition to check
     */
    public static final /* synthetic */ void $anonfun$testTruncateAndRestoreMirrorWithLeaderEpochChanges$4(ClusterLinkTestHarness clusterLinkTestHarness, int i, TopicPartition topicPartition) {
        final long maxWaitMs = 15000L;
        final long pauseMs = 100L;
        final long startMs = System.currentTimeMillis();
        while (!$anonfun$testTruncateAndRestoreMirrorWithLeaderEpochChanges$5(clusterLinkTestHarness, topicPartition, i)) {
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                Assertions.fail("Leader epoch did not get bumped");
            }
            // Never sleep longer than the overall wait budget.
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
    }

    /**
     * Waits, for every partition in {@code indexedSeq}, until its leader epoch on the given cluster
     * has reached {@code i}.
     */
    private static final void verifyLeaderEpochIsBumped$2(ClusterLinkTestHarness clusterLinkTestHarness, IndexedSeq indexedSeq, int i) {
        indexedSeq.foreach(partition -> {
            $anonfun$testTruncateAndRestoreMirrorWithLeaderEpochChanges$4(clusterLinkTestHarness, i, partition);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testMultipleMirrorStateTransitions$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds one {@link TopicPartition} of topic {@code str} for every index in
     * {@code [0, numPartitions())}.
     *
     * @param str topic name
     * @return the partitions of {@code str}, ordered by ascending index
     */
    public final Seq partitions$3(String str) {
        return RichInt$.MODULE$
                .until$extension(Predef$.MODULE$.intWrapper(0), numPartitions())
                .map(index -> new TopicPartition(str, BoxesRunTime.unboxToInt(index)));
    }

    /**
     * Per-topic check used after a mirror reversal. In order, it:
     * <ol>
     *   <li>waits for the harness held by {@code objectRef} to report state STOPPED for
     *       {@code str} on the test's link;</li>
     *   <li>waits for the harness held by {@code objectRef2} to report state ACTIVE;</li>
     *   <li>calls {@code produceRecords} with a producer created from {@code objectRef}'s
     *       harness, the topic and the literal 20 (presumably the record count; confirm
     *       against {@code produceRecords});</li>
     *   <li>waits for mirroring of all partitions of {@code str} on {@code objectRef2}'s
     *       harness.</li>
     * </ol>
     *
     * @param bidirectionalLinkIntegrationTest the test instance supplying link name and helpers
     * @param objectRef holder of the harness expected to be STOPPED and produced to
     * @param objectRef2 holder of the harness expected to be ACTIVE and mirroring
     * @param str topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$3(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, ObjectRef objectRef2, String str) {
        final ClusterLinkTestHarness stoppedSide = (ClusterLinkTestHarness) objectRef.elem;
        final MirrorTopicDescription.State stopped = MirrorTopicDescription.State.STOPPED;
        stoppedSide.waitUntilMirrorDescriptionState(
                stopped,
                str,
                bidirectionalLinkIntegrationTest.linkName(),
                stoppedSide.waitUntilMirrorDescriptionState$default$4());

        final ClusterLinkTestHarness activeSide = (ClusterLinkTestHarness) objectRef2.elem;
        final MirrorTopicDescription.State active = MirrorTopicDescription.State.ACTIVE;
        activeSide.waitUntilMirrorDescriptionState(
                active,
                str,
                bidirectionalLinkIntegrationTest.linkName(),
                activeSide.waitUntilMirrorDescriptionState$default$4());

        final ClusterLinkTestHarness producingSide = (ClusterLinkTestHarness) objectRef.elem;
        bidirectionalLinkIntegrationTest.produceRecords(
                producingSide.createProducer(
                        producingSide.createProducer$default$1(),
                        producingSide.createProducer$default$2(),
                        producingSide.createProducer$default$3()),
                str,
                20,
                bidirectionalLinkIntegrationTest.produceRecords$default$4(),
                bidirectionalLinkIntegrationTest.produceRecords$default$5(),
                bidirectionalLinkIntegrationTest.produceRecords$default$6(),
                bidirectionalLinkIntegrationTest.produceRecords$default$7());

        bidirectionalLinkIntegrationTest.waitForMirroring(
                (ClusterLinkTestHarness) objectRef2.elem,
                bidirectionalLinkIntegrationTest.partitions$3(str));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reverses mirroring for every topic in {@code seq}.
     *
     * <p>Steps: submit {@code REVERSE_AND_START_REMOTE_MIRROR} for each topic through the admin
     * client that {@code map} holds for the harness currently in {@code objectRef}; swap the
     * harnesses held by {@code objectRef} and {@code objectRef2}; then run the per-topic
     * post-reversal check with the holders passed as ({@code objectRef2}, {@code objectRef}).
     * The result of {@code alterMirrors} is not inspected here.
     *
     * @param seq topic names to reverse
     * @param map admin clients keyed by harness
     * @param objectRef holder whose harness issues the request; holds the other harness on return
     * @param alterMirrorsOptions options forwarded to {@code alterMirrors}
     * @param objectRef2 holder of the opposite harness; swapped with {@code objectRef}
     */
    public final void reverseMirroring$1(Seq seq, Map map, ObjectRef objectRef, AlterMirrorsOptions alterMirrorsOptions, ObjectRef objectRef2) {
        ConfluentAdmin admin = (ConfluentAdmin) map.apply((ClusterLinkTestHarness) objectRef.elem);
        java.util.Map reverseOps = CollectionConverters$.MODULE$.MapHasAsJava(
                ((IterableOnceOps) seq.map(topicName -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(topicName), AlterMirrorOp.REVERSE_AND_START_REMOTE_MIRROR)))
                        .toMap($less$colon$less$.MODULE$.refl())).asJava();
        admin.alterMirrors(reverseOps, alterMirrorsOptions);

        // Exchange the two harness holders.
        ClusterLinkTestHarness formerSecond = (ClusterLinkTestHarness) objectRef2.elem;
        objectRef2.elem = (ClusterLinkTestHarness) objectRef.elem;
        objectRef.elem = formerSecond;

        scala.collection.Iterator<?> topics = seq.iterator();
        while (topics.hasNext()) {
            $anonfun$testMultipleMirrorStateTransitions$3(this, objectRef2, objectRef, (String) topics.next());
        }
    }

    /**
     * Derives a topic name from an index by appending the decimal index to {@code "topic"}.
     *
     * @param i index to append
     * @return {@code "topic"} followed by {@code i}, e.g. {@code "topic3"}
     */
    public static final /* synthetic */ String $anonfun$testMultipleMirrorStateTransitions$4(int i) {
        return "topic" + i;
    }

    /**
     * Per-topic setup step. It creates topic {@code str} on the harness held by
     * {@code objectRef} with the test's partition count and replication factor, calls
     * {@code produceRecords} for it with the literal 20 (presumably the record count; confirm
     * against {@code produceRecords}), links the topic on the harness held by
     * {@code objectRef2} under the test's link name, and waits for mirroring of all its
     * partitions on that harness.
     *
     * @param bidirectionalLinkIntegrationTest the test instance supplying settings and helpers
     * @param objectRef holder of the harness on which the topic is created and produced to
     * @param objectRef2 holder of the harness on which the topic is linked
     * @param str topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$5(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, ObjectRef objectRef2, String str) {
        final ClusterLinkTestHarness creatingSide = (ClusterLinkTestHarness) objectRef.elem;
        creatingSide.createTopic(
                str,
                bidirectionalLinkIntegrationTest.numPartitions(),
                bidirectionalLinkIntegrationTest.replicationFactor(),
                creatingSide.createTopic$default$4(),
                creatingSide.createTopic$default$5(),
                creatingSide.createTopic$default$6());

        final ClusterLinkTestHarness producingSide = (ClusterLinkTestHarness) objectRef.elem;
        bidirectionalLinkIntegrationTest.produceRecords(
                producingSide.createProducer(
                        producingSide.createProducer$default$1(),
                        producingSide.createProducer$default$2(),
                        producingSide.createProducer$default$3()),
                str,
                20,
                bidirectionalLinkIntegrationTest.produceRecords$default$4(),
                bidirectionalLinkIntegrationTest.produceRecords$default$5(),
                bidirectionalLinkIntegrationTest.produceRecords$default$6(),
                bidirectionalLinkIntegrationTest.produceRecords$default$7());

        final ClusterLinkTestHarness linkingSide = (ClusterLinkTestHarness) objectRef2.elem;
        linkingSide.linkTopic(
                str,
                bidirectionalLinkIntegrationTest.replicationFactor(),
                bidirectionalLinkIntegrationTest.linkName(),
                linkingSide.linkTopic$default$4(),
                linkingSide.linkTopic$default$5());

        bidirectionalLinkIntegrationTest.waitForMirroring(
                (ClusterLinkTestHarness) objectRef2.elem,
                bidirectionalLinkIntegrationTest.partitions$3(str));
    }

    /**
     * Subscribes {@code consumer} to the topics in {@code indexedSeq}, polls through
     * {@code TestUtils.pollUntilAtLeastNumRecords} with arguments 100 and 15000, and then
     * synchronously commits offset 200 for every partition of every listed topic.
     *
     * @param consumer consumer to subscribe, poll and commit with
     * @param indexedSeq topic names
     */
    private final void createConsumerAndCommitOffsets$1(Consumer consumer, IndexedSeq indexedSeq) {
        consumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(indexedSeq).asJava());
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, 100, 15000L);

        // Map every partition of every topic to the same fixed offset.
        java.util.Map offsetsToCommit = CollectionConverters$.MODULE$.MapHasAsJava(
                ((IterableOnceOps) ((IndexedSeqOps) indexedSeq.flatMap(topicName -> this.partitions$3(topicName)))
                        .map(partition -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(partition), new OffsetAndMetadata(200L))))
                        .toMap($less$colon$less$.MODULE$.refl())).asJava();
        consumer.commitSync(offsetsToCommit);
    }

    /**
     * Waits until the harness held by {@code objectRef} reports mirror state
     * {@code PENDING_STOPPED} for topic {@code str} on the test's link.
     *
     * @param bidirectionalLinkIntegrationTest the test instance supplying the link name
     * @param objectRef holder of the harness to query
     * @param str topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$11(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, String str) {
        final ClusterLinkTestHarness harness = (ClusterLinkTestHarness) objectRef.elem;
        final MirrorTopicDescription.State expectedState = MirrorTopicDescription.State.PENDING_STOPPED;
        harness.waitUntilMirrorDescriptionState(
                expectedState,
                str,
                bidirectionalLinkIntegrationTest.linkName(),
                harness.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Waits until the harness held by {@code objectRef} reports mirror state {@code STOPPED}
     * for topic {@code str} on the test's link.
     *
     * @param bidirectionalLinkIntegrationTest the test instance supplying the link name
     * @param objectRef holder of the harness to query
     * @param str topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$12(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, String str) {
        final ClusterLinkTestHarness harness = (ClusterLinkTestHarness) objectRef.elem;
        final MirrorTopicDescription.State expectedState = MirrorTopicDescription.State.STOPPED;
        harness.waitUntilMirrorDescriptionState(
                expectedState,
                str,
                bidirectionalLinkIntegrationTest.linkName(),
                harness.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Waits until the harness held by {@code objectRef} reports mirror state {@code STOPPED}
     * for topic {@code str} on the test's link. The body is the same as the {@code $12}
     * sibling; the compiler emitted one method per call site.
     *
     * @param bidirectionalLinkIntegrationTest the test instance supplying the link name
     * @param objectRef holder of the harness to query
     * @param str topic name
     */
    public static final /* synthetic */ void $anonfun$testMultipleMirrorStateTransitions$13(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ObjectRef objectRef, String str) {
        final ClusterLinkTestHarness harness = (ClusterLinkTestHarness) objectRef.elem;
        final MirrorTopicDescription.State expectedState = MirrorTopicDescription.State.STOPPED;
        harness.waitUntilMirrorDescriptionState(
                expectedState,
                str,
                bidirectionalLinkIntegrationTest.linkName(),
                harness.waitUntilMirrorDescriptionState$default$4());
    }

    /**
     * Compiler-generated (synthetic) helper that builds a partition of the test's own topic.
     *
     * @param bidirectionalLinkIntegrationTest test instance whose {@code topic()} supplies the name
     * @param i partition index
     * @return a new {@code TopicPartition} for that topic and index
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreWithDeletedTopics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, int i) {
        final String topicName = bidirectionalLinkIntegrationTest.topic();
        return new TopicPartition(topicName, i);
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestore$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreWithNewLeaderEpochsOnRemoteCluster$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreWithNewLeaderEpochsOnLocalCluster$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreWithDivergentRecordsAfterFailover$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreWithProduceAfterFailover$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreWithDivergentRecordsAndProduceAfterFailover$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testFailoverThenRestoreThenReverse$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseThenFailoverThenRestore$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testFailoverThenRestoreThenReverseTwice$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testReverseTwiceThenFailoverThenRestore$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreFailsOnWrongStoppedMirrorTopic$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreCombinationOfMirrorStates$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreCombinationOfMirrorStates$2(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$testTruncateAndRestoreNoRecordsInTopic$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$1(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Compiler-generated (synthetic) helper that pairs a topic name with a partition index.
     *
     * @param str topic name
     * @param i partition index
     * @return a new {@code TopicPartition(str, i)}
     */
    public static final /* synthetic */ TopicPartition $anonfun$verifyBidirectionalLink$2(String str, int i) {
        final TopicPartition topicPartition = new TopicPartition(str, i);
        return topicPartition;
    }

    /**
     * Calls {@code alterBrokerConfig} on the harness to set
     * {@code confluent.cluster.link.mirror.transition.batch.size} to the decimal form of
     * {@code i}, passing {@code None} as the first argument.
     *
     * @param i batch size to configure
     * @param clusterLinkTestHarness harness whose broker config is altered
     */
    public static final /* synthetic */ void $anonfun$configureMirrorTransitionBatchSize$1(int i, ClusterLinkTestHarness clusterLinkTestHarness) {
        final String configKey = "confluent.cluster.link.mirror.transition.batch.size";
        final String configValue = Integer.toString(i);
        clusterLinkTestHarness.alterBrokerConfig(None$.MODULE$, configKey, configValue);
    }

    /**
     * Filter predicate: accepts metrics whose group contains {@code "cluster-link"} and whose
     * tags include a {@code "mode"} key.
     *
     * @param metricName metric to test
     * @return {@code true} when both conditions hold
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$3(MetricName metricName) {
        if (!metricName.group().contains("cluster-link")) {
            return false;
        }
        return metricName.tags().containsKey("mode");
    }

    /**
     * Filter predicate: rejects only the metric named {@code "active-link-count"}. A metric
     * with a {@code null} name is accepted.
     *
     * @param metricName metric to test
     * @return {@code false} for {@code "active-link-count"}, {@code true} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$4(MetricName metricName) {
        final String metric = metricName.name();
        if (metric == null) {
            return true;
        }
        return !metric.equals("active-link-count");
    }

    /**
     * Filter predicate: accepts a metric when its {@code "mode"} tag is not
     * {@code "bidirectional"} (or is absent), it carries a {@code "link-id"} tag, and its name
     * does not start with {@code "reverse-connection"}.
     *
     * @param metricName metric to test
     * @return {@code true} when all three conditions hold
     */
    public static final /* synthetic */ boolean $anonfun$verifyLinkMetrics$5(MetricName metricName) {
        final Object mode = metricName.tags().get("mode");
        if (mode != null && mode.equals("bidirectional")) {
            return false;
        }
        if (!metricName.tags().containsKey("link-id")) {
            return false;
        }
        return !metricName.name().startsWith("reverse-connection");
    }

    /**
     * Verifies link metrics on one cluster harness.
     *
     * <p>It first runs the link-count check for BIDIRECTIONAL mode with the label
     * {@code "active"}, then the active-link-count check for BIDIRECTIONAL and for SOURCE.
     * Finally it asserts that, across all alive servers, no metric passes the three
     * {@code verifyLinkMetrics} filter predicates ({@code $3}, {@code $4}, {@code $5}); any
     * that do are reported as {@code name:tags} strings in the assertion failure.
     *
     * @param bidirectionalLinkIntegrationTest the test instance providing the metric checks
     * @param clusterLinkTestHarness harness whose brokers' metrics are inspected
     */
    public static final /* synthetic */ void $anonfun$verifyLinkMetrics$1(BidirectionalLinkIntegrationTest bidirectionalLinkIntegrationTest, ClusterLinkTestHarness clusterLinkTestHarness) {
        bidirectionalLinkIntegrationTest.verifyLinkCountMetric(ClusterLinkConfig.LinkMode.BIDIRECTIONAL, "active", clusterLinkTestHarness);
        bidirectionalLinkIntegrationTest.verifyActiveLinkCountMetric(clusterLinkTestHarness, ClusterLinkConfig.LinkMode.BIDIRECTIONAL);
        bidirectionalLinkIntegrationTest.verifyActiveLinkCountMetric(clusterLinkTestHarness, ClusterLinkConfig.LinkMode.SOURCE);

        Object expectedNone = Predef$.MODULE$.Set().empty();
        Object offendingMetrics = ((IterableOnceOps) clusterLinkTestHarness.aliveServers().flatMap(broker -> {
            IterableOps withMode = (IterableOps) CollectionConverters$.MODULE$.SetHasAsScala(broker.metrics().metrics().keySet()).asScala()
                    .filter(candidate -> BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$3(candidate)));
            IterableOps withoutActiveCount = (IterableOps) withMode
                    .filter(candidate -> BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$4(candidate)));
            IterableOps unexpected = (IterableOps) withoutActiveCount
                    .filter(candidate -> BoxesRunTime.boxToBoolean($anonfun$verifyLinkMetrics$5(candidate)));
            IterableOnceOps described = (IterableOnceOps) unexpected
                    .map(candidate -> candidate.name() + ":" + candidate.tags());
            return described.toSet();
        })).toSet();
        Assertions.assertEquals(expectedNone, offendingMetrics);
    }

    /**
     * Checks that {@code map} has exactly one entry and that the
     * {@link TruncateAndRestoreResult} in that entry reports {@code j} truncated messages.
     *
     * @param j expected number of truncated messages
     * @param map results; asserted to contain exactly one entry
     * @return {@code true} when the single result's {@code messagesTruncated()} equals {@code j}
     * @throws MatchError if the head entry of {@code map} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$matchesTruncatedMessageCount$1(long j, Map map) {
        Assertions.assertEquals(1, map.size());
        final Tuple2 onlyEntry = (Tuple2) map.head();
        if (onlyEntry == null) {
            throw new MatchError((Object) null);
        }
        final TruncateAndRestoreResult result = (TruncateAndRestoreResult) onlyEntry._2();
        return result.messagesTruncated() == j;
    }

    /**
     * Checks the single {@link TruncateAndRestoreResult} in {@code map} against the expected
     * per-partition truncation counts in {@code list}, where element {@code p} of {@code list}
     * is the expected count for partition id {@code p}.
     *
     * <p>When the result carries no partition-level data ({@code null}), the match succeeds
     * only if the expected total is zero. Otherwise the number of reported partitions must
     * equal the number of positive entries in {@code list} (asserted), every reported
     * partition must have a non-zero count equal to its expected value, and the result's total
     * must equal the sum of {@code list}.
     *
     * @param list expected truncated-message count per partition, indexed by partition id
     * @param map results; asserted to contain exactly one entry
     * @return {@code true} when the result matches the expectations described above
     * @throws MatchError if the head entry of {@code map} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$matchesPartitionLevelTruncationData$1(List list, Map map) {
        Assertions.assertEquals(1, map.size());
        long expectedTotal = BoxesRunTime.unboxToLong(list.foldLeft(BoxesRunTime.boxToLong(0L), (j, j2) -> {
            return j + j2;
        }));
        Tuple2 onlyEntry = (Tuple2) map.head();
        if (onlyEntry == null) {
            throw new MatchError((Object) null);
        }
        TruncateAndRestoreResult truncateAndRestoreResult = (TruncateAndRestoreResult) onlyEntry._2();
        // Test for missing partition data before dereferencing it. Previously size() was called
        // first, so this branch was unreachable and a null list raised NullPointerException.
        if (truncateAndRestoreResult.partitionLevelTruncationData() == null) {
            return expectedTotal == 0;
        }
        Assertions.assertEquals(list.count(j3 -> {
            return j3 > 0;
        }), truncateAndRestoreResult.partitionLevelTruncationData().size());
        BooleanRef allPartitionsMatch = BooleanRef.create(true);
        truncateAndRestoreResult.partitionLevelTruncationData().forEach(partitionTruncationInformation -> {
            // A reported partition must carry a non-zero count equal to its expected value.
            if (partitionTruncationInformation.messagesTruncated() != BoxesRunTime.unboxToLong(list.apply(partitionTruncationInformation.partitionId())) || partitionTruncationInformation.messagesTruncated() == 0) {
                allPartitionsMatch.elem = false;
            }
        });
        return allPartitionsMatch.elem && truncateAndRestoreResult.messagesTruncated() == expectedTotal;
    }

    /**
     * Configures the test to not use a source-initiated link and creates the two cluster
     * harnesses: the source cluster with {@code SASL_SSL} and the destination cluster with
     * {@code SASL_PLAINTEXT}, each with {@code Some(PLAINTEXT)} as the second argument.
     *
     * <p>NOTE(review): the meaning of the trailing int arguments (0/2 and 100/2) is defined by
     * the {@code ClusterLinkTestHarness} constructor, which is not visible here.
     */
    public BidirectionalLinkIntegrationTest() {
        useSourceInitiatedLink_$eq(false);

        SecurityProtocol sourceProtocol = SecurityProtocol.SASL_SSL;
        Some sourceSecondProtocol = new Some(SecurityProtocol.PLAINTEXT);
        // Unused read of the companion object, kept so class-initialisation order is unchanged.
        ClusterLinkTestHarness$ sourceCompanion = ClusterLinkTestHarness$.MODULE$;
        sourceCluster_$eq(new ClusterLinkTestHarness(sourceProtocol, sourceSecondProtocol, 0, 2));

        SecurityProtocol destProtocol = SecurityProtocol.SASL_PLAINTEXT;
        Some destSecondProtocol = new Some(SecurityProtocol.PLAINTEXT);
        // Unused read of the companion object, kept as in the compiled code.
        ClusterLinkTestHarness$ destCompanion = ClusterLinkTestHarness$.MODULE$;
        destCluster_$eq(new ClusterLinkTestHarness(destProtocol, destSecondProtocol, 100, 2));
    }
}
