package kafka.link;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.controller.ReplicaAssignment;
import kafka.log.LogConfig$;
import kafka.server.AbstractFetcherManager;
import kafka.server.ConfigType$;
import kafka.server.DynamicConfig$Client$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.ReplicaFetcherThread;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.server.link.ClusterLinkFetcherThread;
import kafka.server.link.ClusterLinkFilterJson$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.FetchResponseSize;
import kafka.server.link.LinkMode$Destination$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.ClusterLinkInUseException;
import org.apache.kafka.common.errors.ClusterLinkPausedException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkIntegrationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001\t\rd\u0001B\u001d;\u0001}BQ\u0001\u0012\u0001\u0005\u0002\u0015Cqa\u0012\u0001C\u0002\u0013\u0005\u0001\n\u0003\u0004P\u0001\u0001\u0006I!\u0013\u0005\b!\u0002\u0011\r\u0011\"\u0001I\u0011\u0019\t\u0006\u0001)A\u0005\u0013\"9!\u000b\u0001b\u0001\n\u0003\u0019\u0006B\u0002/\u0001A\u0003%A\u000bC\u0004^\u0001\t\u0007I\u0011\u00010\t\r%\u0004\u0001\u0015!\u0003`\u0011\u001dQ\u0007A1A\u0005\u0002yCaa\u001b\u0001!\u0002\u0013y\u0006\"\u00027\u0001\t\u0003i\u0007\"\u0002@\u0001\t\u0003i\u0007BBA\u0001\u0001\u0011\u0005Q\u000e\u0003\u0004\u0002\u0006\u0001!\t!\u001c\u0005\u0007\u0003\u0013\u0001A\u0011A7\t\r\u00055\u0001\u0001\"\u0001n\u0011\u0019\t\t\u0002\u0001C\u0001[\"1\u0011Q\u0003\u0001\u0005\u00025Da!!\u0007\u0001\t\u0003i\u0007BBA\u000f\u0001\u0011\u0005Q\u000e\u0003\u0004\u0002\"\u0001!\t!\u001c\u0005\u0007\u0003K\u0001A\u0011A7\t\r\u0005%\u0002\u0001\"\u0001n\u0011\u0019\ti\u0003\u0001C\u0001[\"9\u0011\u0011\u0007\u0001\u0005\u0002\u0005M\u0002bBAK\u0001\u0011\u0005\u0011q\u0013\u0005\b\u00037\u0003A\u0011BAO\u0011\u001d\t)\u000b\u0001C\u0005\u0003OCa!!2\u0001\t\u0003i\u0007BBAe\u0001\u0011\u0005Q\u000e\u0003\u0004\u0002N\u0002!\t!\u001c\u0005\u0007\u0003#\u0004A\u0011A7\t\r\u0005U\u0007\u0001\"\u0001n\u0011\u0019\tI\u000e\u0001C\u0001[\"1\u0011Q\u001c\u0001\u0005\u00025Da!!9\u0001\t\u0003i\u0007BBAs\u0001\u0011\u0005Q\u000e\u0003\u0004\u0002j\u0002!\t!\u001c\u0005\u0007\u0003[\u0004A\u0011A7\t\r\u0005E\b\u0001\"\u0001n\u0011\u0019\t)\u0010\u0001C\u0001[\"9\u0011\u0011 
\u0001\u0005\n\u0005m\bbBA��\u0001\u0011%!\u0011\u0001\u0005\u0007\u0005\u001b\u0001A\u0011A7\t\r\tE\u0001\u0001\"\u0001n\u0011\u0019\u0011)\u0002\u0001C\u0001[\"1!\u0011\u0004\u0001\u0005\u00025DaA!\b\u0001\t\u0003i\u0007B\u0002B\u0011\u0001\u0011\u0005Q\u000e\u0003\u0004\u0003&\u0001!\t!\u001c\u0005\b\u0005S\u0001A\u0011\u0002B\u0016\u0011%\u0011\t\u0004AI\u0001\n\u0013\u0011\u0019\u0004\u0003\u0004\u0003J\u0001!\t!\u001c\u0005\u0007\u0005\u001b\u0002A\u0011A7\t\r\tE\u0003\u0001\"\u0001n\u0005i\u0019E.^:uKJd\u0015N\\6J]R,wM]1uS>tG+Z:u\u0015\tYD(\u0001\u0003mS:\\'\"A\u001f\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0011\t\u0003\u0003\nk\u0011AO\u0005\u0003\u0007j\u0012!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001G!\t\t\u0005!\u0001\bpM\u001a\u001cX\r\u001e+p\u0007>lW.\u001b;\u0016\u0003%\u0003\"AS'\u000e\u0003-S\u0011\u0001T\u0001\u0006g\u000e\fG.Y\u0005\u0003\u001d.\u0013A\u0001T8oO\u0006yqN\u001a4tKR$vnQ8n[&$\b%\u0001\u0006ts:\u001c\u0007+\u001a:j_\u0012\f1b]=oGB+'/[8eA\u0005i1m\u001c8tk6,'o\u0012:pkB,\u0012\u0001\u0016\t\u0003+jk\u0011A\u0016\u0006\u0003/b\u000bA\u0001\\1oO*\t\u0011,\u0001\u0003kCZ\f\u0017BA.W\u0005\u0019\u0019FO]5oO\u0006q1m\u001c8tk6,'o\u0012:pkB\u0004\u0013a\u0003;pa&\u001cg)\u001b7uKJ,\u0012a\u0018\t\u0003A\u001et!!Y3\u0011\u0005\t\\U\"A2\u000b\u0005\u0011t\u0014A\u0002\u001fs_>$h(\u0003\u0002g\u0017\u00061\u0001K]3eK\u001aL!a\u00175\u000b\u0005\u0019\\\u0015\u0001\u0004;pa&\u001cg)\u001b7uKJ\u0004\u0013AF5oG2,H-Z!mYR{\u0007/[2t\r&dG/\u001a:\u0002/%t7\r\\;eK\u0006cG\u000eV8qS\u000e\u001ch)\u001b7uKJ\u0004\u0013!\u0006;fgR\u001c%/Z1uK6K'O]8s)>\u0004\u0018n\u0019\u000b\u0002]B\u0011!j\\\u0005\u0003a.\u0013A!\u00168ji\"\u0012AB\u001d\t\u0003grl\u0011\u0001\u001e\u0006\u0003kZ\f1!\u00199j\u0015\t9\b0A\u0004kkBLG/\u001a:\u000b\u0005eT\u0018!\u00026v]&$(\"A>\u0002\u0007=\u0014x-\u0003\u0002~i\n!A+Z:u\u0003}!Xm\u001d;Ue\u0006t7/Y2uS>t7oV5uQ6K'O]8s)>\u0004\u0018n\u0019\u0015\u0003\
u001bI\fQ\u0005^3tiN#x\u000e]'jeJ|'\u000fV8qS\u000e<\u0016\u000e\u001e5J]Z\fG.\u001b3SKF,Xm\u001d;)\u00059\u0011\u0018A\u0004;fgR\u001cFo\u001c9NSJ\u0014xN\u001d\u0015\u0003\u001fI\fq\u0005^3tiN#x\u000e]'jeJ|'oV5uQN{WO]2f\u00072,8\u000f^3s'\",H\u000fZ8x]\"\u0012\u0001C]\u0001#i\u0016\u001cHo\u0011:fCR,\u0017I\u001c3EK2,G/Z!oIJ+7M]3bi\u0016d\u0015N\\6)\u0005E\u0011\u0018\u0001\u0006;fgRl\u0015N\u001d:pe:+wOU3d_J$7\u000f\u000b\u0002\u0013e\u0006IB/Z:u\u001b&\u0014(o\u001c:Fq&\u001cH/\u001b8h%\u0016\u001cwN\u001d3tQ\t\u0019\"/\u0001\nuKN$X*\u0019=NKN\u001c\u0018mZ3TSj,\u0007F\u0001\u000bs\u0003M!Xm\u001d;U_BL7mQ8oM&<7+\u001f8dQ\t)\"/\u0001\fuKN$H*[:u\t\u0016\u001c8M]5cK6K'O]8sQ\t1\"/\u0001\fuKN$8k\\;sG\u0016\u001cE.^:uKJ\fVo\u001c;bQ\t9\"/A\u0010uKN$H)Z:uS:\fG/[8o\u00072,8\u000f^3s\u0019&t7.U;pi\u0006D#\u0001\u0007:\u0002UQ,7\u000f\u001e#fgRLg.\u0019;j_:\u001cE.^:uKJd\u0015N\\6Ce>\\WM\u001d'fm\u0016d\u0017+^8uC\"\u0012\u0011D]\u0001\"m\u0016\u0014\u0018NZ=EKN$\u0018N\\1uS>t7\t\\;ti\u0016\u0014H*\u001b8l#V|G/\u0019\u000b\u0007\u0003k\t\t%a\u001a\u0011\t\u0005]\u0012QH\u0007\u0003\u0003sQ1!a\u000fY\u0003\u0011)H/\u001b7\n\t\u0005}\u0012\u0011\b\u0002\u0005+VKE\tC\u0004\u0002Di\u0001\r!!\u0012\u0002\u0013I,7o\\;sG\u0016\u001c\bCBA$\u0003\u001b\n\t&\u0004\u0002\u0002J)\u0019\u00111J&\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002P\u0005%#aA*fcB!\u00111KA2\u001b\t\t)F\u0003\u0003\u0002X\u0005e\u0013AB2p]\u001aLwM\u0003\u0003\u0002\\\u0005u\u0013AB2p[6|gNC\u0002>\u0003?R1!!\u0019{\u0003\u0019\t\u0007/Y2iK&!\u0011QMA+\u00059\u0019uN\u001c4jOJ+7o\\;sG\u0016Dq!!\u001b\u001b\u0001\u0004\tY'A\u0005rk>$\u0018-T8eKB!\u0011QNAH\u001d\u0011\ty'!#\u000f\t\u0005E\u0014Q\u0011\b\u0005\u0003g\n\u0019I\u0004\u0003\u0002v\u0005\u0005e\u0002BA<\u0003\u007frA!!\u001f\u0002~9\u0019!-a\u001f\n\u0003mL1!!\u0019{\u0013\ri\u0014qL\u0005\u0005\u00037\ni&\u0003\u0003\u0002X\u0005e\u0013\u0002BAD\u0003+\n\u0011\"\u001b8uKJt\u0017\r\\:\n\t\u0005-\u0015QR\u0001\u0011\u0007>tg\r\\;f]R\u001cuN\u001c4j
ONTA!a\"\u0002V%!\u0011\u0011SAJ\u0005Q\u0019E.^:uKJd\u0015N\\6Rk>$\u0018-T8eK*!\u00111RAG\u0003=1XM]5gsF+x\u000e^1N_\u0012,Gc\u00018\u0002\u001a\"9\u0011\u0011N\u000eA\u0002\u0005-\u0014\u0001\t3fgR\u001cE.^:uKJd\u0015N\\6SKBd\u0017nY1t)\"\u0014x\u000e\u001e;mK\u0012$\"!a(\u0011\u0007)\u000b\t+C\u0002\u0002$.\u0013qAQ8pY\u0016\fg.A\fwKJLg-\u001f$fi\u000eD'+Z:q_:\u001cXmU5{KR)a.!+\u0002.\"9\u00111V\u000fA\u0002\u0005U\u0012A\u00027j].LE\rC\u0004\u00020v\u0001\r!!-\u0002\u0019\u0015D\b/Z2uK\u0012\u001c\u0016N_3\u0011\u000b)\u000b\u0019,a.\n\u0007\u0005U6J\u0001\u0004PaRLwN\u001c\t\u0005\u0003s\u000b\t-\u0004\u0002\u0002<*\u00191(!0\u000b\u0007\u0005}F(\u0001\u0004tKJ4XM]\u0005\u0005\u0003\u0007\fYLA\tGKR\u001c\u0007NU3ta>t7/Z*ju\u0016\f\u0001\u0007^3ti\u0012+7\u000f^5oCRLwN\\\"mkN$XM\u001d'j].\fVo\u001c;b/&$\bN\u0011:pW\u0016\u0014(+Z:uCJ$\bF\u0001\u0010s\u0003\u0015\"Xm\u001d;EKN$\u0018N\\1uS>tG*Y4MS:\\g)\u001a;dQ\u0016\u0014H\u000b\u001b:piRdW\r\u000b\u0002 e\u0006\tB/Z:u\u0003\u0012$\u0007+\u0019:uSRLwN\\:)\u0005\u0001\u0012\u0018a\u0007;fgR\fE\u000e^3s\u00072,8\u000f^3s\u0019&t7nQ8oM&<7\u000f\u000b\u0002\"e\u0006IC/Z:u\u001f\u001a47/\u001a;NS\u001e\u0014\u0018\r^5p]^KG\u000f[!eI\u0016$7i\u001c8tk6,'o\u0012:pkBD#A\t:\u0002CQ,7\u000f^(gMN,G/T5he\u0006$\u0018n\u001c8XSRD\u0017\t\u001a3fIR{\u0007/[2)\u0005\r\u0012\u0018\u0001\u0005;fgR$Um\u001d;SK\u0006$wJ\u001c7zQ\t!#/\u0001\u000fuKN$H)\u001a7fi\u0016\u001cE.^:uKJd\u0015N\\6DY\u0016\fg.\u001e9)\u0005\u0015\u0012\u0018\u0001\t;fgRl\u0015N\u001d:pe\u0016$Gk\u001c9jG6\u000b'o[3e\r>\u0014H)\u001a7fi\u0016D#A\n:\u0002\u001dQ,7\u000f\u001e)bkN,Gk\u001c9jG\"\u0012qE]\u0001\u0015i\u0016\u001cH\u000fU1vg\u0016\u001cE.^:uKJd\u0015N\\6)\u0005!\u0012\u0018!\u0005;fgR\u0014V\r\u001d7jG\u0006\u001cF/\u0019;vg\"\u0012\u0011F]\u0001\u0012i\u0016\u001cH/Q;u_6K'O]8sS:<\u0007F\u0001\u0016s\u0003=\tW\u000f^8NSJ\u0014xN\u001d+pa&\u001cGc\u00018\u0002~\")\u0001k\u000ba\u0001\u0013\u0006iB-Z:u\u0019&t7\u000e\u0015:paN4uN]!vi>l\u0015N\u001
d:pe&tw\r\u0006\u0003\u0003\u0004\t%\u0001\u0003BA\u001c\u0005\u000bIAAa\u0002\u0002:\tQ\u0001K]8qKJ$\u0018.Z:\t\r\t-A\u00061\u0001`\u00031!x\u000e]5d\r&dG/\u001a:t\u0003M\"Xm\u001d;MCN$h)\u001a;dQ\u0016$wJ\u001a4tKR\u0004&o\\7pi\u0016$W*\u001b:s_J$v\u000e]5d\t\u0016\u001c8M]5qi&|g\u000e\u000b\u0002.e\u0006)D/Z:u\u0019\u0006\u001cHOR3uG\",Gm\u00144gg\u0016$h)Y5mK\u0012|e/\u001a:NSJ\u0014xN\u001d+pa&\u001cG)Z:de&\u0004H/[8oQ\tq#/\u0001\u0016uKN$\u0018)\u001e;p\u001b&\u0014(o\u001c:j]\u001etun\u0014<fe2\f\u0007\u000f]5oOR{\u0007/[2GS2$XM]:)\u0005=\u0012\u0018a\n;fgR\fU\u000f^8NSJ\u0014xN]5oO\u0006cGn\\<t\u0019&t7nQ8oM&<W\u000b\u001d3bi\u0016D#\u0001\r:\u0002GQ,7\u000f^!vi>l\u0015N\u001d:pe&tw-\u00169eCR,W\t_5ti&tw\rT5oW\"\u0012\u0011G]\u0001'i\u0016\u001cH/Q;u_6K'O]8sS:<\u0017\t\u001a3j]\u001e\fE\rZ5uS>t\u0017\r\u001c+pa&\u001c\u0007F\u0001\u001as\u0003\u0001\"Xm\u001d;BkR|W*\u001b:s_JLgn\u001a(p\u000bbL7\u000f^5oOR{\u0007/[2)\u0005M\u0012\u0018A\r;fgRd\u0015m\u001d;GKR\u001c\u0007.\u001a3PM\u001a\u001cX\r^*u_B\u0004X\rZ'jeJ|'\u000fV8qS\u000e$Um]2sSB$\u0018n\u001c8\u0015\u00079\u0014i\u0003C\u0005\u00030Q\u0002\n\u00111\u0001\u0002 \u00069\u0001O]8n_R,\u0017\u0001\u0010;fgRd\u0015m\u001d;GKR\u001c\u0007.\u001a3PM\u001a\u001cX\r^*u_B\u0004X\rZ'jeJ|'\u000fV8qS\u000e$Um]2sSB$\u0018n\u001c8%I\u00164\u0017-\u001e7uIE*\"A!\u000e+\t\u0005}%qG\u0016\u0003\u0005s\u0001BAa\u000f\u0003F5\u0011!Q\b\u0006\u0005\u0005\u007f\u0011\t%A\u0005v]\u000eDWmY6fI*\u0019!1I&\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0003H\tu\"!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006aB/Z:u\t\u0016dW\r^3BkR|W*\u001b:s_J,G\rV8qS\u000e\u001c\bF\u0001\u001cs\u0003\t\"Xm\u001d;J]R,'O^1m\u0007\"\fgnZ3G_J\u0004VM]5pI&\u001cG+Y:lg\"\u0012qG]\u0001*i\u0016\u001cH/T5se>\u0014h)Y5m_Z,'o\u00165f]N{WO]2f\u0013N,f.\u0019<bS2\f'\r\\3)\u0005a\u0012\bf\u0002\u0001\u0003X\tu#q\f\t\u0004g\ne\u0013b\u0001B.i\n\u0019A+Y4\u0002\u000bY\fG.^3\"\u0005\t\u0005\u0014aC5oi\u0016<'/\u0019;j_:\u0004")
/* loaded from: input_file:kafka/link/ClusterLinkIntegrationTest.class */
public class ClusterLinkIntegrationTest extends AbstractClusterLinkIntegrationTest {
    /** Constant offset value (10) exposed through {@link #offsetToCommit()}; its consumers are outside the visible part of this file. */
    private final long offsetToCommit = 10;
    /** Constant period value (100) exposed through {@link #syncPeriod()}. NOTE(review): presumably milliseconds — the unit is not visible here, confirm. */
    private final long syncPeriod = 100;
    /** Consumer group name; the same literal "testGroup" also appears inlined in testTransactionsWithMirrorTopic. */
    private final String consumerGroup = "testGroup";
    /**
     * JSON document with a single "topicFilters" entry that includes, as a literal pattern, the name
     * returned by topic(). Built from a margin-stripped ('|') multi-line string.
     */
    private final String topicFilter = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(181).append("|{\n        |\"topicFilters\": [\n        |  {\n        |     \"name\": \"").append(topic()).append("\",\n        |     \"patternType\": \"literal\",\n        |     \"filterType\": \"include\"\n        |  }\n        |]}\n        |").toString())).stripMargin();
    /**
     * JSON document with a single "topicFilters" entry whose name is "*" with patternType "literal" and
     * filterType "include". NOTE(review): presumably "*" is treated as a match-all wildcard by the filter
     * parser — that handling is not visible here.
     */
    private final String includeAllTopicsFilter = new StringOps(Predef$.MODULE$.augmentString("|{\n        |\"topicFilters\": [\n        |  {\n        |     \"name\": \"*\",\n        |     \"patternType\": \"literal\",\n        |     \"filterType\": \"include\"\n        |  }\n        |]}\n        |")).stripMargin();

    /**
     * Accessor for the {@code offsetToCommit} field.
     *
     * @return the constant stored in {@code offsetToCommit}
     */
    public long offsetToCommit() {
        return offsetToCommit;
    }

    /**
     * Accessor for the {@code syncPeriod} field.
     *
     * @return the constant stored in {@code syncPeriod}
     */
    public long syncPeriod() {
        return syncPeriod;
    }

    /**
     * Accessor for the {@code consumerGroup} field.
     *
     * @return the consumer group name stored in {@code consumerGroup}
     */
    public String consumerGroup() {
        return consumerGroup;
    }

    /**
     * Accessor for the {@code topicFilter} field.
     *
     * @return the topic-filter JSON string stored in {@code topicFilter}
     */
    public String topicFilter() {
        return topicFilter;
    }

    /**
     * Accessor for the {@code includeAllTopicsFilter} field.
     *
     * @return the topic-filter JSON string stored in {@code includeAllTopicsFilter}
     */
    public String includeAllTopicsFilter() {
        return includeAllTopicsFilter;
    }

    /**
     * Creates a source topic with the retention property set to "10000", links it on the destination
     * cluster, and checks that the link-topic result reports the same partition count, replication
     * factor and retention value. Finally checks that the cluster-link listing selected by the lifted
     * predicate lists exactly this topic.
     */
    @Test
    public void testCreateMirrorTopic() {
        Properties sourceTopicConfig = new Properties();
        sourceTopicConfig.put(LogConfig$.MODULE$.RetentionMsProp(), "10000");
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceTopicConfig);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), true);

        CreateTopicsResult mirrorResult = destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        Assertions.assertEquals(numPartitions(), (Integer) mirrorResult.numPartitions(topic()).get());
        Assertions.assertEquals(replicationFactor(), (Integer) mirrorResult.replicationFactor(topic()).get());
        Assertions.assertEquals("10000", ((Config) mirrorResult.config(topic()).get()).get(LogConfig$.MODULE$.RetentionMsProp()).value());

        // Expected value is built first, matching the original argument evaluation order.
        Object expectedTopics = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()}));
        // The predicate body lives in a lifted helper outside this method.
        IterableLike matchingLinks = (IterableLike) destCluster().listClusterLinks(true).filter(
                listing -> BoxesRunTime.boxToBoolean($anonfun$testCreateMirrorTopic$1(this, listing)));
        Collection linkedTopics = (Collection) ((ClusterLinkListing) matchingLinks.head()).topics().get();
        Object actualTopics = ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(linkedTopics).asScala()).toSet();
        Assertions.assertEquals(expectedTopics, actualTopics);
    }

    /**
     * Exercises transactional producing on the destination cluster while a mirror topic exists.
     *
     * <p>Flow, as visible in this method: a transaction that writes to a regular destination topic
     * ("anotherTopic") and commits consumer offsets succeeds; a transaction that tries to produce to
     * the mirror topic fails with an INVALID_REQUEST error; after the mirror is unlinked and the wait
     * condition is met, a transaction producing to that topic succeeds.
     */
    @Test
    public void testTransactionsWithMirrorTopic() {
        String str = "anotherTopic";
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        destCluster().createTopic("anotherTopic", numPartitions(), replicationFactor(), destCluster().createTopic$default$4());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), true);
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        // Transactional producer against the destination cluster.
        Properties properties = new Properties();
        properties.setProperty("transactional.id", "test_txn");
        properties.setProperty("acks", "all");
        KafkaProducer<byte[], byte[]> createProducer = destCluster().createProducer(destCluster().createProducer$default$1(), destCluster().createProducer$default$2(), properties);
        createProducer.initTransactions();
        // read_committed consumer in group "testGroup", manually assigned to partitions().
        Properties properties2 = new Properties();
        properties2.setProperty("group.id", "testGroup");
        properties2.setProperty("isolation.level", "read_committed");
        KafkaConsumer<byte[], byte[]> createConsumer = destCluster().createConsumer(destCluster().createConsumer$default$1(), destCluster().createConsumer$default$2(), properties2, destCluster().createConsumer$default$4());
        createConsumer.assign((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(partitions()).asJava());
        // NOTE(review): nothing is produced earlier in this method, so producedRecords().size() may be 0 here — confirm intent.
        Seq consumeRecords = TestUtils$.MODULE$.consumeRecords(createConsumer, producedRecords().size(), 20000L);
        // Map each partition's current end offset to an OffsetAndMetadata to commit within the transaction.
        Map map = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) ((TraversableLike) CollectionConverters$.MODULE$.mapAsScalaMapConverter(createConsumer.endOffsets((Collection) CollectionConverters$.MODULE$.seqAsJavaListConverter(partitions()).asJava())).asScala()).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            return new Tuple2((TopicPartition) tuple2._1(), new OffsetAndMetadata(Predef$.MODULE$.Long2long((Long) tuple2._2())));
        }, Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
        // Transaction 1: copy the consumed records to "anotherTopic" and commit the offsets atomically.
        createProducer.beginTransaction();
        consumeRecords.foreach(consumerRecord -> {
            return createProducer.send(new ProducerRecord(str, Predef$.MODULE$.int2Integer(consumerRecord.partition()), Predef$.MODULE$.long2Long(consumerRecord.timestamp()), consumerRecord.key(), consumerRecord.value()));
        });
        createProducer.sendOffsetsToTransaction(map, new ConsumerGroupMetadata("testGroup"));
        createProducer.commitTransaction();
        Assertions.assertEquals(map, createConsumer.committed((Set) CollectionConverters$.MODULE$.setAsJavaSetConverter(partitions().toSet()).asJava()));
        // Transaction 2: producing to the mirror topic must fail; the transaction is then aborted.
        createProducer.beginTransaction();
        ExecutionException executionException = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            this.produceRecords(createProducer, this.topic(), 10, this.produceRecords$default$4());
        });
        Assertions.assertTrue(executionException.getMessage().matches(".*Could not add partitions to transaction due to errors.*INVALID_REQUEST.*"), new StringBuilder(17).append("Unexpected error ").append(executionException.getMessage()).toString());
        createProducer.abortTransaction();
        producedRecords().clear();
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        // The loop below appears to be an inlined copy of TestUtils.waitUntilTrue (it uses that method's
        // default-argument accessors): poll the lifted condition until it holds, failing after the default timeout.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Looks like a decompiler-emitted null check on the module instance.
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testTransactionsWithMirrorTopic$4(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                // Assertions.fail throws, which is what terminates the loop on timeout.
                Assertions.fail($anonfun$testTransactionsWithMirrorTopic$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        // Transaction 3: after unlinking, producing to the former mirror topic is expected to succeed.
        createProducer.beginTransaction();
        produceRecords(createProducer, topic(), 10, produceRecords$default$4());
        createProducer.commitTransaction();
        createProducer.close();
        // NOTE(review): the producer is closed only on the success path and the consumer is never closed
        // in this method — presumably the harness cleans them up; confirm.
        consumeRecords(createConsumer);
    }

    /**
     * Checks the errors raised by unlinkTopic on the destination cluster in three situations:
     * the topic does not exist (UnknownTopicOrPartitionException), the topic was created directly
     * on the destination rather than through linkTopic (InvalidRequestException), and the mirror
     * has already reached the STOPPED state (InvalidRequestException).
     */
    @Test
    public void testStopMirrorTopicWithInvalidRequest() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(100);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        // Case 1: no topic with this name exists on the destination yet.
        Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> {
            this.destCluster().unlinkTopic(this.topic(), this.linkName(), false, false);
        });
        // Case 2: the topic exists on the destination but was created with createTopic, not linkTopic.
        destCluster().createTopic(topic(), numPartitions(), replicationFactor(), destCluster().createTopic$default$4());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            this.destCluster().unlinkTopic(this.topic(), this.linkName(), false, false);
        });
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        // Case 3: link the topic, stop the mirror, then try to stop it a second time.
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            this.destCluster().unlinkTopic(this.topic(), this.linkName(), false, false);
        });
        // Cleanup: remove the destination topic and the cluster link.
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Links a topic and stops the mirror twice — once passing {@code false} as the last unlinkTopic
     * argument and once using that argument's default — and each time waits until both the replica
     * mirror state and the mirror topic description report STOPPED.
     * NOTE(review): the meaning of the last unlinkTopic argument is not visible here; confirm against the harness.
     */
    @Test
    public void testStopMirror() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(100);
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        // Round 1: unlink with the last argument explicitly false.
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        // Round 2: re-link the topic and unlink with the default last argument.
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        // Cleanup: remove the destination topic and the cluster link.
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Stops a mirror topic while all source brokers are down, in three rounds:
     * <ol>
     *   <li>unlink with last argument {@code false}: the mirror reaches STOPPED with the source down;</li>
     *   <li>unlink with the default last argument: the mirror is expected in PENDING_STOPPED /
     *       SOURCE_UNAVAILABLE until the source is restarted, then reaches STOPPED;</li>
     *   <li>unlink with the default last argument (PENDING_STOPPED / SOURCE_UNAVAILABLE), followed by a
     *       second unlink with {@code false}, which reaches STOPPED without restarting the source.</li>
     * </ol>
     * The link is created with short request/API timeouts and a low availability-check interval and
     * failure threshold. NOTE(review): presumably so that source unavailability is detected quickly — confirm.
     * The helpers restartSource$1 and restartMirrorTopic$1 are defined outside this method.
     */
    @Test
    public void testStopMirrorWithSourceClusterShutdown() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(100);
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty("request.timeout.ms", "1000");
        destLinkProps.setProperty("default.api.timeout.ms", "1000");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "2");
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        // Round 1: source down, unlink with last argument false -> STOPPED.
        sourceCluster().killAllBrokers();
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        restartSource$1();
        restartMirrorTopic$1();
        // Round 2: source down, unlink with the default last argument -> stays pending until the source returns.
        sourceCluster().killAllBrokers();
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        // Fixed one-second pause before checking that the mirror is still pending.
        Thread.sleep(1000L);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        restartSource$1();
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        // NOTE(review): restartSource$1 is invoked again here although the source was just restarted above — confirm this is intended.
        restartSource$1();
        restartMirrorTopic$1();
        // Round 3: source down, pending stop, then a second unlink with last argument false -> STOPPED.
        sourceCluster().killAllBrokers();
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        Thread.sleep(1000L);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PENDING_STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        // Cleanup: remove the destination topic and the cluster link.
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Creates a cluster link and mirror topic, verifies mirroring, deletes the mirror topic and the
     * link, then recreates a link with the same name and verifies mirroring again.
     *
     * <p>When the link is not source-initiated, the test finally checks that link validation with an
     * altered "sasl.jaas.config" (link name replaced by "wrong-" + link name) fails with a
     * SaslAuthenticationException.
     */
    @Test
    public void testCreateAndDeleteAndRecreateLink() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        // First incarnation of the link.
        UUID firstLinkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        Object expectedAfterCreate = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()}));
        IterableLike linksAfterCreate = (IterableLike) destCluster().listClusterLinks(true).filter(
                listing -> BoxesRunTime.boxToBoolean($anonfun$testCreateAndDeleteAndRecreateLink$1(this, listing)));
        Collection topicsAfterCreate = (Collection) ((ClusterLinkListing) linksAfterCreate.head()).topics().get();
        Assertions.assertEquals(expectedAfterCreate,
                ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(topicsAfterCreate).asScala()).toSet());
        produceToSourceCluster(20);
        waitAndVerifyMetricsAndMirror(topic(), firstLinkId);

        // Tear down the mirror topic and the link; no links may remain afterwards.
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
        Assertions.assertTrue(destCluster().listClusterLinks(true).isEmpty());
        if (useSourceInitiatedLink()) {
            sourceCluster().deleteClusterLink(linkName(), sourceCluster().deleteClusterLink$default$2(), sourceCluster().deleteClusterLink$default$3());
            Assertions.assertTrue(sourceCluster().listClusterLinks(true).isEmpty());
        }

        // Second incarnation under the same link name.
        UUID recreatedLinkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        Object expectedAfterRecreate = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()}));
        IterableLike linksAfterRecreate = (IterableLike) destCluster().listClusterLinks(true).filter(
                listing -> BoxesRunTime.boxToBoolean($anonfun$testCreateAndDeleteAndRecreateLink$2(this, listing)));
        Collection topicsAfterRecreate = (Collection) ((ClusterLinkListing) linksAfterRecreate.head()).topics().get();
        Assertions.assertEquals(expectedAfterRecreate,
                ((TraversableOnce) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(topicsAfterRecreate).asScala()).toSet());
        produceToSourceCluster(20);
        waitAndVerifyMetricsAndMirror(topic(), recreatedLinkId);
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());

        // The bad-credentials validation check only runs for links that are not source-initiated.
        if (!useSourceInitiatedLink()) {
            Properties badCredentialProps = destLinkProps(destLinkProps$default$1());
            badCredentialProps.setProperty("sasl.jaas.config",
                    badCredentialProps.getProperty("sasl.jaas.config").replace(linkName(), "wrong-" + linkName()));
            verifyValidateLinkFailure(badCredentialProps, SaslAuthenticationException.class, "Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256");
        }
    }

    /**
     * Links a topic before any records exist, produces 20 records to the source, consumes them from
     * the source cluster and verifies metrics and mirroring. Then checks that the link config fetched
     * through the connecting cluster's adminZkClient has a "sasl.jaas.config" value that is present
     * and does not contain "secret-".
     */
    @Test
    public void testMirrorNewRecords() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        UUID linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        produceToSourceCluster(20);
        consume(sourceCluster());
        waitAndVerifyMetricsAndMirror(topic(), linkId);

        // The persisted JAAS config must not expose the "secret-" marker.
        String storedJaasConfig = connectingCluster().adminZkClient().fetchClusterLinkConfig(linkId).getProperty("sasl.jaas.config");
        Assertions.assertNotNull(storedJaasConfig);
        Assertions.assertFalse(storedJaasConfig.contains("secret-"), "Password not encrypted: " + storedJaasConfig);

        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Produces 20 records to the source topic before the cluster link and mirror topic are created,
     * then verifies metrics and mirroring for the topic.
     */
    @Test
    public void testMirrorExistingRecords() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        // Records are written before the link exists.
        produceToSourceCluster(20);

        UUID linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        waitAndVerifyMetricsAndMirror(topic(), linkId);
    }

    /**
     * Verifies that a record larger than the link's configured maximum message size produces a
     * RecordTooLarge failure.
     *
     * <p>Steps: produce 20 records and wait for the mirror; set the link's MaxMessageSizeProp to
     * "1000"; produce 20 more records and wait for the mirror again; send one record with a
     * 1100-byte random value to partition 0; wait for a RecordTooLarge failure through a destination
     * admin client; delete the mirror topic and the link.
     *
     * <p>The producer is closed in a {@code finally} block so that it is released even when one of
     * the waits or assertions fails; previously it was closed only on the success path.
     */
    @Test
    public void testMaxMessageSize() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        KafkaProducer<byte[], byte[]> producer = sourceCluster().createProducer(sourceCluster().createProducer$default$1(), sourceCluster().createProducer$default$2(), sourceCluster().createProducer$default$3());
        try {
            produceRecords(producer, topic(), 20, produceRecords$default$4());
            createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), createClusterLink$default$3(), createClusterLink$default$4());
            destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
            waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

            // Lower the link's maximum message size; the next batch is still expected to mirror.
            alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.MaxMessageSizeProp()), "1000")})));
            produceRecords(producer, topic(), 20, produceRecords$default$4());
            waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

            // A 1100-byte value exceeds the configured limit of 1000.
            producer.send(new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(System.currentTimeMillis()), new StringBuilder(4).append("key ").append(nextProduceIndex()).toString().getBytes(), TestUtils.randomBytes(1100)));
            waitForFailure(destCluster().createAdminClient(destCluster().createAdminClient$default$1()), FailureType$RecordTooLarge$.MODULE$);

            destCluster().deleteTopic(topic(), true);
            destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
        } finally {
            // Zero timeout, as in the original success path: close immediately without waiting for pending sends.
            producer.close(Duration.ZERO);
        }
    }

    /**
     * Sets {@code delete.retention.ms} to {@code 80000000} on the source topic after it has been
     * linked, polls until the test's sync condition holds, and then verifies mirroring, the basic
     * link metrics and the topic-config-change metrics.
     */
    @Test
    public void testTopicConfigSync() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(20);
        final UUID linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "80000000")})));

        // Inlined TestUtils.waitUntilTrue using its default wait time and pause.
        final long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long startMs = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$testTopicConfigSync$1(this)) {
                break;
            }
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                Assertions.fail($anonfun$testTopicConfigSync$2());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyTopicConfigChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Walks a mirror topic through several states and checks what {@code listMirrorTopics} and
     * {@code describeMirrorTopic} report at each step:
     * <ol>
     *   <li>after linking: listed, described as {@code ACTIVE};</li>
     *   <li>after unlinking: {@code PENDING_STOPPED}, then {@code STOPPED}; no longer in the default
     *       listing but still returned by {@code listMirrorTopics(true)};</li>
     *   <li>after deleting the topic: not listed, and describing it throws
     *       {@code UnknownTopicOrPartitionException};</li>
     *   <li>after re-linking and pausing: {@code ACTIVE}, then {@code PAUSED};</li>
     *   <li>after deleting the cluster link: polls until the final condition holds.</li>
     * </ol>
     */
    @Test
    public void testListDescribeMirror() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), sourceCluster().listMirrorTopics(sourceCluster().listMirrorTopics$default$1()));
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), destCluster().listMirrorTopics(destCluster().listMirrorTopics$default$1()));
        MirrorTopicDescription description = destCluster().describeMirrorTopic(topic());
        Assertions.assertEquals(linkName(), description.linkName());
        Assertions.assertEquals(topic(), description.mirrorTopic());
        Assertions.assertEquals(MirrorTopicDescription.State.ACTIVE, description.state());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, waitUntilMirrorState$default$2());

        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        // Expected value goes first so a failure message reports expected/actual the right way round.
        Assertions.assertEquals(MirrorTopicDescription.State.PENDING_STOPPED, destCluster().describeMirrorTopic(topic()).state());

        // Inlined TestUtils.waitUntilTrue using its default wait time and pause.
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long unlinkWaitStartMs = System.currentTimeMillis();
        while (!$anonfun$testListDescribeMirror$1(this)) {
            if (System.currentTimeMillis() > unlinkWaitStartMs + maxWaitMs) {
                Assertions.fail($anonfun$testListDescribeMirror$2());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), destCluster().listMirrorTopics(destCluster().listMirrorTopics$default$1()));
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), destCluster().listMirrorTopics(true));
        Assertions.assertEquals(MirrorTopicDescription.State.STOPPED, destCluster().describeMirrorTopic(topic()).state());

        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), destCluster().listMirrorTopics(true));
        Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> {
            this.destCluster().describeMirrorTopic(this.topic());
        });

        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());
        Assertions.assertEquals(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()})), destCluster().listMirrorTopics(destCluster().listMirrorTopics$default$1()));
        Assertions.assertEquals(MirrorTopicDescription.State.ACTIVE, destCluster().describeMirrorTopic(topic()).state());
        destCluster().pauseTopic(topic(), true);
        Assertions.assertEquals(MirrorTopicDescription.State.PAUSED, destCluster().describeMirrorTopic(topic()).state());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());

        destCluster().deleteClusterLink(linkName(), true, destCluster().deleteClusterLink$default$3());
        long linkDeleteWaitStartMs = System.currentTimeMillis();
        while (!$anonfun$testListDescribeMirror$4(this)) {
            if (System.currentTimeMillis() > linkDeleteWaitStartMs + maxWaitMs) {
                Assertions.fail($anonfun$testListDescribeMirror$5());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
    }

    /**
     * Creates a link whose destination properties set the replica fetch max bytes property to
     * {@code "100"}, then runs {@code verifyQuota} with a quota setter bound to a source-cluster
     * admin client and the {@code throttled$1} check. Finishes by waiting for the mirror and
     * verifying the basic link metrics.
     */
    @Test
    public void testSourceClusterQuota() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        final scala.collection.Map fetchOverrides = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KafkaConfig$.MODULE$.ReplicaFetchMaxBytesProp()), "100")}));
        final UUID linkId = createClusterLink(linkName(), destLinkProps(fetchOverrides), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        final ConfluentAdmin sourceAdmin = sourceCluster().createAdminClient(sourceCluster().createAdminClient$default$1());
        verifyQuota(quota -> {
            this.setQuota$1(quota, sourceAdmin);
        }, () -> {
            return this.throttled$1();
        }, false);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
    }

    /**
     * Runs the destination quota verification in {@code TOTAL_INBOUND} mode against the broker
     * config resource with an empty name, then checks the fetch sizes twice: first with no
     * override ({@code None}), and again after setting
     * {@code confluent.cluster.link.fetch.response.total.bytes} to {@code 10000}, where the expected
     * sizes are {@code FetchResponseSize(5000, 10000)}.
     */
    @Test
    public void testDestinationClusterLinkQuota() {
        final $colon.colon brokerDefaults = new $colon.colon(new ConfigResource(ConfigResource.Type.BROKER, ""), Nil$.MODULE$);
        final UUID linkId = verifyDestinationClusterLinkQuota(brokerDefaults, ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND);
        verifyFetchResponseSize(linkId, None$.MODULE$);

        final ConfluentAdmin destAdmin = destCluster().createAdminClient(destCluster().createAdminClient$default$1());
        final AlterConfigOp setTotalBytes = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.fetch.response.total.bytes", "10000"), AlterConfigOp.OpType.SET);
        // Pair every resource with the single SET op and convert the Scala map to a java.util.Map.
        final Map opsByResource = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) brokerDefaults.map(resource -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(resource), CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setTotalBytes}))).asJavaCollection());
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
        destAdmin.incrementalAlterConfigs(opsByResource).all().get();

        verifyFetchResponseSize(linkId, new Some(new FetchResponseSize(5000, 10000)));
    }

    /**
     * Runs the destination quota verification in {@code CLUSTER_LINK_ONLY} mode with one entry per
     * destination server. Each server is first mapped to an int and each int is then mapped to the
     * element passed on as a config resource (presumably broker id to per-broker
     * {@code ConfigResource}; the mapping functions are defined elsewhere in this class).
     */
    @Test
    public void testDestinationClusterLinkBrokerLevelQuota() {
        final TraversableLike perServerInts = (TraversableLike) destCluster().servers().map(server -> {
            return BoxesRunTime.boxToInteger($anonfun$testDestinationClusterLinkBrokerLevelQuota$1(server));
        }, Buffer$.MODULE$.canBuildFrom());
        final SeqLike perServerResources = (SeqLike) perServerInts.map(boxedInt -> {
            return $anonfun$testDestinationClusterLinkBrokerLevelQuota$2(BoxesRunTime.unboxToInt(boxedInt));
        }, Buffer$.MODULE$.canBuildFrom());
        verifyDestinationClusterLinkQuota(perServerResources.toSeq(), ConfluentConfigs.ClusterLinkQuotaMode.CLUSTER_LINK_ONLY);
    }

    /**
     * Shared body of the destination quota tests. Uses a single-partition topic, creates the link
     * and mirror topic, sets {@code confluent.cluster.link.replication.quota.mode} on every given
     * resource through a destination admin client, and then runs {@code verifyQuota} with
     * {@link #destClusterLinkReplicasThrottled()} as the throttle check. Afterwards it waits for
     * the mirror, verifies basic link metrics and calls {@code verifyQuotaMode}.
     *
     * @param seq config resources on which the quota mode (and, via the quota setter, the quota) is set
     * @param clusterLinkQuotaMode quota mode to configure and verify
     * @return the id returned by {@code createClusterLink}
     */
    public UUID verifyDestinationClusterLinkQuota(Seq<ConfigResource> seq, ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        final UUID linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        final ConfluentAdmin destAdmin = destCluster().createAdminClient(destCluster().createAdminClient$default$1());
        final AlterConfigOp setQuotaMode = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.toString()), AlterConfigOp.OpType.SET);
        // Pair every resource with the single SET op and convert the Scala map to a java.util.Map.
        final Map opsByResource = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) seq.map(resource -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(resource), CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setQuotaMode}))).asJavaCollection());
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
        destAdmin.incrementalAlterConfigs(opsByResource).all().get();

        verifyQuota(quota -> {
            setQuota$2(quota, seq, destAdmin);
        }, () -> {
            return this.destClusterLinkReplicasThrottled();
        }, true);

        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyQuotaMode(clusterLinkQuotaMode);
        return linkId;
    }

    /**
     * Applies the given quota mode and a {@code producer_byte_rate} of {@code 100000} to the
     * source cluster's broker config resource with an empty name, polls two conditions on the
     * leader of partition 0 of the topic, produces 20 records, and verifies the cluster-link quota
     * metrics for that leader.
     *
     * @param clusterLinkQuotaMode quota mode to set; the final metric check is told whether it
     *     equals {@code TOTAL_INBOUND}
     */
    public void verifyQuotaMode(ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        final ConfluentAdmin sourceAdmin = sourceCluster().createAdminClient(sourceCluster().createAdminClient$default$1());
        final ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
        final AlterConfigOp setQuotaMode = new AlterConfigOp(new ConfigEntry("confluent.cluster.link.replication.quota.mode", clusterLinkQuotaMode.toString()), AlterConfigOp.OpType.SET);
        final AlterConfigOp setProducerRate = new AlterConfigOp(new ConfigEntry("producer_byte_rate", "100000"), AlterConfigOp.OpType.SET);
        sourceAdmin.incrementalAlterConfigs(Collections.singletonMap(brokerDefaults, CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setQuotaMode, setProducerRate}))).asJavaCollection())).all().get();

        final KafkaServer leader = sourceCluster().partitionLeader(new TopicPartition(topic(), 0));

        // First inlined TestUtils.waitUntilTrue (default wait time and pause): condition on the leader only.
        final long firstMaxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long firstPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long firstStartMs = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$verifyQuotaMode$1(leader)) {
                break;
            }
            if (System.currentTimeMillis() > firstStartMs + firstMaxWaitMs) {
                Assertions.fail($anonfun$verifyQuotaMode$2());
            }
            Thread.sleep(Math.min(firstMaxWaitMs, firstPauseMs));
        }

        // Second inlined wait: condition on the leader together with the requested quota mode.
        final long secondMaxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long secondPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long secondStartMs = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$verifyQuotaMode$3(leader, clusterLinkQuotaMode)) {
                break;
            }
            if (System.currentTimeMillis() > secondStartMs + secondMaxWaitMs) {
                Assertions.fail($anonfun$verifyQuotaMode$4());
            }
            Thread.sleep(Math.min(secondMaxWaitMs, secondPauseMs));
        }

        produceToSourceCluster(20);
        final boolean totalInbound = clusterLinkQuotaMode.equals(ConfluentConfigs.ClusterLinkQuotaMode.TOTAL_INBOUND);
        verifyClusterLinkQuotaMetrics(new $colon.colon(leader, Nil$.MODULE$), totalInbound);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports whether the value returned by {@code yammerMetricMaxValue} for the
     * {@code ThrottledClusterLinkReplicasPerSec} ReplicaManager metric is greater than zero.
     *
     * @return {@code true} when that metric value is positive
     */
    public boolean destClusterLinkReplicasThrottled() {
        final String throttledReplicasMetric = "kafka.server:type=ReplicaManager,name=ThrottledClusterLinkReplicasPerSec";
        return 0.0d < yammerMetricMaxValue(throttledReplicasMetric, None$.MODULE$);
    }

    /**
     * Checks the fetch size and fetch response size in use by a cluster-link fetcher thread for the
     * given link.
     *
     * <p>The fetcher manager is the first one, across the destination servers, accepted by the
     * filter predicate; the fetcher thread is the first value of that manager's
     * {@code fetcherThreadMap} field, read through {@code TestUtils.fieldValue}. Each size is polled
     * until its match predicate holds or the default {@code computeUntilTrue} wait time elapses,
     * and is then asserted with {@code assertEquals} so that a timeout surfaces as an assertion
     * failure.
     *
     * @param uuid id of the cluster link whose fetcher manager is inspected
     * @param option expected sizes; when empty, the expected values are taken from the fetcher
     *     manager's current config ({@code replicaFetchMaxBytes} and
     *     {@code replicaFetchResponseMaxBytes})
     */
    private void verifyFetchResponseSize(UUID uuid, Option<FetchResponseSize> option) {
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) ((IterableLike) ((TraversableLike) destCluster().servers().map(kafkaServer -> {
            return (ClusterLinkFetcherManager) kafkaServer.clusterLinkManager().fetcherManager(uuid).get();
        }, Buffer$.MODULE$.canBuildFrom())).filter(clusterLinkFetcherManager2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyFetchResponseSize$2(clusterLinkFetcherManager2));
        })).head();
        ClusterLinkFetcherThread clusterLinkFetcherThread = (ClusterLinkFetcherThread) ((HashMap) TestUtils.fieldValue(clusterLinkFetcherManager, AbstractFetcherManager.class, "fetcherThreadMap")).values().head();
        Object expectedFetchSize = option.map(fetchResponseSize -> {
            return BoxesRunTime.boxToInteger(fetchResponseSize.perPartitionSize());
        }).getOrElse(() -> {
            return clusterLinkFetcherManager.currentConfig().replicaFetchMaxBytes();
        });
        Object expectedResponseSize = option.map(fetchResponseSize2 -> {
            return BoxesRunTime.boxToInteger(fetchResponseSize2.responseSize());
        }).getOrElse(() -> {
            return clusterLinkFetcherManager.currentConfig().replicaFetchResponseMaxBytes();
        });

        // Inlined TestUtils.computeUntilTrue using its default wait time and pause. The computed
        // (value, matched) pair is not needed: the value is re-read for the assertion below.
        long maxWaitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();

        long fetchSizeStartMs = System.currentTimeMillis();
        while (!$anonfun$verifyFetchResponseSize$8(expectedFetchSize, fetchSize$1(clusterLinkFetcherThread))) {
            if (System.currentTimeMillis() > fetchSizeStartMs + maxWaitMs) {
                break;
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Assertions.assertEquals(expectedFetchSize, BoxesRunTime.boxToInteger(fetchSize$1(clusterLinkFetcherThread)));

        long responseSizeStartMs = System.currentTimeMillis();
        while (!$anonfun$verifyFetchResponseSize$10(expectedResponseSize, fetchResponseSize$1(clusterLinkFetcherThread))) {
            if (System.currentTimeMillis() > responseSizeStartMs + maxWaitMs) {
                break;
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Assertions.assertEquals(expectedResponseSize, BoxesRunTime.boxToInteger(fetchResponseSize$1(clusterLinkFetcherThread)));
    }

    /**
     * Checks that the destination cluster-link quota is still applied after a destination broker
     * has been shut down and started again. Skipped (via a JUnit assumption) when
     * {@code useSourceInitiatedLink()} is true.
     *
     * <p>Flow: create a single-partition topic and mirror it, set the cluster-link IO max bytes
     * per second property to {@code "100"} on the destination broker config resource with an empty
     * name, shut down the leader of partition 0, wait for the leader change, restart that broker,
     * and then, inside a retry loop, produce until
     * {@link #destClusterLinkReplicasThrottled()} reports throttling.
     */
    @Test
    public void testDestinationClusterLinkQuotaWithBrokerRestart() {
        Assumptions.assumeFalse(useSourceInitiatedLink());
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        ConfluentAdmin createAdminClient = destCluster().createAdminClient(destCluster().createAdminClient$default$1());
        // Set the quota on the destination before the restart.
        createAdminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{new AlterConfigOp(new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), "100"), AlterConfigOp.OpType.SET)}))).asJavaCollection())).all().get();
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        // shutdownLeader returns a pair of ints; the first is later passed to startBroker, so it is
        // presumably the id of the broker that was shut down (confirm against the harness).
        Tuple2<Object, Object> shutdownLeader = destCluster().shutdownLeader(topicPartition);
        if (shutdownLeader == null) {
            throw new MatchError((Object) null);
        }
        int _1$mcI$sp = shutdownLeader._1$mcI$sp();
        destCluster().waitForLeaderChange(topicPartition, _1$mcI$sp, shutdownLeader._2$mcI$sp());
        destCluster().startBroker(_1$mcI$sp);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        if (testUtils$ == null) {
            throw null;
        }
        // Inlined TestUtils.retry: the body is re-run on AssertionError until 15000 ms have elapsed.
        // 'create' holds the current backoff in milliseconds, starting at 1.
        LongRef create = LongRef.create(1L);
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(createAdminClient, topicPartition);
                // Nested inlined TestUtils.waitUntilTrue with its default wait time and pause.
                TestUtils$ testUtils$2 = TestUtils$.MODULE$;
                long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
                long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
                if (testUtils$2 == null) {
                    throw null;
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                while (!$anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$2(this, topicPartition, _1$mcI$sp)) {
                    if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$3) {
                        // fail() throws AssertionError, which the enclosing catch treats as a retryable failure.
                        Assertions.fail($anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$3());
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
                }
                destCluster().updateBootstrapServers();
                produceUntil(sourceCluster().createProducer(sourceCluster().createProducer$default$1(), sourceCluster().createProducer$default$2(), sourceCluster().createProducer$default$3()), () -> {
                    return this.destClusterLinkReplicasThrottled();
                }, "Destination quota not applied after broker restart");
                // Success: leave the retry loop and the test.
                return;
            } catch (AssertionError e) {
                // Give up and surface the last failure once the retry budget is exhausted.
                if (System.currentTimeMillis() - currentTimeMillis > 15000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(testUtils$.msgWithLogIdent(TestUtils$.$anonfun$retry$1(create)));
                }
                // Sleep for the current backoff, then grow it by itself, but by no more than 1000 ms per step.
                Thread.sleep(create.elem);
                create.elem += package$.MODULE$.min(create.elem, 1000L);
            }
        }
    }

    /**
     * Mirrors a two-partition topic under several values of the link fetcher flow control property
     * and checks the {@code destination-lag-link-fetcher-throttle-total} metric summed over the
     * alive destination servers after each round.
     *
     * <p>The test asserts that the total stays at zero for the values {@code "-2"}, {@code "-1"}
     * and {@code "10485760"}, and becomes positive for {@code "0"}. It finally asserts that the
     * {@code link-fetcher-count} metric totals {@code 2.0}.
     */
    @Test
    public void testDestinationLagLinkFetcherThrottle() {
        numPartitions_$eq(2);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        // Settings under which no destination-lag throttling is expected.
        for (String flowControl : new String[]{"-2", "-1", "10485760"}) {
            double throttleTotal = throttleTotalAfterMirroringWithFlowControl(flowControl);
            Assertions.assertTrue(throttleTotal == 0.0d, "Unexpected destination-lag throttling with flow control " + flowControl + ": total=" + throttleTotal);
        }

        // Setting under which throttling is expected to be recorded.
        double throttleTotal = throttleTotalAfterMirroringWithFlowControl("0");
        Assertions.assertTrue(throttleTotal > 0.0d, "Expected destination-lag throttling with flow control 0, but total=" + throttleTotal);
        Assertions.assertEquals(2.0d, totalKafkaMetricValue(destCluster().aliveServers(), "link-fetcher-count", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4()));
    }

    /**
     * Sets the link fetcher flow control property on the destination link, produces 30 records to
     * the source, waits for the mirror, and returns the
     * {@code destination-lag-link-fetcher-throttle-total} metric summed over the alive destination
     * servers.
     */
    private double throttleTotalAfterMirroringWithFlowControl(String flowControlValue) {
        destCluster().alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.LinkFetcherFlowControlProp()), flowControlValue)})), destCluster().alterClusterLink$default$3());
        produceToSourceCluster(30);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        return totalKafkaMetricValue(destCluster().aliveServers(), "destination-lag-link-fetcher-throttle-total", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4());
    }

    /**
     * Starts with a single-partition topic mirrored over a link whose destination properties set
     * {@code metadata.max.age.ms} to {@code 1000}, then grows the source topic to 4 partitions.
     * The test polls an observed int until the test's match condition accepts it or the default
     * {@code computeUntilTrue} wait time elapses, and asserts that the last observed value equals
     * {@code numPartitions()}. It then produces more records and verifies mirroring, basic link
     * metrics and add-partition metrics.
     */
    @Test
    public void testAddPartitions() {
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        final UUID linkId = createClusterLink(linkName(), destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "1000")}))), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        numPartitions_$eq(4);
        sourceCluster().createPartitions(topic(), numPartitions());
        produceToSourceCluster(8);

        // Inlined TestUtils.computeUntilTrue using its default wait time and pause; only the last
        // computed value is needed afterwards.
        final long maxWaitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        final long pauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        final long startMs = System.currentTimeMillis();
        int observedPartitions;
        for (;;) {
            observedPartitions = $anonfun$testAddPartitions$1(this);
            if ($anonfun$testAddPartitions$2(this, observedPartitions)) {
                break;
            }
            if (System.currentTimeMillis() > startMs + maxWaitMs) {
                break;
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
        Assertions.assertEquals(numPartitions(), observedPartitions);

        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyAddPartitionMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    @Test
    public void testAlterClusterLinkConfigs() {
        // Walks one live cluster link through a series of alterClusterLink calls
        // (metadata age, bootstrap servers, rejected mode changes, truststore
        // location, fetcher count) and checks after each step that records produced
        // to the source cluster are still mirrored.
        Tuple2 fetcherCountResult;
        numPartitions_$eq(8);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        // Both the destination-side and source-side link properties start with metadata.max.age.ms=10000.
        UUID linkId = createClusterLink(linkName(), destLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))), sourceLinkProps((scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")}))), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        ClusterLinkTestHarness connectingCluster = super.connectingCluster();
        ClusterLinkTestHarness dest = destCluster();
        // The "remote" cluster is whichever of the two clusters is not the connecting one
        // (the comparison is the null-safe form of Scala's `!=`).
        ClusterLinkTestHarness remoteCluster = (connectingCluster != null ? !connectingCluster.equals(dest) : dest != null) ? destCluster() : sourceCluster();

        // Step 1: change metadata.max.age.ms and confirm the new value is reported by describeClusterLink.
        connectingCluster.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "60000")})), connectingCluster.alterClusterLink$default$3());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        Assertions.assertEquals("60000", connectingCluster.describeClusterLink(linkName()).get("metadata.max.age.ms").value());

        // Step 2: restart every broker of the remote cluster, refresh its bootstrap servers,
        // then point the link at the remote cluster's current broker list.
        remoteCluster.servers().foreach(kafkaServer -> {
            kafkaServer.shutdown();
            return BoxedUnit.UNIT;
        });
        remoteCluster.servers().foreach(kafkaServer2 -> {
            kafkaServer2.startup();
            return BoxedUnit.UNIT;
        });
        remoteCluster.updateBootstrapServers();
        connectingCluster.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), remoteCluster.brokerList())})), connectingCluster.alterClusterLink$default$3());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Step 3: altering the link mode or the connection mode must be rejected,
        // and the link's current config must still report Destination / Outbound.
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            this.destCluster().alterClusterLink(this.linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.LinkModeProp()), "SOURCE")})), this.destCluster().alterClusterLink$default$3());
        });
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            connectingCluster.alterClusterLink(this.linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConnectionModeProp()), "INBOUND")})), connectingCluster.alterClusterLink$default$3());
        });
        Assertions.assertEquals(LinkMode$Destination$.MODULE$, ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().linkMode());
        Assertions.assertEquals(ConnectionMode$Outbound$.MODULE$, ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) connectingCluster.servers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().connectionMode());

        // Step 4: copy the truststore to a new path and alter the link to use the copy.
        // For source-initiated links the property name carries the LocalPrefix.
        String truststoreProp = useSourceInitiatedLink() ? new StringBuilder(23).append(ClusterLinkConfig$.MODULE$.LocalPrefix()).append("ssl.truststore.location").toString() : "ssl.truststore.location";
        File originalTruststore = new File(connectingCluster.describeClusterLink(linkName()).get(truststoreProp).value());
        File truststoreCopy = File.createTempFile("truststore", ".jks");
        // Do not leave the copied truststore behind in the temp directory after the test JVM exits.
        truststoreCopy.deleteOnExit();
        Files.copy(originalTruststore.toPath(), truststoreCopy.toPath(), StandardCopyOption.REPLACE_EXISTING);
        connectingCluster.alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(truststoreProp), truststoreCopy.getAbsolutePath())})), connectingCluster.alterClusterLink$default$3());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());

        // Step 5: register a partition reassignment in ZooKeeper in which every partition's replica
        // list becomes [id of the first destination broker, first replica of the current assignment].
        int brokerId = ((KafkaServer) destCluster().servers().head()).config().brokerId();
        destCluster().zkClient().createPartitionReassignment(((TraversableOnce) ((scala.collection.Map) destCluster().zkClient().getPartitionAssignmentForTopics(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topic()}))).apply(topic())).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            return new Tuple2(new TopicPartition(this.topic(), tuple2._1$mcI$sp()), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId, BoxesRunTime.unboxToInt(((ReplicaAssignment) tuple2._2()).replicas().head())})));
        }, scala.collection.Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));

        // Step 6: raise the number of link fetchers from 1 to 3 and poll until the observed count settles.
        Assertions.assertEquals(1, maxFetcherThreadCount(linkId));
        destCluster().alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp()), "3")})), destCluster().alterClusterLink$default$3());
        // NOTE(review): the loop below looks like the compiler-inlined body of
        // TestUtils.computeUntilTrue (default wait/pause values are read from it) - confirm.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int maxFetcherThreadCount = maxFetcherThreadCount(linkId);
            Integer boxToInteger = BoxesRunTime.boxToInteger(maxFetcherThreadCount);
            // The synthetic predicate decides when polling may stop; presumably it checks for 3 fetchers.
            if ($anonfun$testAlterClusterLinkConfigs$7(maxFetcherThreadCount)) {
                fetcherCountResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                // Timed out: keep the last observed value so the assertion below reports it.
                if (System.currentTimeMillis() > currentTimeMillis + computeUntilTrue$default$2) {
                    fetcherCountResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
            }
        }
        if (fetcherCountResult == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(3, fetcherCountResult._1$mcI$sp());
        produceToSourceCluster(8);
        consume(destCluster());
    }

    /**
     * Creates a link whose consumer-offset sync initially filters on the default
     * consumer group only, verifies one offset migration, then alters the link
     * with a filter that includes both the default group and "testGroup2" and
     * verifies that offsets committed for both groups are migrated.
     */
    @Test
    public void testOffsetMigrationWithAddedConsumerGroup() {
        final String addedGroup = "testGroup2";

        // Margin-formatted JSON group filter naming both groups; stripMargin removes the leading '|' columns.
        StringBuilder filterSource = new StringBuilder(338);
        filterSource.append("|{\n          |\"groupFilters\": [\n          |  {\n          |     \"name\": \"");
        filterSource.append(consumerGroup());
        filterSource.append("\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \"");
        filterSource.append(addedGroup);
        filterSource.append("\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |");
        String bothGroupsFilter = new StringOps(Predef$.MODULE$.augmentString(filterSource.toString())).stripMargin();

        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        // Initial link configuration: offset sync enabled, single-group filter.
        Properties linkProps = destLinkProps(destLinkProps$default$1());
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());

        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(topic(), 0, offsetToCommit(), consumerGroup());

        // Re-submit the sync settings, this time with the two-group filter.
        Tuple2[] alteredEntries = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), "true"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), bothGroupsFilter),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp()), String.valueOf(syncPeriod()))
        };
        scala.collection.Map alteredConfigs = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(alteredEntries));
        destCluster().alterClusterLink(linkName(), alteredConfigs, destCluster().alterClusterLink$default$3());

        // Offsets committed on the source for either group must now show up on the destination.
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        commitOffsets(sourceCluster(), topic(), 0, 20L, addedGroup);
        verifyOffsetMigration(topic(), 0, 20L, consumerGroup());
        verifyOffsetMigration(topic(), 0, 20L, addedGroup);

        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, linkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();

        // Tear down: unlink the topic, then delete the link.
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Verifies consumer offset migration when a second mirror topic
     * ("linkedTopic2") is linked after the link is already syncing offsets, and
     * then unlinks both topics - the second one while a destination consumer in
     * the same group is subscribed to it - before deleting the link.
     */
    @Test
    public void testOffsetMigrationWithAddedTopic() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        sourceCluster().createTopic("linkedTopic2", numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        // Link configuration: offset sync enabled for the default consumer group.
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(syncPeriod()));
        UUID linkId = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4());

        // Only the first topic is mirrored initially.
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        verifyOffsetMigration(topic(), 0, offsetToCommit(), consumerGroup());

        // Add the second mirror topic; offsets for both topics must now be migrated.
        destCluster().linkTopic("linkedTopic2", (short) 2, linkName(), destCluster().linkTopic$default$4());
        commitOffsets(sourceCluster(), topic(), 0, 20L, consumerGroup());
        commitOffsets(sourceCluster(), "linkedTopic2", 0, 20L, consumerGroup());
        verifyOffsetMigration(topic(), 0, 20L, consumerGroup());
        verifyOffsetMigration("linkedTopic2", 0, 20L, consumerGroup());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, destLinkProps, verifyBasicLinkMetrics$default$3());
        verifyConsumerOffsetMigrationMetrics();

        // Unlink the first topic (third argument passed explicitly as false) and wait for it to stop.
        destCluster().unlinkTopic(topic(), linkName(), false, destCluster().unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());

        // Flip the group filter from "include" to "exclude" for the default group.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp()), consumerGroupFilter(consumerGroup()).replaceAll("include", "exclude"))})));

        // Subscribe a destination consumer of the same group to the second topic and
        // poll until it has been assigned partitions.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", consumerGroup());
        // NOTE(review): the consumer is not closed here; presumably the harness closes it at teardown - confirm.
        KafkaConsumer consumer = destCluster().createConsumer(destCluster().createConsumer$default$1(), destCluster().createConsumer$default$2(), consumerProps, destCluster().createConsumer$default$4());
        consumer.subscribe(Collections.singleton("linkedTopic2"));
        do {
            consumer.poll(Duration.ofMillis(10L));
        } while (consumer.assignment().isEmpty());

        // Issue one more poll on a background thread while the topic is being unlinked.
        ExecutorService pollExecutor = Executors.newSingleThreadExecutor();
        pollExecutor.submit(() -> {
            return consumer.poll(Duration.ofMillis(10L));
        });
        try {
            destCluster().unlinkTopic("linkedTopic2", linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
            waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, "linkedTopic2");
        } finally {
            // Shut the executor down exactly once, whether or not the unlink/wait succeeded.
            pollExecutor.shutdownNow();
        }
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Checks that a mirror topic on the destination cluster rejects direct
     * writes and partition creation while it is linked, and that it accepts
     * produced records again once the topic has been unlinked.
     */
    @Test
    public void testDestReadOnly() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        // Link with metadata.max.age.ms=10000 and mirror a first batch of records.
        Tuple2[] linkOverrides = new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")};
        scala.collection.Map overrideMap = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(linkOverrides));
        UUID linkId = createClusterLink(linkName(), destLinkProps(overrideMap), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Producing straight to the mirror topic must fail with an InvalidRequestException cause.
        KafkaProducer<byte[], byte[]> destProducer = destCluster().createProducer(destCluster().createProducer$default$1(), destCluster().createProducer$default$2(), destCluster().createProducer$default$3());
        ExecutionException sendFailure = (ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            destProducer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(0L), "key".getBytes(), "value".getBytes())).get(15L, TimeUnit.SECONDS);
        });
        Assertions.assertTrue(sendFailure.getCause() instanceof InvalidRequestException);
        String failureMessage = sendFailure.getMessage();
        Assertions.assertTrue(failureMessage.contains("Cannot append records to read-only mirror topic"), "Unexpected error " + failureMessage);

        // Adding partitions on the destination is rejected as well.
        Assertions.assertThrows(InvalidPartitionsException.class, () -> {
            this.destCluster().createPartitions(this.topic(), 8);
        });

        // Further admin-client checks live in the synthetic helper below.
        destCluster().withAdmin(admin -> {
            $anonfun$testDestReadOnly$3(this, admin);
            return BoxedUnit.UNIT;
        });

        // Mirroring itself must be unaffected by the rejected operations.
        produceToSourceCluster(4);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());

        // After unlinking, the topic is verified writable and accepts records from the same producer.
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        destCluster().verifyTopicWritable(topic(), numPartitions());
        produceRecords(destProducer, topic(), 10, produceRecords$default$4());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Links five topics to one cluster link, checks that a plain delete is
     * refused with a ClusterLinkInUseException naming the link and the topics,
     * then deletes with the second argument set to true and checks that the
     * topic-to-link entries and the link's entity configs are gone from ZooKeeper.
     */
    @Test
    public void testDeleteClusterLinkCleanup() {
        Tuple2[] linkOverrides = new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), "10000")};
        scala.collection.Map overrideMap = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(linkOverrides));
        UUID linkId = createClusterLink(linkName(), destLinkProps(overrideMap), createClusterLink$default$3(), createClusterLink$default$4());

        // Topic names for indices 0 until 5 come from the synthetic naming helper.
        TraversableOnce generatedNames = (TraversableOnce) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 5).map(
            index -> $anonfun$testDeleteClusterLinkCleanup$1(BoxesRunTime.unboxToInt(index)),
            IndexedSeq$.MODULE$.canBuildFrom());
        scala.collection.immutable.Set topics = generatedNames.toSet();

        // Create each topic on the source and mirror it on the destination.
        topics.foreach(name -> {
            this.sourceCluster().createTopic(name, this.numPartitions(), this.replicationFactor(), this.sourceCluster().createTopic$default$4());
            return this.destCluster().linkTopic(name, this.replicationFactor(), this.linkName(), this.destCluster().linkTopic$default$4());
        });
        int expectedLinked = topics.size();
        Assertions.assertEquals(expectedLinked, destCluster().zkClient().getClusterLinkForTopics(topics).size());

        // A plain delete is refused while topics still reference the link.
        String expectedMessage = "Cluster link '" + linkName() + "' with ID '" + linkId + "' in used by topics: " + topics.mkString(", ");
        ClusterLinkInUseException inUse = Assertions.assertThrows(ClusterLinkInUseException.class, () -> {
            this.destCluster().deleteClusterLink(this.linkName(), this.destCluster().deleteClusterLink$default$2(), this.destCluster().deleteClusterLink$default$3());
        });
        Assertions.assertEquals(expectedMessage, inUse.getMessage());

        // Deleting with the second argument true succeeds and leaves no link state behind in ZooKeeper.
        destCluster().deleteClusterLink(linkName(), true, destCluster().deleteClusterLink$default$3());
        Assertions.assertTrue(destCluster().zkClient().getClusterLinkForTopics(topics).isEmpty());
        Assertions.assertTrue(destCluster().zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), linkId.toString()).isEmpty());
    }

    /**
     * Shuts down the leader of the first mirror partition, deletes the mirror
     * topic on the destination (second argument false), runs the per-server
     * synthetic check on a filtered set of destination servers, and finally
     * deletes the cluster link against that same set of servers.
     */
    @Test
    public void testMirroredTopicMarkedForDelete() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        UUID linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        // One element per partition index, built by the synthetic helper (cast to TopicPartition below).
        IndexedSeq partitions = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(
            partitionIndex -> $anonfun$testMirroredTopicMarkedForDelete$1(this, BoxesRunTime.unboxToInt(partitionIndex)),
            IndexedSeq$.MODULE$.canBuildFrom());

        // Stop the leader of the first partition and remember the int it reports.
        TopicPartition firstPartition = (TopicPartition) partitions.head();
        int leaderShutdownValue = destCluster().shutdownLeader(firstPartition)._1$mcI$sp();

        // Servers selected by the synthetic predicate; presumably those still running - confirm.
        Buffer<KafkaServer> selectedServers = (Buffer) destCluster().servers().filter(
            server -> BoxesRunTime.boxToBoolean($anonfun$testMirroredTopicMarkedForDelete$2(leaderShutdownValue, server)));

        destCluster().deleteTopic(topic(), false);
        selectedServers.foreach(server -> {
            $anonfun$testMirroredTopicMarkedForDelete$3(partitions, linkId, server);
            return BoxedUnit.UNIT;
        });
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), selectedServers);
    }

    /**
     * Covers pausing and resuming a single mirror topic: pausing a topic that
     * does not exist on the destination fails, pausing twice is accepted, leader
     * offsets do not move while paused, resuming twice is accepted and mirroring
     * catches up, and pausing after unlink is rejected.
     */
    @Test
    public void testPauseTopic() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        UUID linkId = createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());

        // The topic has not been linked on the destination yet, so pausing it must fail.
        Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> {
            this.destCluster().pauseTopic(this.topic(), this.destCluster().pauseTopic$default$2());
        });

        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());

        // Pause twice in a row; the second call is expected to be harmless.
        destCluster().pauseTopic(topic(), destCluster().pauseTopic$default$2());
        destCluster().pauseTopic(topic(), destCluster().pauseTopic$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED);

        // Poll the synthetic condition until it holds, failing the test once the default wait elapses.
        TestUtils$ testUtilsModule = TestUtils$.MODULE$;
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtilsModule == null) {
            throw null;
        }
        long pollStart = System.currentTimeMillis();
        while (true) {
            if ($anonfun$testPauseTopic$2(this)) {
                break;
            }
            if (System.currentTimeMillis() > pollStart + maxWaitMs) {
                Assertions.fail($anonfun$testPauseTopic$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(maxWaitMs), pauseMs));
        }

        Tuple2[] pausedTags = new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("state"), "PausedMirror")};
        scala.collection.Map pausedTagMap = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(pausedTags));
        verifyMirrorTopicCountMetric("mirror-topic-count", pausedTagMap, verifyMirrorTopicCountMetric$default$3(), verifyMirrorTopicCountMetric$default$4());

        // While paused, records produced to the source must not advance the leader offsets.
        Seq offsetsBeforeProduce = leaderOffsets$1();
        produceToSourceCluster(8);
        Thread.sleep(1000L);
        Assertions.assertEquals(offsetsBeforeProduce, leaderOffsets$1());

        // Resume twice in a row, then expect mirroring to catch up and every leader to report ACTIVE.
        destCluster().pauseTopic(topic(), false);
        destCluster().pauseTopic(topic(), false);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).foreach$mVc$sp(partition -> {
            Object leaderStatuses = this.destCluster().replicaStatus(this.topic(), partition, this.destCluster().replicaStatus$default$3()).filter(
                status -> BoxesRunTime.boxToBoolean(status.isLeader()));
            ReplicaStatus leaderStatus = (ReplicaStatus) ((IterableLike) leaderStatuses).head();
            ReplicaStatus.MirrorInfo leaderMirror = (ReplicaStatus.MirrorInfo) leaderStatus.mirrorInfo().get();
            Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, leaderMirror.state());
        });

        // Once unlinked, the topic can no longer be paused.
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            this.destCluster().pauseTopic(this.topic(), this.destCluster().pauseTopic$default$2());
        });
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Covers pausing and resuming a whole cluster link. While the link is
     * paused, partition additions, topic config changes and consumer offsets on
     * the source must not reach the destination, and linking new topics must
     * fail. After the link is resumed all three must be picked up.
     */
    @Test
    public void testPauseClusterLink() {
        Tuple2 partitionCountResult;
        Tuple2 retentionResult;
        int numPartitions = numPartitions();
        sourceCluster().createTopic(topic(), numPartitions, replicationFactor(), sourceCluster().createTopic$default$4());
        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.DeleteRetentionMsProp()), "10000")})));

        // Short (100 ms) sync periods for topic configs, consumer offsets and metadata.
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        destLinkProps.setProperty("metadata.max.age.ms", "100");
        UUID createClusterLink = createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        // Baseline: mirroring and offset migration work before the link is paused.
        produceToSourceCluster(8);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(createClusterLink, destLinkProps, verifyBasicLinkMetrics$default$3());
        commitOffsets(sourceCluster(), topic(), 0, 10, consumerGroup());
        verifyOffsetMigration(topic(), 0, 10, consumerGroup());
        verifyConsumerOffsetMigrationMetrics();

        // Pause the link, then change the source: two more partitions, a new retention value,
        // more records and a newer committed offset.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})));
        int expandedPartitions = numPartitions + 2;
        sourceCluster().createPartitions(topic(), expandedPartitions);
        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.DeleteRetentionMsProp()), "20000")})));
        produceToSourceCluster(8);
        commitOffsets(sourceCluster(), topic(), 0, 20, consumerGroup());
        verifyPausedLinkMetrics();
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());

        // Pausing the topic itself switches the description to PAUSED; un-pausing the topic
        // returns it to LINK_PAUSED because the link is still paused.
        destCluster().pauseTopic(topic(), destCluster().pauseTopic$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.PAUSED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());
        destCluster().pauseTopic(topic(), false);
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.LINK_PAUSED);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.PAUSED, waitUntilMirrorState$default$2());

        // New mirror topics cannot be created over a paused link.
        Assertions.assertThrows(ClusterLinkPausedException.class, () -> {
            this.destCluster().linkTopic("paused-topic", this.replicationFactor(), this.linkName(), this.destCluster().linkTopic$default$4());
        });

        // Give the 100 ms sync tasks time to run (if they were going to), then check that
        // none of the source-side changes reached the destination.
        Thread.sleep(250L);
        Assertions.assertEquals(numPartitions, destCluster().describeTopic(topic()).partitions().size());
        Assertions.assertEquals("10000", destCluster().describeTopicConfig(topic()).get(LogConfig$.MODULE$.DeleteRetentionMsProp()).value());
        Assertions.assertEquals(10, destCluster().getOffset(topic(), 0, consumerGroup()));

        // Resume the link and wait for the mirror to become ACTIVE again.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "false")})));
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, waitUntilMirrorState$default$2());

        // Poll the synthetic int supplier until its predicate (given the expanded partition count)
        // holds or the default wait elapses; the last observed value is asserted below.
        // NOTE(review): presumably this reads the destination partition count - confirm against the helper.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            int observedCount = $anonfun$testPauseClusterLink$2(this);
            Integer boxToInteger = BoxesRunTime.boxToInteger(observedCount);
            if ($anonfun$testPauseClusterLink$3(expandedPartitions, observedCount)) {
                partitionCountResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis + computeUntilTrue$default$2) {
                    partitionCountResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(boxToInteger), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
            }
        }
        if (partitionCountResult == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals(expandedPartitions, partitionCountResult._1$mcI$sp());

        // Same polling pattern for a synthetic String supplier, compared against "20000".
        // NOTE(review): presumably this reads the destination topic's delete-retention config - confirm.
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long computeUntilTrue$default$22 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$32 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (true) {
            String observedValue = $anonfun$testPauseClusterLink$4(this);
            if ($anonfun$testPauseClusterLink$5("20000", observedValue)) {
                retentionResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(observedValue), BoxesRunTime.boxToBoolean(true));
                break;
            } else {
                if (System.currentTimeMillis() > currentTimeMillis2 + computeUntilTrue$default$22) {
                    retentionResult = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(observedValue), BoxesRunTime.boxToBoolean(false));
                    break;
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$22), computeUntilTrue$default$32));
            }
        }
        if (retentionResult == null) {
            throw new MatchError((Object) null);
        }
        Assertions.assertEquals("20000", (String) retentionResult._1());

        // The offset committed while paused must now be migrated too.
        verifyOffsetMigration(topic(), 0, 20, consumerGroup());
        verifyBasicLinkMetrics(createClusterLink, destLinkProps, verifyBasicLinkMetrics$default$3());
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Verifies the replica status reported for partition 0 of the test topic at three points:
     * on the source cluster before any link exists, on the destination cluster while the mirror
     * is active, and on the destination cluster after the mirror topic has been unlinked.
     *
     * <p>The {@code $anonfun$testReplicaStatus$N} predicates are defined elsewhere in this class;
     * which replica each one selects cannot be determined from this method alone.
     */
    @Test
    public void testReplicaStatus() {
        final int recordsPerBatch = 10;
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        // Source cluster: two replicas; neither the leader nor the non-leader reports a link name or mirror info.
        Seq<ReplicaStatus> sourceStatuses = sourceCluster().replicaStatus(topic(), 0, true);
        Assertions.assertEquals(2, sourceStatuses.size());
        ReplicaStatus sourceLeader = (ReplicaStatus) ((IterableLike) sourceStatuses.filter(r -> BoxesRunTime.boxToBoolean(r.isLeader()))).head();
        Assertions.assertEquals(Optional.empty(), sourceLeader.linkName());
        Assertions.assertEquals(Optional.empty(), sourceLeader.mirrorInfo());
        ReplicaStatus sourceFollower = (ReplicaStatus) ((IterableLike) sourceStatuses.filterNot(r -> BoxesRunTime.boxToBoolean(r.isLeader()))).head();
        Assertions.assertEquals(Optional.empty(), sourceFollower.linkName());
        Assertions.assertEquals(Optional.empty(), sourceFollower.mirrorInfo());

        // Timestamp taken before the link exists; later fetch times must not be earlier than this.
        long beforeLinkMs = Time.SYSTEM.milliseconds();
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        // Poll the readiness predicate until it holds, failing once the default timeout has elapsed.
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testReplicaStatus$3(this)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testReplicaStatus$4());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        produceToSourceCluster(recordsPerBatch);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Destination cluster, third argument true: four replica entries are expected.
        Seq<ReplicaStatus> linkedStatuses = destCluster().replicaStatus(topic(), 0, true);
        Assertions.assertEquals(4, linkedStatuses.size());
        ReplicaStatus activeMirrorReplica = (ReplicaStatus) ((IterableLike) linkedStatuses.filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$5(r)))).head();
        Assertions.assertTrue(activeMirrorReplica.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo activeInfo = (ReplicaStatus.MirrorInfo) activeMirrorReplica.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, activeInfo.state());
        Assertions.assertTrue(beforeLinkMs <= activeInfo.lastFetchTimeMs(), "Expected: " + beforeLinkMs + " <= " + activeInfo.lastFetchTimeMs());
        Assertions.assertEquals(recordsPerBatch, activeInfo.lastFetchSourceHighWatermark());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        ReplicaStatus replicaWithoutMirrorInfo = (ReplicaStatus) ((IterableLike) linkedStatuses.filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$6(r)))).head();
        Assertions.assertEquals(Optional.empty(), replicaWithoutMirrorInfo.mirrorInfo());

        // Two further entries carry the link name but no mirror info.
        ReplicaStatus firstLinkedReplica = (ReplicaStatus) ((IterableLike) linkedStatuses.filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$7(r)))).head();
        Assertions.assertEquals(linkName(), firstLinkedReplica.linkName().get());
        Assertions.assertEquals(Optional.empty(), firstLinkedReplica.mirrorInfo());
        ReplicaStatus secondLinkedReplica = (ReplicaStatus) ((IterableLike) linkedStatuses.filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$8(r)))).head();
        Assertions.assertEquals(linkName(), secondLinkedReplica.linkName().get());
        Assertions.assertEquals(Optional.empty(), secondLinkedReplica.mirrorInfo());

        // Produce a second batch; the fetch time must not go backwards and the high watermark doubles.
        long previousFetchMs = activeInfo.lastFetchTimeMs();
        produceToSourceCluster(recordsPerBatch);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Destination cluster, third argument false: only two replica entries are expected.
        Seq<ReplicaStatus> localStatuses = destCluster().replicaStatus(topic(), 0, false);
        Assertions.assertEquals(2, localStatuses.size());
        ReplicaStatus refreshedMirrorReplica = (ReplicaStatus) ((IterableLike) localStatuses.filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$9(r)))).head();
        Assertions.assertTrue(refreshedMirrorReplica.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo refreshedInfo = (ReplicaStatus.MirrorInfo) refreshedMirrorReplica.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, refreshedInfo.state());
        Assertions.assertTrue(previousFetchMs <= refreshedInfo.lastFetchTimeMs(), "Expected: " + previousFetchMs + " <= " + refreshedInfo.lastFetchTimeMs());
        Assertions.assertEquals(recordsPerBatch * 2, refreshedInfo.lastFetchSourceHighWatermark());
        ReplicaStatus refreshedOtherReplica = (ReplicaStatus) ((IterableLike) localStatuses.filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$10(r)))).head();
        Assertions.assertEquals(Optional.empty(), refreshedOtherReplica.mirrorInfo());

        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());

        // The polling predicate receives this holder; presumably it stores the latest replica
        // statuses in it - confirm against $anonfun$testReplicaStatus$11.
        ObjectRef statusesAfterUnlink = ObjectRef.create((Object) null);
        timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        startMs = System.currentTimeMillis();
        while (!$anonfun$testReplicaStatus$11(this, statusesAfterUnlink)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testReplicaStatus$12());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        ReplicaStatus stoppedMirrorReplica = (ReplicaStatus) ((IterableLike) ((Seq) statusesAfterUnlink.elem).filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$13(r)))).head();
        Assertions.assertTrue(stoppedMirrorReplica.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo stoppedInfo = (ReplicaStatus.MirrorInfo) stoppedMirrorReplica.mirrorInfo().get();

        // After unlinking, the mirror may be observed either still stopping or fully stopped.
        ReplicaStatus.MirrorInfo.State stateAfterUnlink = stoppedInfo.state();
        boolean stoppingOrStopped = ReplicaStatus.MirrorInfo.State.PENDING_STOPPED.equals(stateAfterUnlink)
                || ReplicaStatus.MirrorInfo.State.STOPPED.equals(stateAfterUnlink);
        Assertions.assertTrue(stoppingOrStopped, "Unexpected mirror state after unlink: " + stateAfterUnlink);
        Assertions.assertEquals(-1L, stoppedInfo.lastFetchTimeMs());
        Assertions.assertEquals(-1L, stoppedInfo.lastFetchSourceHighWatermark());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
        ReplicaStatus otherReplicaAfterUnlink = (ReplicaStatus) ((IterableLike) ((Seq) statusesAfterUnlink.elem).filter(r -> BoxesRunTime.boxToBoolean($anonfun$testReplicaStatus$14(r)))).head();
        Assertions.assertEquals(Optional.empty(), otherReplicaAfterUnlink.mirrorInfo());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Auto-mirrors the base topic, widens the link's topic filter to include all topics, and checks
     * that a newly created source topic is mirrored and that the auto-mirror metrics match. Then
     * excludes that topic via the filter, deletes it on the destination, and finally exercises a
     * topic name that was first created and then deleted on the destination.
     */
    @Test
    public void testAutoMirroring() {
        autoMirrorTopic(syncPeriod());
        String secondTopic = topic() + "-2";

        // Switch the link to the include-all filter and create a second topic on the source.
        scala.collection.Map includeAllUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter())}));
        destCluster().alterClusterLink(linkName(), includeAllUpdate, destCluster().alterClusterLink$default$3());
        sourceCluster().createTopic(secondTopic, sourceCluster().createTopic$default$2(), sourceCluster().createTopic$default$3(), sourceCluster().createTopic$default$4());
        waitForAutoMirrorCreation(secondTopic);

        // Wait five sync periods before reading the metrics: no failures, two mirrors created.
        Thread.sleep(syncPeriod() * 5);
        Assertions.assertEquals(0.0d, totalKafkaMetricValue(destCluster().aliveServers(), "auto-mirror-create-failed-total", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4()));
        Assertions.assertEquals(2.0d, totalKafkaMetricValue(destCluster().aliveServers(), "auto-mirror-created-total", totalKafkaMetricValue$default$3(), totalKafkaMetricValue$default$4()));

        // Include everything except the second topic, then delete that topic on the destination.
        String excludeSecondTopicJson = "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \"*\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \"" + secondTopic + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"exclude\"\n          |  }\n          |]}\n          |";
        scala.collection.Map excludeSecondTopicUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), new StringOps(Predef$.MODULE$.augmentString(excludeSecondTopicJson)).stripMargin())}));
        destCluster().alterClusterLink(linkName(), excludeSecondTopicUpdate, destCluster().alterClusterLink$default$3());
        destCluster().deleteTopic(secondTopic, destCluster().deleteTopic$default$2());
        Thread.sleep(1000L);
        Assertions.assertFalse(destCluster().listMirrorTopics(destCluster().listMirrorTopics$default$1()).contains(secondTopic));

        // Create the same topic name on both clusters, delete the destination copy, and wait for the auto-mirror.
        String conflictTopic = topic() + "-conflict";
        destCluster().createTopic(conflictTopic, destCluster().createTopic$default$2(), destCluster().createTopic$default$3(), destCluster().createTopic$default$4());
        sourceCluster().createTopic(conflictTopic, sourceCluster().createTopic$default$2(), sourceCluster().createTopic$default$3(), sourceCluster().createTopic$default$4());
        destCluster().deleteTopic(conflictTopic, false);
        waitForAutoMirrorCreation(conflictTopic);

        // Clean up: unlink both mirror topics and remove the link.
        destCluster().unlinkTopic(conflictTopic, linkName(), false, destCluster().unlinkTopic$default$4());
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Creates the test topic on the source cluster, produces 20 records, creates an auto-mirroring
     * cluster link, and waits until the topic is mirrored before verifying link metrics.
     *
     * @param j base interval; the link's retry timeout property is set to ten times this value
     */
    private void autoMirrorTopic(long j) {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(20);

        Properties linkProps = destLinkPropsForAutoMirroring(topicFilter());
        String retryTimeoutMs = String.valueOf(j * 10);
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), retryTimeoutMs);
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());

        waitForAutoMirrorCreation(topic());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        verifyBasicLinkMetrics(linkId, verifyBasicLinkMetrics$default$2(), verifyBasicLinkMetrics$default$3());
        verifyAutoMirroringSuccessMetric();
    }

    /**
     * Builds destination link properties with auto-mirroring enabled.
     *
     * @param str topic filter value stored under the topic-filters property
     * @return the result of {@code destLinkProps} applied to the auto-mirroring overrides
     */
    private Properties destLinkPropsForAutoMirroring(String str) {
        scala.collection.mutable.Map overrides = Map$.MODULE$.apply(Nil$.MODULE$);
        overrides.put(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        overrides.put(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), str);
        // The metadata max age is set to the sync period.
        String metadataMaxAgeMs = String.valueOf(syncPeriod());
        overrides.put("metadata.max.age.ms", metadataMaxAgeMs);
        return destLinkProps(overrides);
    }

    /** Runs the stopped-mirror description scenario with the helper's default flag value. */
    @Test
    public void testLastFetchedOffsetPromotedMirrorTopicDescription() {
        boolean defaultFlag = testLastFetchedOffsetStoppedMirrorTopicDescription$default$1();
        testLastFetchedOffsetStoppedMirrorTopicDescription(defaultFlag);
    }

    /** Runs the stopped-mirror description scenario with the flag explicitly set to {@code false}. */
    @Test
    public void testLastFetchedOffsetFailedOverMirrorTopicDescription() {
        final boolean flag = false;
        testLastFetchedOffsetStoppedMirrorTopicDescription(flag);
    }

    /**
     * Checks which topic-filter configurations are accepted or rejected with
     * {@link InvalidConfigurationException} when two auto-mirroring links exist on the destination,
     * and that the second link's current filters reflect only the accepted updates.
     */
    @Test
    public void testAutoMirroringNoOverlappingTopicFilters() {
        Properties firstLinkProps = destLinkPropsForAutoMirroring(topicFilter());
        createClusterLink(linkName(), firstLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        String secondLink = linkName() + "-2";

        // Creating the second link with the first link's properties must be rejected.
        Assertions.assertThrows(InvalidConfigurationException.class,
            () -> this.createClusterLink(secondLink, firstLinkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4()));

        // The second link is created with the include-all filter; switching it to the topic filter is rejected.
        Properties includeAllProps = destLinkPropsForAutoMirroring(includeAllTopicsFilter());
        UUID secondLinkId = createClusterLink(secondLink, includeAllProps, createClusterLink$default$3(), createClusterLink$default$4());
        Assertions.assertThrows(InvalidConfigurationException.class,
            () -> this.destCluster().alterClusterLink(
                secondLink,
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                    new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), this.topicFilter())})),
                this.destCluster().alterClusterLink$default$3()));
        Assertions.assertEquals(
            ClusterLinkFilterJson$.MODULE$.parse(includeAllTopicsFilter()),
            ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(secondLinkId).get()).currentConfig().topicFilters());

        // Remove the first link (and, for source-initiated links, its source-side counterpart).
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
        if (useSourceInitiatedLink()) {
            sourceCluster().deleteClusterLink(linkName(), sourceCluster().deleteClusterLink$default$2(), sourceCluster().deleteClusterLink$default$3());
            Assertions.assertTrue(sourceCluster().listClusterLinks(sourceCluster().listClusterLinks$default$1()).size() == 1);
        }

        // With the first link gone the topic filter is accepted; applying it a second time also succeeds.
        scala.collection.Map topicFilterUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter())}));
        destCluster().alterClusterLink(secondLink, topicFilterUpdate, destCluster().alterClusterLink$default$3());
        Assertions.assertEquals(
            ClusterLinkFilterJson$.MODULE$.parse(topicFilter()),
            ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(secondLinkId).get()).currentConfig().topicFilters());
        scala.collection.Map repeatedTopicFilterUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter())}));
        destCluster().alterClusterLink(secondLink, repeatedTopicFilterUpdate, destCluster().alterClusterLink$default$3());
        Assertions.assertEquals(
            ClusterLinkFilterJson$.MODULE$.parse(topicFilter()),
            ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(secondLinkId).get()).currentConfig().topicFilters());

        // Recreate the first link with the include-all properties and set the second link to include-all too.
        createClusterLink(linkName(), includeAllProps, createClusterLink$default$3(), createClusterLink$default$4());
        scala.collection.Map includeAllUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter())}));
        destCluster().alterClusterLink(secondLink, includeAllUpdate, destCluster().alterClusterLink$default$3());
        Assertions.assertEquals(
            ClusterLinkFilterJson$.MODULE$.parse(includeAllTopicsFilter()),
            ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(secondLinkId).get()).currentConfig().topicFilters());

        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
        destCluster().deleteClusterLink(secondLink, destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Pauses an auto-mirroring link, verifies that a second link with the same topic filter is
     * rejected while a second link filtering on a different topic is accepted, then un-pauses the
     * first link and cleans up both links.
     */
    @Test
    public void testAutoMirroringAllowsLinkConfigUpdate() {
        final String secondTopic = "linkedTopicTwo";

        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(20);
        createClusterLink(linkName(), destLinkPropsForAutoMirroring(topicFilter()), createClusterLink$default$3(), createClusterLink$default$4());
        waitForAutoMirrorCreation(topic());

        // Pause the first link, then add a second topic on the source.
        scala.collection.Map pauseUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true")}));
        destCluster().alterClusterLink(linkName(), pauseUpdate, destCluster().alterClusterLink$default$3());
        sourceCluster().createTopic(secondTopic, numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(20);

        // A second link using the same topic filter as the first must be rejected.
        Properties sameFilterProps = destLinkPropsForAutoMirroring(topicFilter());
        String secondLink = linkName() + "-2";
        Assertions.assertThrows(InvalidConfigurationException.class,
            () -> this.createClusterLink(secondLink, sameFilterProps, this.createClusterLink$default$3(), this.createClusterLink$default$4()));

        // A second link whose filter includes only the second topic is accepted.
        String secondTopicFilterJson = "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \"" + secondTopic + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |";
        Properties secondLinkProps = destLinkPropsForAutoMirroring(new StringOps(Predef$.MODULE$.augmentString(secondTopicFilterJson)).stripMargin());
        createClusterLink(secondLink, secondLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        waitForAutoMirrorCreation(topic());
        waitForAutoMirrorCreation(secondTopic);

        // Un-pause the first link and clean up.
        scala.collection.Map resumeUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false")}));
        destCluster().alterClusterLink(linkName(), resumeUpdate, destCluster().alterClusterLink$default$3());
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        destCluster().unlinkTopic(secondTopic, secondLink, destCluster().unlinkTopic$default$3(), false);
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
        destCluster().deleteClusterLink(secondLink, destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Enables auto-mirroring on an existing link: the update is rejected while the topic exists on
     * the destination and accepted once that topic has been deleted, after which the topic is
     * auto-mirrored.
     */
    @Test
    public void testAutoMirroringUpdateExistingLink() {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        // Create a link that only overrides the metadata max age, plus a same-named topic on the destination.
        scala.collection.Map metadataAgeOverride = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>("metadata.max.age.ms", String.valueOf(syncPeriod()))}));
        createClusterLink(linkName(), destLinkProps(metadataAgeOverride), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().createTopic(topic(), numPartitions(), replicationFactor(), destCluster().createTopic$default$4());

        scala.collection.mutable.Map autoMirrorConfigs = Map$.MODULE$.apply(Nil$.MODULE$);
        autoMirrorConfigs.put(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        autoMirrorConfigs.put(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter());

        // Rejected while the destination topic exists.
        Assertions.assertThrows(InvalidConfigurationException.class,
            () -> this.destCluster().alterClusterLink(this.linkName(), autoMirrorConfigs, this.destCluster().alterClusterLink$default$3()));

        // Accepted after the destination topic is deleted.
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        destCluster().alterClusterLink(linkName(), autoMirrorConfigs, destCluster().alterClusterLink$default$3());
        waitForAutoMirrorCreation(topic());
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Extends an auto-mirroring link's topic filter with a second literal topic and waits for that
     * topic to be auto-mirrored.
     */
    @Test
    public void testAutoMirroringAddingAdditionalTopic() {
        final String additionalTopic = "linkedTopic2";

        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        createClusterLink(linkName(), destLinkPropsForAutoMirroring(topicFilter()), createClusterLink$default$3(), createClusterLink$default$4());
        waitForAutoMirrorCreation(topic());
        sourceCluster().createTopic(additionalTopic, numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());

        // New filter: literal includes for both the original topic and the additional one.
        String bothTopicsFilterJson = "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \"" + topic() + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \"" + additionalTopic + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  }\n          |]}\n          |";
        scala.collection.Map filterUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), new StringOps(Predef$.MODULE$.augmentString(bothTopicsFilterJson)).stripMargin())}));
        destCluster().alterClusterLink(linkName(), filterUpdate, destCluster().alterClusterLink$default$3());
        waitForAutoMirrorCreation(additionalTopic);

        destCluster().unlinkTopic(topic(), linkName(), false, false);
        destCluster().unlinkTopic(additionalTopic, linkName(), destCluster().unlinkTopic$default$3(), false);
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Checks that an auto-mirroring link cannot be created with, or altered to, the topic filter
     * while the test topic exists on the destination, and that the same alteration succeeds once
     * that topic has been deleted.
     */
    @Test
    public void testAutoMirroringNoExistingTopic() {
        destCluster().createTopic(topic(), destCluster().createTopic$default$2(), destCluster().createTopic$default$3(), destCluster().createTopic$default$4());
        Properties linkProps = destLinkPropsForAutoMirroring(topicFilter());

        // Creating the link with the topic filter is rejected while the destination topic exists.
        Assertions.assertThrows(InvalidConfigurationException.class,
            () -> this.createClusterLink(this.linkName(), linkProps, this.createClusterLink$default$3(), this.createClusterLink$default$4()));

        // The include-all filter is accepted; altering back to the topic filter is still rejected.
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter());
        UUID linkId = createClusterLink(linkName(), linkProps, createClusterLink$default$3(), createClusterLink$default$4());
        Assertions.assertThrows(InvalidConfigurationException.class,
            () -> this.destCluster().alterClusterLink(
                this.linkName(),
                (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                    new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), this.topicFilter())})),
                this.destCluster().alterClusterLink$default$3()));
        Assertions.assertEquals(
            ClusterLinkFilterJson$.MODULE$.parse(includeAllTopicsFilter()),
            ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().topicFilters());

        // After deleting the destination topic the topic filter is accepted, including when applied twice.
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        scala.collection.Map topicFilterUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter())}));
        destCluster().alterClusterLink(linkName(), topicFilterUpdate, destCluster().alterClusterLink$default$3());
        Assertions.assertEquals(
            ClusterLinkFilterJson$.MODULE$.parse(topicFilter()),
            ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().topicFilters());
        scala.collection.Map repeatedTopicFilterUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), topicFilter())}));
        destCluster().alterClusterLink(linkName(), repeatedTopicFilterUpdate, destCluster().alterClusterLink$default$3());
        Assertions.assertEquals(
            ClusterLinkFilterJson$.MODULE$.parse(topicFilter()),
            ((ClusterLinkFactory.ConnectionManager) ((KafkaBroker) destCluster().servers().head()).clusterLinkManager().connectionManager(linkId).get()).currentConfig().topicFilters());

        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Mirrors ten records through a single-partition topic, unlinks the mirror topic, and checks
     * that the stopped mirror's replica status and mirror-topic description report the expected
     * state and offsets.
     *
     * @param z forwarded as the fourth argument of {@code unlinkTopic}; NOTE(review): the test
     *          names suggest it distinguishes promote from failover - confirm against unlinkTopic
     */
    private void testLastFetchedOffsetStoppedMirrorTopicDescription(boolean z) {
        final int numRecords = 10;
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        createClusterLink(linkName(), createClusterLink$default$2(), createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());

        // Poll the readiness predicate (defined elsewhere in this class), failing once the default timeout elapses.
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$1(this)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$2());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        produceToSourceCluster(numRecords);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // While the mirror is active, the selected replica reports the source high watermark.
        ReplicaStatus activeReplica = (ReplicaStatus) ((IterableLike) destCluster().replicaStatus(topic(), 0, destCluster().replicaStatus$default$3()).filter(r -> BoxesRunTime.boxToBoolean($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$3(r)))).head();
        Assertions.assertTrue(activeReplica.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo activeInfo = (ReplicaStatus.MirrorInfo) activeReplica.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.ACTIVE, activeInfo.state());
        Assertions.assertEquals(numRecords, activeInfo.lastFetchSourceHighWatermark());

        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), z);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());

        // Once stopped, the replica status no longer carries a source high watermark (-1).
        ReplicaStatus stoppedReplica = (ReplicaStatus) ((IterableLike) destCluster().replicaStatus(topic(), 0, destCluster().replicaStatus$default$3()).filter(r -> BoxesRunTime.boxToBoolean($anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$4(r)))).head();
        Assertions.assertTrue(stoppedReplica.mirrorInfo().isPresent());
        ReplicaStatus.MirrorInfo stoppedInfo = (ReplicaStatus.MirrorInfo) stoppedReplica.mirrorInfo().get();
        Assertions.assertEquals(ReplicaStatus.MirrorInfo.State.STOPPED, stoppedInfo.state());
        Assertions.assertEquals(-1L, stoppedInfo.lastFetchSourceHighWatermark());

        // The mirror-topic description is STOPPED and records one stopped log end offset equal to the records produced.
        MirrorTopicDescription stoppedDescription = destCluster().describeMirrorTopic(topic());
        // Expected value first, so a failure message labels the two values correctly.
        Assertions.assertEquals(MirrorTopicDescription.State.STOPPED, stoppedDescription.state());
        Assertions.assertEquals(1, stoppedDescription.stoppedLogEndOffsets().size());
        Assertions.assertEquals(numRecords, ((Long) stoppedDescription.stoppedLogEndOffsets().get(0)).longValue());
    }

    /**
     * Default value for the first parameter of
     * {@code testLastFetchedOffsetStoppedMirrorTopicDescription}.
     *
     * @return always {@code true}
     */
    private boolean testLastFetchedOffsetStoppedMirrorTopicDescription$default$1() {
        final boolean defaultValue = true;
        return defaultValue;
    }

    /**
     * Exercises deletion of auto-mirrored topics on the destination: deletion is rejected while
     * the mirror is linked, allowed after unlinking, and the topic is auto-mirrored again
     * afterwards. Also covers deletion after the source topic is removed and after the topic is
     * excluded from the link's filter.
     */
    @Test
    public void testDeleteAutoMirroredTopics() {
        autoMirrorTopic(syncPeriod());

        // Deleting the linked mirror topic is rejected and the topic remains listed as a mirror.
        Assertions.assertThrows(TopicDeletionDisabledException.class,
            () -> this.destCluster().deleteTopic(this.topic(), false));
        Assertions.assertTrue(destCluster().listMirrorTopics(destCluster().listMirrorTopics$default$1()).contains(topic()));

        // After unlinking and reaching STOPPED, the topic can be deleted.
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        destCluster().deleteTopic(topic(), false);

        // Poll the predicate (defined elsewhere in this class), failing once the default timeout elapses.
        long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$testDeleteAutoMirroredTopics$2(this)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail($anonfun$testDeleteAutoMirroredTopics$3());
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }

        // The topic is auto-mirrored again; deleting the source topic then drives the mirror to FAILED.
        waitForAutoMirrorCreation(topic());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        sourceCluster().deleteTopic(topic(), sourceCluster().deleteTopic$default$2());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.FAILED, waitUntilMirrorState$default$2());
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());

        // Recreate the source topic and wait for it to be auto-mirrored once more.
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(20);
        waitForAutoMirrorCreation(topic());

        // Exclude the topic from the filter, delete it on the destination, and check it is no longer a mirror.
        String excludeTopicJson = "|{\n          |\"topicFilters\": [\n          |  {\n          |     \"name\": \"*\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"include\"\n          |  },\n          |  {\n          |     \"name\": \"" + topic() + "\",\n          |     \"patternType\": \"literal\",\n          |     \"filterType\": \"exclude\"\n          |  }\n          |]}\n          |";
        scala.collection.Map excludeTopicUpdate = (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            new Tuple2<>(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true"),
            new Tuple2<>(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), new StringOps(Predef$.MODULE$.augmentString(excludeTopicJson)).stripMargin()),
            new Tuple2<>("metadata.max.age.ms", String.valueOf(syncPeriod()))}));
        destCluster().alterClusterLink(linkName(), excludeTopicUpdate, destCluster().alterClusterLink$default$3());
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        Assertions.assertFalse(destCluster().listMirrorTopics(destCluster().listMirrorTopics$default$1()).contains(topic()));
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Creates a cluster link whose consumer-offset sync, topic-config sync and
     * metadata refresh intervals are all 300000 ms, then lowers each interval to
     * {@code syncPeriod()} one at a time via {@code alterClusterLink} and waits
     * for the corresponding effect on the destination cluster (topic config
     * value, migrated offsets, auto-created mirror topic).
     */
    @Test
    public void testIntervalChangeForPeriodicTasks() {
        // Link starts with offset sync and auto-mirroring enabled and every interval at 300000 ms.
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(consumerGroup()));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp(), String.valueOf(300000));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp(), String.valueOf(300000));
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), includeAllTopicsFilter());
        destLinkProps.setProperty("metadata.max.age.ms", String.valueOf(300000));
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        // Step 1: lower only the topic-config sync interval to syncPeriod().
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), String.valueOf(syncPeriod()))})));
        sourceCluster().createTopic(topic(), numPartitions(), 2, sourceCluster().createTopic$default$4());
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());
        // Change a topic config on the source; the loop below waits for it to show up on the destination.
        sourceCluster().alterTopic(topic(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("delete.retention.ms"), "80000000")})));
        // Polling loop: waitUntilTrue$default$3 bounds the total wait, and each sleep is the
        // smaller of waitUntilTrue$default$3 and waitUntilTrue$default$4.
        // NOTE(review): this looks like an inlined TestUtils.waitUntilTrue call - confirm.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        // The condition checks that the destination's delete.retention.ms equals "80000000".
        while (!$anonfun$testIntervalChangeForPeriodicTasks$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testIntervalChangeForPeriodicTasks$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        verifyTopicConfigChangeMetrics();
        // Step 2: commit an offset on the source, lower the consumer-offset sync interval, verify migration.
        commitOffsets(sourceCluster(), topic(), 0, offsetToCommit(), consumerGroup());
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncMsProp()), String.valueOf(syncPeriod()))})));
        verifyOffsetMigration(topic(), 0, offsetToCommit(), consumerGroup());
        // Step 3: remove the mirror topic, lower metadata.max.age.ms, and wait for auto-mirroring to recreate it.
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        destCluster().deleteTopic(topic(), destCluster().deleteTopic$default$2());
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.max.age.ms"), String.valueOf(syncPeriod()))})));
        waitForAutoMirrorCreation(topic());
        // Cleanup: turn auto-mirroring off before unlinking, then delete the link.
        alterClusterLink(linkName(), (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), "false")})));
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), destCluster().unlinkTopic$default$4());
        destCluster().deleteClusterLink(linkName(), destCluster().deleteClusterLink$default$2(), destCluster().deleteClusterLink$default$3());
    }

    /**
     * Mirrors a single-partition topic over a link configured with short
     * timeouts and availability checks, waits for the mirror to become ACTIVE,
     * then updates credentials on one cluster and expects the mirror to report
     * a source failure state. Finally unlinks the topic and expects STOPPED.
     */
    @Test
    public void testMirrorFailoverWhenSourceIsUnavailable() {
        numPartitions_$eq(1);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        // Short refresh/timeout/availability-check values for the link under test.
        Properties destLinkProps = destLinkProps(destLinkProps$default$1());
        destLinkProps.setProperty("metadata.max.age.ms", "100");
        destLinkProps.setProperty("request.timeout.ms", "1000");
        destLinkProps.setProperty("default.api.timeout.ms", "1000");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.ReverseConnectionSetupTimeoutMsProp(), "1000");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.RetryTimeoutMsProp(), "1000");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        destLinkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckConsecutiveFailureThresholdProp(), "2");
        createClusterLink(linkName(), destLinkProps, createClusterLink$default$3(), createClusterLink$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        // Polling loop: waitUntilTrue$default$3 bounds the total wait, and each sleep is the
        // smaller of waitUntilTrue$default$3 and waitUntilTrue$default$4.
        // NOTE(review): this looks like an inlined TestUtils.waitUntilTrue call - confirm.
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        // The condition checks that the destination leader epoch of partition 0 is at least 1.
        while (!$anonfun$testMirrorFailoverWhenSourceIsUnavailable$1(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testMirrorFailoverWhenSourceIsUnavailable$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        // Healthy phase: data flows and both state views report ACTIVE.
        produceToSourceCluster(10);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.ACTIVE, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.ACTIVE);
        // Credentials are updated on the destination for source-initiated links, otherwise on the source.
        // NOTE(review): presumably this invalidates the link's authentication - confirm in updateCredentials.
        updateCredentials(useSourceInitiatedLink() ? destCluster() : sourceCluster());
        // Either failure state is accepted at replica level; the description must report SOURCE_UNAVAILABLE.
        waitUntilOneOfMirrorStates((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.SOURCE_AUTHENTICATION_FAILED, ReplicaStatus.MirrorInfo.State.SOURCE_UNAVAILABLE})));
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.SOURCE_UNAVAILABLE);
        // The fourth unlinkTopic argument is passed as an explicit false instead of its default.
        // NOTE(review): meaning of that flag is not visible here - verify against unlinkTopic.
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        waitUntilMirrorState(ReplicaStatus.MirrorInfo.State.STOPPED, waitUntilMirrorState$default$2());
        waitUntilMirrorDescriptionState(MirrorTopicDescription.State.STOPPED);
    }

    /** Null-safe check that the listing's link name equals the test's link name. */
    public static final /* synthetic */ boolean $anonfun$testCreateMirrorTopic$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkListing clusterLinkListing) {
        String listedName = clusterLinkListing.linkName();
        String expectedName = clusterLinkIntegrationTest.linkName();
        if (listedName == null) {
            return expectedName == null;
        }
        return listedName.equals(expectedName);
    }

    /**
     * Returns true when no online partition of the test's partitions, on any
     * destination server, reports {@code linkedUpdatesOnly()}.
     */
    public static final /* synthetic */ boolean $anonfun$testTransactionsWithMirrorTopic$4(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        SeqLike linkedOnlyPartitions = (SeqLike) clusterLinkIntegrationTest.partitions().flatMap(tp -> {
            // Collect the online partition objects for this topic partition across all destination servers.
            TraversableLike onlineOnServers = (TraversableLike) clusterLinkIntegrationTest.destCluster().servers().flatMap(
                server -> Option$.MODULE$.option2Iterable(server.replicaManager().onlinePartition(tp)),
                Buffer$.MODULE$.canBuildFrom());
            return (Buffer) onlineOnServers.filter(p -> BoxesRunTime.boxToBoolean(p.linkedUpdatesOnly()));
        }, Seq$.MODULE$.canBuildFrom());
        return linkedOnlyPartitions.isEmpty();
    }

    /** Returns the fixed message text "Mirror not stopped". */
    public static final /* synthetic */ String $anonfun$testTransactionsWithMirrorTopic$8() {
        final String message = "Mirror not stopped";
        return message;
    }

    /** Deletes the test topic on the destination cluster and links it again over the test link. */
    private final void restartMirrorTopic$1() {
        // Drop the existing destination topic first.
        destCluster().deleteTopic(
                topic(),
                destCluster().deleteTopic$default$2());
        // Re-create it as a mirror topic on the same link.
        destCluster().linkTopic(
                topic(),
                replicationFactor(),
                linkName(),
                destCluster().linkTopic$default$4());
    }

    /**
     * Restarts dead source brokers and refreshes the source bootstrap servers.
     * Unless the link is source-initiated, it also pushes the new source broker
     * list into the destination link's {@code bootstrap.servers}.
     */
    private final void restartSource$1() {
        sourceCluster().restartDeadBrokers(sourceCluster().restartDeadBrokers$default$1());
        sourceCluster().updateBootstrapServers();
        if (!useSourceInitiatedLink()) {
            destCluster().alterClusterLink(
                    linkName(),
                    (scala.collection.Map) scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), sourceCluster().brokerList())
                    })),
                    destCluster().alterClusterLink$default$3());
        }
    }

    /** Null-safe check that the listing's link name equals the test's link name. */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkListing clusterLinkListing) {
        String listedName = clusterLinkListing.linkName();
        String expectedName = clusterLinkIntegrationTest.linkName();
        if (listedName == null) {
            return expectedName == null;
        }
        return listedName.equals(expectedName);
    }

    /** Null-safe check that the listing's link name equals the test's link name. */
    public static final /* synthetic */ boolean $anonfun$testCreateAndDeleteAndRecreateLink$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ClusterLinkListing clusterLinkListing) {
        String listedName = clusterLinkListing.linkName();
        String expectedName = clusterLinkIntegrationTest.linkName();
        if (listedName == null) {
            return expectedName == null;
        }
        return listedName.equals(expectedName);
    }

    /** Returns true once the destination topic's {@code delete.retention.ms} value equals "80000000". */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigSync$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        String destValue = clusterLinkIntegrationTest.destCluster()
                .describeTopicConfig(clusterLinkIntegrationTest.topic())
                .get("delete.retention.ms")
                .value();
        return destValue.equals("80000000");
    }

    /** Returns the fixed message text "Topic configs did not get propagated". */
    public static final /* synthetic */ String $anonfun$testTopicConfigSync$2() {
        final String message = "Topic configs did not get propagated";
        return message;
    }

    /** Returns true when the destination's mirror topic description reports state STOPPED (null-safe). */
    public static final /* synthetic */ boolean $anonfun$testListDescribeMirror$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        MirrorTopicDescription.State actual = clusterLinkIntegrationTest.destCluster().describeMirrorTopic(clusterLinkIntegrationTest.topic()).state();
        MirrorTopicDescription.State stopped = MirrorTopicDescription.State.STOPPED;
        if (actual == null) {
            return stopped == null;
        }
        return actual.equals(stopped);
    }

    /** Returns the fixed message text "Mirror took too long to stop.". */
    public static final /* synthetic */ String $anonfun$testListDescribeMirror$2() {
        final String message = "Mirror took too long to stop.";
        return message;
    }

    /** Returns true when {@code listMirrorTopics(true)} on the destination cluster is empty. */
    public static final /* synthetic */ boolean $anonfun$testListDescribeMirror$4(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        boolean noMirrorTopics = clusterLinkIntegrationTest.destCluster().listMirrorTopics(true).isEmpty();
        return noMirrorTopics;
    }

    /** Returns the fixed message text "Mirror state not removed". */
    public static final /* synthetic */ String $anonfun$testListDescribeMirror$5() {
        final String message = "Mirror state not removed";
        return message;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sets the consumer byte-rate quota override to {@code j} for the user
     * returned by {@code linkUserName(linkName())} and waits up to 15 seconds
     * for the admin call to complete.
     */
    public final void setQuota$1(long j, ConfluentAdmin confluentAdmin) {
        // Quota entity keyed by "user" -> link user name.
        Map entityKeys = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("user"), linkUserName(linkName()))
        }))).asJava();
        ClientQuotaEntity quotaEntity = new ClientQuotaEntity(entityKeys);
        ClientQuotaAlteration.Op quotaOp = new ClientQuotaAlteration.Op(DynamicConfig$Client$.MODULE$.ConsumerByteRateOverrideProp(), Predef$.MODULE$.double2Double(j));
        ClientQuotaAlteration alteration = new ClientQuotaAlteration(quotaEntity, Collections.singleton(quotaOp));
        confluentAdmin.alterClientQuotas(Collections.singleton(alteration)).all().get(15L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns true when the maximum "fetch-throttle-time-max" metric in group
     * "cluster-link" for this link, across destination servers, is above zero.
     */
    public final boolean throttled$1() {
        return 0.0d < kafkaMetricMaxValue(
                destCluster().servers(),
                "fetch-throttle-time-max",
                "cluster-link",
                new Some(linkName()),
                kafkaMetricMaxValue$default$5(),
                kafkaMetricMaxValue$default$6());
    }

    /** Returns the broker id from the given server's config. */
    public static final /* synthetic */ int $anonfun$testDestinationClusterLinkBrokerLevelQuota$1(KafkaServer kafkaServer) {
        int brokerId = kafkaServer.config().brokerId();
        return brokerId;
    }

    /** Builds a BROKER-type config resource named with the decimal string of {@code i}. */
    public static final /* synthetic */ ConfigResource $anonfun$testDestinationClusterLinkBrokerLevelQuota$2(int i) {
        String resourceName = String.valueOf(i);
        return new ConfigResource(ConfigResource.Type.BROKER, resourceName);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sets the cluster-link IO max-bytes-per-second config to {@code j} on every
     * config resource in {@code seq} through one {@code incrementalAlterConfigs}
     * call, then blocks until it completes.
     */
    public static final void setQuota$2(long j, Seq seq, ConfluentAdmin confluentAdmin) {
        ConfigEntry quotaEntry = new ConfigEntry(KafkaConfig$.MODULE$.ClusterLinkIoMaxBytesPerSecondProp(), String.valueOf(j));
        AlterConfigOp setQuotaOp = new AlterConfigOp(quotaEntry, AlterConfigOp.OpType.SET);
        // Map every resource to a single-element collection holding the SET op.
        Map opsByResource = (Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) seq.map(
            resource -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                Predef$.MODULE$.ArrowAssoc(resource),
                CollectionConverters$.MODULE$.asJavaCollectionConverter(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new AlterConfigOp[]{setQuotaOp}))).asJavaCollection()),
            Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
        confluentAdmin.incrementalAlterConfigs(opsByResource).all().get();
    }

    /** Returns whether quotas are enabled on the server's produce quota manager. */
    public static final /* synthetic */ boolean $anonfun$verifyQuotaMode$1(KafkaServer kafkaServer) {
        boolean produceQuotasEnabled = kafkaServer.quotaManagers().produce().quotasEnabled();
        return produceQuotasEnabled;
    }

    /** Returns the fixed message text "Produce quota not enabled". */
    public static final /* synthetic */ String $anonfun$verifyQuotaMode$2() {
        final String message = "Produce quota not enabled";
        return message;
    }

    /** Null-safe check that the server's configured cluster link quota mode equals the given mode. */
    public static final /* synthetic */ boolean $anonfun$verifyQuotaMode$3(KafkaServer kafkaServer, ConfluentConfigs.ClusterLinkQuotaMode clusterLinkQuotaMode) {
        ConfluentConfigs.ClusterLinkQuotaMode configuredMode = kafkaServer.config().clusterLinkQuotaMode();
        if (configuredMode == null) {
            return clusterLinkQuotaMode == null;
        }
        return configuredMode.equals(clusterLinkQuotaMode);
    }

    /** Returns the fixed message text "Quota mode not updated". */
    public static final /* synthetic */ String $anonfun$verifyQuotaMode$4() {
        final String message = "Quota mode not updated";
        return message;
    }

    /** Returns true when the fetcher manager has at least one fetcher thread. */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$2(ClusterLinkFetcherManager clusterLinkFetcherManager) {
        return 0 < clusterLinkFetcherManager.fetcherThreadCount();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Reads the "fetchSize" field declared on {@code ReplicaFetcherThread} from the given thread via {@code TestUtils.fieldValue}. */
    public static final int fetchSize$1(ClusterLinkFetcherThread clusterLinkFetcherThread) {
        Object rawFetchSize = TestUtils.fieldValue(clusterLinkFetcherThread, ReplicaFetcherThread.class, "fetchSize");
        return BoxesRunTime.unboxToInt(rawFetchSize);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Reads the "fetchResponseSize" field declared on {@code ClusterLinkFetcherThread} from the given thread via {@code TestUtils.fieldValue}. */
    public static final int fetchResponseSize$1(ClusterLinkFetcherThread clusterLinkFetcherThread) {
        Object rawResponseSize = TestUtils.fieldValue(clusterLinkFetcherThread, ClusterLinkFetcherThread.class, "fetchResponseSize");
        return BoxesRunTime.unboxToInt(rawResponseSize);
    }

    /** Compares the boxed form of {@code i} with {@code obj} using {@code BoxesRunTime.equals}. */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$8(Object obj, int i) {
        Object boxedValue = BoxesRunTime.boxToInteger(i);
        return BoxesRunTime.equals(boxedValue, obj);
    }

    /** Compares the boxed form of {@code i} with {@code obj} using {@code BoxesRunTime.equals}. */
    public static final /* synthetic */ boolean $anonfun$verifyFetchResponseSize$10(Object obj, int i) {
        Object boxedValue = BoxesRunTime.boxToInteger(i);
        return BoxesRunTime.equals(boxedValue, obj);
    }

    /**
     * Triggers a preferred leader election for the given partition and waits up
     * to 15 seconds for it to complete.
     *
     * <p>Any failure is reported as a test failure with the message
     * "Preferred leader election failed". The original throwable is attached as
     * the cause so the underlying reason appears in the test report.
     *
     * @param confluentAdmin admin client used to run the election
     * @param topicPartition partition whose preferred leader should be elected
     */
    public static final /* synthetic */ void $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$1(ConfluentAdmin confluentAdmin, TopicPartition topicPartition) {
        try {
            confluentAdmin.electLeaders(ElectionType.PREFERRED, Collections.singleton(topicPartition)).all().get(15L, TimeUnit.SECONDS);
        } catch (Throwable cause) {
            // Keep the cause; dropping it hides why the election failed.
            Assertions.fail("Preferred leader election failed", cause);
        }
    }

    /**
     * Null-safe check that the destination partition leader, wrapped in an
     * {@code Option}, equals the destination server looked up by id {@code i}.
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest, TopicPartition topicPartition, int i) {
        Option currentLeader = Option$.MODULE$.apply(clusterLinkIntegrationTest.destCluster().partitionLeader(topicPartition));
        Option<KafkaServer> expectedLeader = clusterLinkIntegrationTest.destCluster().serverForId(i);
        if (currentLeader == null) {
            return expectedLeader == null;
        }
        return currentLeader.equals(expectedLeader);
    }

    /** Returns the fixed message text "Preferred leader not elected". */
    public static final /* synthetic */ String $anonfun$testDestinationClusterLinkQuotaWithBrokerRestart$3() {
        final String message = "Preferred leader not elected";
        return message;
    }

    /** Returns the number of partitions the destination cluster reports for the test topic. */
    public static final /* synthetic */ int $anonfun$testAddPartitions$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        int partitionCount = clusterLinkIntegrationTest.destCluster().describeTopic(clusterLinkIntegrationTest.topic()).partitions().size();
        return partitionCount;
    }

    /** Returns true when {@code i} equals the test's current {@code numPartitions()}. */
    public static final /* synthetic */ boolean $anonfun$testAddPartitions$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest, int i) {
        return clusterLinkIntegrationTest.numPartitions() == i;
    }

    /** Returns true when {@code i} is exactly 3. */
    public static final /* synthetic */ boolean $anonfun$testAlterClusterLinkConfigs$7(int i) {
        final int expected = 3;
        return expected == i;
    }

    /**
     * Applies one incremental config change to {@code configResource} and
     * asserts the outcome.
     *
     * <p>{@code tuple2} is a (config name, optional value) pair: a {@code Some}
     * value becomes a SET operation, {@code None} becomes a DELETE operation.
     * The change is expected to succeed only when the config name equals
     * {@code LogConfig.UncleanLeaderElectionEnableProp}. For any other name the
     * call must fail with an {@code ExecutionException} whose cause is an
     * {@code InvalidConfigurationException}.
     *
     * @param confluentAdmin admin client used to issue the change
     * @param configResource resource whose config is altered
     * @param tuple2 pair of config name and optional new value
     * @throws MatchError if {@code tuple2} is null or its second element is neither {@code Some} nor {@code None}
     */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$4(ConfluentAdmin confluentAdmin, ConfigResource configResource, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        String configName = (String) tuple2._1();
        Option configValue = (Option) tuple2._2();
        String uncleanLeaderElectionProp = LogConfig$.MODULE$.UncleanLeaderElectionEnableProp();
        // Only the unclean-leader-election config is expected to be alterable here.
        boolean alterAllowed = configName != null ? configName.equals(uncleanLeaderElectionProp) : uncleanLeaderElectionProp == null;
        AlterConfigOp alterConfigOp;
        if (configValue instanceof Some) {
            alterConfigOp = new AlterConfigOp(new ConfigEntry(configName, (String) ((Some) configValue).value()), AlterConfigOp.OpType.SET);
        } else {
            if (!None$.MODULE$.equals(configValue)) {
                throw new MatchError(configValue);
            }
            alterConfigOp = new AlterConfigOp(new ConfigEntry(configName, (String) null), AlterConfigOp.OpType.DELETE);
        }
        try {
            confluentAdmin.incrementalAlterConfigs((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(configResource), Collections.singleton(alterConfigOp))}))).asJava()).all().get();
            // The call succeeded, so the config must have been an allowed one.
            Assertions.assertTrue(alterAllowed);
        } catch (ExecutionException e) {
            // The call failed, so it must be a rejected config and the right error type.
            Assertions.assertTrue(e.getCause() instanceof InvalidConfigurationException);
            Assertions.assertFalse(alterAllowed);
        }
    }

    /**
     * Verifies that the test topic's config cannot be freely altered through
     * the given admin client.
     *
     * <p>First, a legacy {@code alterConfigs} call with an empty config must
     * fail with an {@code ExecutionException} caused by an
     * {@code InvalidRequestException}. Then four incremental changes (set and
     * delete of the unclean-leader-election config, set and delete of the
     * cleanup policy) are each checked by {@code $anonfun$testDestReadOnly$4}.
     *
     * @param clusterLinkIntegrationTest test instance supplying the topic name
     * @param confluentAdmin admin client used to issue the changes
     */
    public static final /* synthetic */ void $anonfun$testDestReadOnly$3(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ConfluentAdmin confluentAdmin) {
        // Never reassigned, so the lambda below can capture it.
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, clusterLinkIntegrationTest.topic());
        try {
            confluentAdmin.alterConfigs((Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(configResource), new Config(CollectionConverters$.MODULE$.asJavaCollectionConverter(List$.MODULE$.empty()).asJavaCollection()))}))).asJava()).all().get(20L, TimeUnit.SECONDS);
            Assertions.fail("alterConfigs() on a mirror topic should fail");
        } catch (ExecutionException e) {
            Assertions.assertTrue(e.getCause() instanceof InvalidRequestException);
        }
        new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.UncleanLeaderElectionEnableProp()), new Some("true")), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.UncleanLeaderElectionEnableProp()), None$.MODULE$), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.CleanupPolicyProp()), new Some("compact")), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(LogConfig$.MODULE$.CleanupPolicyProp()), None$.MODULE$), Nil$.MODULE$)))).foreach(tuple2 -> {
            $anonfun$testDestReadOnly$4(confluentAdmin, configResource, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    /** Builds the topic name "topic-" followed by the decimal form of {@code i}. */
    public static final /* synthetic */ String $anonfun$testDeleteClusterLinkCleanup$1(int i) {
        return "topic-" + i;
    }

    /** Creates the {@code TopicPartition} for partition {@code i} of the test topic. */
    public static final /* synthetic */ TopicPartition $anonfun$testMirroredTopicMarkedForDelete$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest, int i) {
        String topicName = clusterLinkIntegrationTest.topic();
        return new TopicPartition(topicName, i);
    }

    /** Returns true for every server whose broker id differs from {@code i}. */
    public static final /* synthetic */ boolean $anonfun$testMirroredTopicMarkedForDelete$2(int i, KafkaServer kafkaServer) {
        return !(kafkaServer.config().brokerId() == i);
    }

    /** Returns true when the server's replica manager has no online partition for any entry of {@code indexedSeq}. */
    public static final /* synthetic */ boolean $anonfun$testMirroredTopicMarkedForDelete$4(IndexedSeq indexedSeq, KafkaServer kafkaServer) {
        SeqLike onlinePartitions = (SeqLike) indexedSeq.flatMap(
            tp -> Option$.MODULE$.option2Iterable(kafkaServer.replicaManager().onlinePartition(tp)),
            IndexedSeq$.MODULE$.canBuildFrom());
        return onlinePartitions.isEmpty();
    }

    /** Returns the fixed failure message "Partitions not offline after topic deletion". */
    public static final /* synthetic */ String $anonfun$testMirroredTopicMarkedForDelete$6() {
        final String message = "Partitions not offline after topic deletion";
        return message;
    }

    /** Returns true when the server's cluster link fetcher manager for link id {@code uuid} is empty. */
    public static final /* synthetic */ boolean $anonfun$testMirroredTopicMarkedForDelete$7(KafkaServer kafkaServer, UUID uuid) {
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) kafkaServer.clusterLinkManager().fetcherManager(uuid).get();
        return fetcherManager.isEmpty();
    }

    /** Returns the fixed failure message "Fetcher manager not empty after topic deletion". */
    public static final /* synthetic */ String $anonfun$testMirroredTopicMarkedForDelete$8() {
        final String message = "Fetcher manager not empty after topic deletion";
        return message;
    }

    /**
     * Waits on one server for two conditions in turn: first that none of the
     * given partitions is online, then that the fetcher manager for link
     * {@code uuid} is empty. Each wait polls until the condition holds and
     * fails the test once {@code waitUntilTrue$default$3} milliseconds have passed.
     */
    public static final /* synthetic */ void $anonfun$testMirroredTopicMarkedForDelete$3(IndexedSeq indexedSeq, UUID uuid, KafkaServer kafkaServer) {
        // Phase 1: partitions must go offline.
        TestUtils$ offlineUtils = TestUtils$.MODULE$;
        long offlineTimeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long offlinePauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (offlineUtils == null) {
            throw null;
        }
        long offlineStartMs = System.currentTimeMillis();
        while (true) {
            if ($anonfun$testMirroredTopicMarkedForDelete$4(indexedSeq, kafkaServer)) {
                break;
            }
            if (System.currentTimeMillis() > offlineStartMs + offlineTimeoutMs) {
                Assertions.fail($anonfun$testMirroredTopicMarkedForDelete$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(offlineTimeoutMs), offlinePauseMs));
        }

        // Phase 2: the link's fetcher manager must become empty.
        TestUtils$ fetcherUtils = TestUtils$.MODULE$;
        long fetcherTimeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long fetcherPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (fetcherUtils == null) {
            throw null;
        }
        long fetcherStartMs = System.currentTimeMillis();
        while (true) {
            if ($anonfun$testMirroredTopicMarkedForDelete$7(kafkaServer, uuid)) {
                break;
            }
            if (System.currentTimeMillis() > fetcherStartMs + fetcherTimeoutMs) {
                Assertions.fail($anonfun$testMirroredTopicMarkedForDelete$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(fetcherTimeoutMs), fetcherPauseMs));
        }
    }

    /** Null-safe check that the mirror info's state is PAUSED. */
    public static final /* synthetic */ boolean $anonfun$testPauseTopic$5(ReplicaStatus.MirrorInfo mirrorInfo) {
        ReplicaStatus.MirrorInfo.State actual = mirrorInfo.state();
        ReplicaStatus.MirrorInfo.State paused = ReplicaStatus.MirrorInfo.State.PAUSED;
        if (actual == null) {
            return paused == null;
        }
        return actual.equals(paused);
    }

    /**
     * Returns true when, for every partition index in
     * {@code [0, numPartitions())}, the leader replica's mirror info is present
     * and in state PAUSED.
     */
    public static final /* synthetic */ boolean $anonfun$testPauseTopic$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        return RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), clusterLinkIntegrationTest.numPartitions()).forall(i -> {
            // Keep only leader replicas of partition i, then take the first.
            IterableLike leaderReplicas = (IterableLike) clusterLinkIntegrationTest.destCluster().replicaStatus(clusterLinkIntegrationTest.topic(), i, clusterLinkIntegrationTest.destCluster().replicaStatus$default$3()).filter(
                status -> BoxesRunTime.boxToBoolean(status.isLeader()));
            ReplicaStatus leader = (ReplicaStatus) leaderReplicas.head();
            // A missing mirror info becomes None, so exists() yields false.
            return Option$.MODULE$.apply(leader.mirrorInfo().orElse(null)).exists(
                info -> BoxesRunTime.boxToBoolean($anonfun$testPauseTopic$5(info)));
        });
    }

    /** Returns the fixed message text "Topic's partitions not paused". */
    public static final /* synthetic */ String $anonfun$testPauseTopic$6() {
        final String message = "Topic's partitions not paused";
        return message;
    }

    /**
     * Collects, for each partition index in {@code [0, numPartitions())}, the
     * log end offset of the first leader replica on the destination cluster.
     */
    private final Seq leaderOffsets$1() {
        return ((scala.collection.immutable.Seq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numPartitions()).map(i -> {
            IterableLike leaderReplicas = (IterableLike) this.destCluster().replicaStatus(this.topic(), i, this.destCluster().replicaStatus$default$3()).filter(
                status -> BoxesRunTime.boxToBoolean(status.isLeader()));
            ReplicaStatus leader = (ReplicaStatus) leaderReplicas.head();
            return leader.logEndOffset();
        }, IndexedSeq$.MODULE$.canBuildFrom())).toSeq();
    }

    /** Returns the number of partitions the destination cluster reports for the test topic. */
    public static final /* synthetic */ int $anonfun$testPauseClusterLink$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        int partitionCount = clusterLinkIntegrationTest.destCluster().describeTopic(clusterLinkIntegrationTest.topic()).partitions().size();
        return partitionCount;
    }

    /** Returns true when the two integers are equal. */
    public static final /* synthetic */ boolean $anonfun$testPauseClusterLink$3(int i, int i2) {
        return i == i2;
    }

    /** Returns the destination topic's current value for {@code LogConfig.DeleteRetentionMsProp}. */
    public static final /* synthetic */ String $anonfun$testPauseClusterLink$4(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        String deleteRetention = clusterLinkIntegrationTest.destCluster()
                .describeTopicConfig(clusterLinkIntegrationTest.topic())
                .get(LogConfig$.MODULE$.DeleteRetentionMsProp())
                .value();
        return deleteRetention;
    }

    /** Null-safe string equality: true when both are null or {@code str2.equals(str)}. */
    public static final /* synthetic */ boolean $anonfun$testPauseClusterLink$5(String str, String str2) {
        if (str2 == null) {
            return str == null;
        }
        return str2.equals(str);
    }

    /** Returns true once the destination leader epoch of partition 0 of the test topic is at least 1. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$3(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        return 1 <= clusterLinkIntegrationTest.destCluster().leaderEpoch(
                new TopicPartition(clusterLinkIntegrationTest.topic(), 0));
    }

    /** Returns the fixed message text "Destination leader epoch not updated". */
    public static final /* synthetic */ String $anonfun$testReplicaStatus$4() {
        final String message = "Destination leader epoch not updated";
        return message;
    }

    /** Matches a leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$5(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /** Matches a non-leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$6(ReplicaStatus replicaStatus) {
        return !replicaStatus.isLeader() && !replicaStatus.linkName().isPresent();
    }

    /** Matches a leader replica that has a link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$7(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return replicaStatus.linkName().isPresent();
    }

    /** Matches a non-leader replica that has a link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$8(ReplicaStatus replicaStatus) {
        if (replicaStatus.isLeader()) {
            return false;
        }
        return replicaStatus.linkName().isPresent();
    }

    /** Matches a leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$9(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /** Matches a non-leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$10(ReplicaStatus replicaStatus) {
        return !replicaStatus.isLeader() && !replicaStatus.linkName().isPresent();
    }

    /**
     * Stores the destination replica status of partition 0 (third argument
     * {@code true}) into {@code objectRef.elem} and returns true when it holds
     * exactly two entries.
     */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$11(ClusterLinkIntegrationTest clusterLinkIntegrationTest, ObjectRef objectRef) {
        objectRef.elem = clusterLinkIntegrationTest.destCluster().replicaStatus(clusterLinkIntegrationTest.topic(), 0, true);
        Seq latestStatuses = (Seq) objectRef.elem;
        return latestStatuses.size() == 2;
    }

    /** Returns the fixed message text "Cluster link not removed from topic's partition". */
    public static final /* synthetic */ String $anonfun$testReplicaStatus$12() {
        final String message = "Cluster link not removed from topic's partition";
        return message;
    }

    /** Matches a leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$13(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /** Matches a non-leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testReplicaStatus$14(ReplicaStatus replicaStatus) {
        return !replicaStatus.isLeader() && !replicaStatus.linkName().isPresent();
    }

    /** Returns true once the destination leader epoch of partition 0 of the test topic is at least 1. */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        return 1 <= clusterLinkIntegrationTest.destCluster().leaderEpoch(
                new TopicPartition(clusterLinkIntegrationTest.topic(), 0));
    }

    /** Returns the fixed message text "Destination leader epoch not updated". */
    public static final /* synthetic */ String $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$2() {
        final String message = "Destination leader epoch not updated";
        return message;
    }

    /** Matches a leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$3(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /** Matches a leader replica that has no link name. */
    public static final /* synthetic */ boolean $anonfun$testLastFetchedOffsetStoppedMirrorTopicDescription$4(ReplicaStatus replicaStatus) {
        if (!replicaStatus.isLeader()) {
            return false;
        }
        return !replicaStatus.linkName().isPresent();
    }

    /** Returns true when the destination's mirror topic listing (default argument) no longer contains the test topic. */
    public static final /* synthetic */ boolean $anonfun$testDeleteAutoMirroredTopics$2(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        boolean stillListed = clusterLinkIntegrationTest.destCluster()
                .listMirrorTopics(clusterLinkIntegrationTest.destCluster().listMirrorTopics$default$1())
                .contains(clusterLinkIntegrationTest.topic());
        return !stillListed;
    }

    /** Returns the fixed failure message "Mirror not stopped". */
    public static final /* synthetic */ String $anonfun$testDeleteAutoMirroredTopics$3() {
        final String message = "Mirror not stopped";
        return message;
    }

    /** Returns true once the destination topic's {@code delete.retention.ms} value equals "80000000". */
    public static final /* synthetic */ boolean $anonfun$testIntervalChangeForPeriodicTasks$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        String destValue = clusterLinkIntegrationTest.destCluster()
                .describeTopicConfig(clusterLinkIntegrationTest.topic())
                .get("delete.retention.ms")
                .value();
        return destValue.equals("80000000");
    }

    /** Returns the fixed failure message "Topic configs did not get propagated". */
    public static final /* synthetic */ String $anonfun$testIntervalChangeForPeriodicTasks$2() {
        final String message = "Topic configs did not get propagated";
        return message;
    }

    /** Returns true once the destination leader epoch of partition 0 of the test topic is at least 1. */
    public static final /* synthetic */ boolean $anonfun$testMirrorFailoverWhenSourceIsUnavailable$1(ClusterLinkIntegrationTest clusterLinkIntegrationTest) {
        return 1 <= clusterLinkIntegrationTest.destCluster().leaderEpoch(
                new TopicPartition(clusterLinkIntegrationTest.topic(), 0));
    }

    /** Returns the fixed failure message "Destination leader epoch not updated". */
    public static final /* synthetic */ String $anonfun$testMirrorFailoverWhenSourceIsUnavailable$2() {
        final String message = "Destination leader epoch not updated";
        return message;
    }
}
