package kafka.server.link;

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import kafka.api.PartitionLinkState;
import kafka.cluster.ClusterLinkState;
import kafka.cluster.IsrState;
import kafka.cluster.Partition;
import kafka.common.KafkaException;
import kafka.controller.KafkaController;
import kafka.log.LogConfig$;
import kafka.network.SocketServer;
import kafka.server.ConfigType$;
import kafka.server.FetcherPool$Default$;
import kafka.server.FetcherTag;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.MetadataCache$;
import kafka.server.QuotaFactory$UnboundedQuota$;
import kafka.server.ReplicaManager;
import kafka.server.ZkAdminManager;
import kafka.server.ZkMetadataCache;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkManager;
import kafka.utils.KafkaScheduler;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterLinkExistsException;
import org.apache.kafka.common.errors.ClusterLinkNotFoundException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.test.TestUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnce;
import scala.collection.Map;
import scala.collection.Set;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Eh\u0001\u0002\u0016,\u0001IBQ!\u000f\u0001\u0005\u0002iBq!\u0010\u0001C\u0002\u0013%a\b\u0003\u0004D\u0001\u0001\u0006Ia\u0010\u0005\b\t\u0002\u0011\r\u0011\"\u0003F\u0011\u0019\u0011\u0006\u0001)A\u0005\r\"91\u000b\u0001b\u0001\n\u0013!\u0006BB.\u0001A\u0003%Q\u000bC\u0004]\u0001\t\u0007I\u0011B/\t\r\u0005\u0004\u0001\u0015!\u0003_\u0011\u001d\u0011\u0007A1A\u0005\n\rDa!\u001b\u0001!\u0002\u0013!\u0007b\u00026\u0001\u0005\u0004%Ia\u001b\u0005\u0007e\u0002\u0001\u000b\u0011\u00027\t\u000fM\u0004!\u0019!C\u0005i\"1\u0001\u0010\u0001Q\u0001\nUD\u0011\"\u001f\u0001A\u0002\u0003\u0007I\u0011\u0002>\t\u0013y\u0004\u0001\u0019!a\u0001\n\u0013y\bBCA\u0006\u0001\u0001\u0007\t\u0011)Q\u0005w\"I\u0011Q\u0002\u0001C\u0002\u0013%\u0011q\u0002\u0005\t\u0003O\u0001\u0001\u0015!\u0003\u0002\u0012!9\u0011\u0011\u0006\u0001\u0005\u0002\u0005-\u0002bBA\"\u0001\u0011\u0005\u00111\u0006\u0005\b\u0003\u001b\u0002A\u0011AA\u0016\u0011\u001d\t9\u0006\u0001C\u0001\u0003WAq!a\u0017\u0001\t\u0003\tY\u0003C\u0004\u0002`\u0001!\t!a\u000b\t\u000f\u0005\u0005\u0004\u0001\"\u0001\u0002,!9\u00111\r\u0001\u0005\u0002\u0005-\u0002bBA3\u0001\u0011\u0005\u00111\u0006\u0005\b\u0003O\u0002A\u0011AA\u0016\u0011\u001d\tI\u0007\u0001C\u0001\u0003WBq!!(\u0001\t\u0003\tY\u0003C\u0004\u0002\"\u0002!\t!a\u000b\t\u000f\u0005\u0015\u0006\u0001\"\u0001\u0002,!9\u0011\u0011\u0016\u0001\u0005\u0002\u0005-\u0002bBAW\u0001\u0011\u0005\u00111\u0006\u0005\b\u0003c\u0003A\u0011BAZ\u0011\u001d\t)\f\u0001C\u0005\u0003oCq!!/\u0001\t\u0013\tY\fC\u0004\u0002D\u0002!I!!2\t\u000f\u0005-\b\u0001\"\u0003\u0002n\n12\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0006t\u0017mZ3s)\u0016\u001cHO\u0003\u0002-[\u0005!A.\u001b8l\u0015\tqs&\u0001\u0004tKJ4XM\u001d\u0006\u0002a\u0005)1.\u00194lC\u000e\u00011C\u0001\u00014!\t!t'D\u00016\u0015\u00051\u0014!B:dC2\f\u0017B\u0001\u001d6\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012a\u000f\t\u0003y\u0001i\u0011aK\u0001\rEJ|7.\u001a:D_:4\u0017nZ\u000b\u0002\u007fA\u0011\u0001)Q\u0007\u0002[%\u0011!)\f\u0002\f\u0017\u000647.Y\"p]\u001aLw-A\u0007ce>\\WM]\"p]\u001aLw\rI\u0001\b[\u0016$(/[2t+\u00051\u0005CA$Q\u001b\u0005A%B\u0001#J\u0015\tQ5*\u0001\u0004d_6lwN\u001c\u0006\u0003a1S!!\u0014(\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005y\u0015aA8sO&\u0011\u0011\u000b\u0013\u0002\b\u001b\u0016$(/[2t\u0003!iW\r\u001e:jGN\u0004\u0013\u0001\u0002;j[\u0016,\u0012!\u0016\t\u0003-fk\u0011a\u0016\u0006\u00031&\u000bQ!\u001e;jYNL!AW,\u0003\u00115{7m\u001b+j[\u0016\fQ\u0001^5nK\u0002\naB]3qY&\u001c\u0017-T1oC\u001e,'/F\u0001_!\t\u0001u,\u0003\u0002a[\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\u0018a\u0004:fa2L7-Y'b]\u0006<WM\u001d\u0011\u0002\u0015\r|g\u000e\u001e:pY2,'/F\u0001e!\t)w-D\u0001g\u0015\t\u0011w&\u0003\u0002iM\ny1*\u00194lC\u000e{g\u000e\u001e:pY2,'/A\u0006d_:$(o\u001c7mKJ\u0004\u0013\u0001\u0003>l\u00072LWM\u001c;\u0016\u00031\u0004\"!\u001c9\u000e\u00039T!a\\\u0018\u0002\u0005i\\\u0017BA9o\u00055Y\u0015MZ6b5.\u001cE.[3oi\u0006I!p[\"mS\u0016tG\u000fI\u0001\u000e[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3\u0016\u0003U\u0004\"\u0001\u0011<\n\u0005]l#!D'fi\u0006$\u0017\r^1DC\u000eDW-\u0001\bnKR\fG-\u0019;b\u0007\u0006\u001c\u0007.\u001a\u0011\u0002%\rdWo\u001d;fe2Kgn['b]\u0006<WM]\u000b\u0002wB\u0011A\b`\u0005\u0003{.\u0012!c\u00117vgR,'\u000fT5oW6\u000bg.Y4fe\u000612\r\\;ti\u0016\u0014H*\u001b8l\u001b\u0006t\u0017mZ3s?\u0012*\u0017\u000f\u0006\u0003\u0002\u0002\u0005\u001d\u0001c\u0001\u001b\u0002\u0004%\u0019\u0011QA\u001b\u0003\tUs\u0017\u000e\u001e\u0005\t\u0003\u0013\t\u0012\u0011!a\u0001w\u0006\u0019\u0001\u0010J\u0019\u0002'\rdWo\u001d;fe2Kgn['b]\u0006<WM\u001d\u0011\u0002;Q|\u0007/[2D_:4\u0017nZ*z]\u000eLen\u00197vI\u0016$UMZ1vYR,\"!!\u0005\u0011\t\u0005M\u0011\u0011\u0005\b\u0005\u0003+\ti\u0002E\u0002\u0002\u0018Uj!!!\u0007\u000b\u0007\u0005m\u0011'\u0001\u0004=e>|GOP\u0005\u0004\u0003?)\u0014A\u0002)sK\u0012,g-\u0003\u0003\u0002$\u0005\u0015\"AB*ue&twMC\u0002\u0002 U\na\u0004^8qS\u000e\u001cuN\u001c4jONKhnY%oG2,H-\u001a#fM\u0006,H\u000e\u001e\u0011\u0002\u000bM,G/\u00169\u0015\u0005\u0005\u0005\u0001fA\u000b\u00020A!\u0011\u0011GA \u001b\t\t\u0019D\u0003\u0003\u00026\u0005]\u0012aA1qS*!\u0011\u0011HA\u001e\u0003\u001dQW\u000f]5uKJT1!!\u0010O\u0003\u0015QWO\\5u\u0013\u0011\t\t%a\r\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<oQ\r1\u0012q\t\t\u0005\u0003c\tI%\u0003\u0003\u0002L\u0005M\"!C!gi\u0016\u0014X)Y2i\u0003A!Xm\u001d;DYV\u001cH/\u001a:MS:\\7\u000fK\u0002\u0018\u0003#\u0002B!!\r\u0002T%!\u0011QKA\u001a\u0005\u0011!Vm\u001d;\u0002\u001fQ,7\u000f\u001e*fG>tg-[4ve\u0016D3\u0001GA)\u0003\u001d\"Xm\u001d;U_BL7mQ8oM&<7+\u001f8d\u0013:\u001cG.\u001e3f\u0007>l\u0007/\u0019;jE&d\u0017\u000e^=)\u0007e\t\t&A\u0007WKJLg-\u001f#fM\u0006,H\u000e^\u0001\u001c-\u0016\u0014\u0018NZ=[\u0017\"\u000b7/T1mM>\u0014X.\u001a3D_:4\u0017nZ:\u00023Y+'/\u001b4z5.C\u0015m]+oW:|wO\\\"p]\u001aLwm]\u0001\u001c-\u0016\u0014\u0018NZ=[\u00176K7o]3t\u00032<\u0018-_:D_:4\u0017nZ:\u0002;Y+'/\u001b4z5.C\u0015m]%oI\u0016\u0004XM\u001c3f]R\u001cuN\u001c4jON\f\u0011c\u0019:fCR,7\t\\;ti\u0016\u0014H*\u001b8l)!\ti'a\u001f\u0002��\u0005M\u0005\u0003BA8\u0003kr1\u0001PA9\u0013\r\t\u0019hK\u0001\u0013\u00072,8\u000f^3s\u0019&t7NR1di>\u0014\u00180\u0003\u0003\u0002x\u0005e$A\u0004$fi\u000eDWM]'b]\u0006<WM\u001d\u0006\u0004\u0003gZ\u0003bBA??\u0001\u0007\u0011\u0011C\u0001\tY&t7NT1nK\"9\u0011\u0011Q\u0010A\u0002\u0005\r\u0015A\u00027j].LE\r\u0005\u0003\u0002\u0006\u0006=UBAAD\u0015\u0011\tI)a#\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u0003\u001b\u000bAA[1wC&!\u0011\u0011SAD\u0005\u0011)V+\u0013#\t\u000f\u0005Uu\u00041\u0001\u0002\u0018\u0006\u00012\r\\;ti\u0016\u0014H*\u001b8l!J|\u0007o\u001d\t\u0005\u0003\u000b\u000bI*\u0003\u0003\u0002\u001c\u0006\u001d%A\u0003)s_B,'\u000f^5fg\u0006AB/Z:u\r\u0006LG.\u001a3BI\u0012\u001cE.^:uKJd\u0015N\\6)\u0007\u0001\n\t&\u0001\fuKN$(+Z2p]\u001aLw-\u001e:f\r\u0006LG.\u001e:fQ\r\t\u0013\u0011K\u0001\u0015i\u0016\u001cH\u000fR=oC6L7MR3uG\"\u001c\u0016N_3)\u0007\t\n\t&A\u0011uKN$8\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4SK\u0016t7M]=qi&|g\u000eK\u0002$\u0003#\n\u0001\u0005^3ti2+\u0017\rZ3s\u0003:$\u0017j\u001d:CK\u001a|'/\u001a'j].,\u0006\u000fZ1uK\"\u001aA%!\u0015\u0002%\r\u0014X-\u0019;f\u0005J|7.\u001a:D_:4\u0017n\u001a\u000b\u0002\u007f\u0005Q2\r\\;ti\u0016\u0014H*\u001b8l!\u0016\u00148/[:uK:$\bK]8qgV\u0011\u0011qS\u0001\u0012G2,8\u000f^3s\u0019&t7nQ8oM&<WCAA_!\ra\u0014qX\u0005\u0004\u0003\u0003\\#!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006I1/\u001a;va6{7m\u001b\u000b\t\u0003\u0003\t9-a6\u0002d\"9\u0011\u0011\u001a\u0015A\u0002\u0005-\u0017!\u00039beRLG/[8o!\u0011\ti-a5\u000e\u0005\u0005='bAAi_\u000591\r\\;ti\u0016\u0014\u0018\u0002BAk\u0003\u001f\u0014\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\u0005e\u0007\u00061\u0001\u0002\\\u0006\u0011A\u000f\u001d\t\u0005\u0003;\fy.D\u0001J\u0013\r\t\t/\u0013\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0011\u001d\t\t\t\u000ba\u0001\u0003K\u0004R\u0001NAt\u0003\u0007K1!!;6\u0005\u0019y\u0005\u000f^5p]\u0006A2M]3bi\u0016\u001cE.^:uKJd\u0015N\\6NC:\fw-\u001a:\u0015\u0007m\fy\u000fC\u0003>S\u0001\u0007q\b")
/* loaded from: input_file:kafka/server/link/ClusterLinkManagerTest.class */
public class ClusterLinkManagerTest {
    private final KafkaConfig brokerConfig = createBrokerConfig();
    private final Metrics metrics = new Metrics();
    private final MockTime time = new MockTime();
    private final ReplicaManager replicaManager = (ReplicaManager) EasyMock.mock(ReplicaManager.class);
    private final KafkaController controller = (KafkaController) EasyMock.mock(KafkaController.class);
    private final KafkaZkClient zkClient = (KafkaZkClient) EasyMock.createNiceMock(KafkaZkClient.class);
    private final MetadataCache metadataCache;
    private ClusterLinkManager clusterLinkManager;
    private final String topicConfigSyncIncludeDefault;

    private KafkaConfig brokerConfig() {
        return this.brokerConfig;
    }

    private Metrics metrics() {
        return this.metrics;
    }

    private MockTime time() {
        return this.time;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    private KafkaController controller() {
        return this.controller;
    }

    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    private MetadataCache metadataCache() {
        return this.metadataCache;
    }

    private ClusterLinkManager clusterLinkManager() {
        return this.clusterLinkManager;
    }

    private void clusterLinkManager_$eq(ClusterLinkManager clusterLinkManager) {
        this.clusterLinkManager = clusterLinkManager;
    }

    private String topicConfigSyncIncludeDefault() {
        return this.topicConfigSyncIncludeDefault;
    }

    @BeforeEach
    public void setUp() {
        EasyMock.expect(replicaManager().metadataCache()).andReturn(metadataCache()).anyTimes();
        EasyMock.expect(replicaManager().zkClient()).andReturn(new Some(zkClient())).anyTimes();
        EasyMock.expect(replicaManager().leaderPartitionsIterator()).andReturn(Predef$.MODULE$.Set().empty().iterator()).anyTimes();
        EasyMock.replay(new Object[]{replicaManager()});
        EasyMock.expect(BoxesRunTime.boxToBoolean(controller().isActive())).andReturn(BoxesRunTime.boxToBoolean(true)).anyTimes();
        EasyMock.replay(new Object[]{controller()});
        EasyMock.expect(zkClient().getChildren("/cluster_links")).andReturn(package$.MODULE$.Seq().empty()).anyTimes();
        EasyMock.expect(zkClient().getClusterLinks(Predef$.MODULE$.Set().empty())).andReturn(Predef$.MODULE$.Map().empty()).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager_$eq(createClusterLinkManager(brokerConfig()));
        EasyMock.reset(new Object[]{zkClient()});
    }

    @AfterEach
    public void tearDown() {
        clusterLinkManager().shutdown();
        metrics().close();
    }

    @Test
    public void testClusterLinks() {
        String str = "testLink";
        UUID randomUUID = UUID.randomUUID();
        String str2 = "testClusterId";
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUUID, new Some("testClusterId"), None$.MODULE$, false);
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) EasyMock.createNiceMock(Partition.class);
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(randomUUID));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().clientManager(randomUUID));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().resolveLinkId("testLink"));
        Assertions$.MODULE$.intercept(() -> {
            return this.clusterLinkManager().resolveLinkIdOrThrow(str);
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 82));
        clusterLinkManager().ensureLinkNameDoesntExist("testLink");
        Assertions.assertEquals(package$.MODULE$.Seq().empty(), clusterLinkManager().listClusterLinks());
        setupMock(partition, topicPartition, None$.MODULE$);
        clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition})));
        setupMock(partition, topicPartition, new Some(randomUUID));
        Assertions.assertEquals(0, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        setupMock(partition, topicPartition, new Some(randomUUID));
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(false)).times(1);
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().createClusterLink(clusterLinkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Assertions.assertNotEquals(None$.MODULE$, clusterLinkManager().fetcherManager(randomUUID));
        Assertions.assertNotEquals(None$.MODULE$, clusterLinkManager().clientManager(randomUUID));
        Assertions.assertEquals(new Some(randomUUID), clusterLinkManager().resolveLinkId("testLink"));
        Assertions.assertEquals(package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ClusterLinkData[]{clusterLinkData})), clusterLinkManager().listClusterLinks());
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) clusterLinkManager().fetcherManager(randomUUID).get();
        ClusterLinkClientManager clusterLinkClientManager = (ClusterLinkClientManager) clusterLinkManager().clientManager(randomUUID).get();
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().createClusterLink(new ClusterLinkData(str, UUID.randomUUID(), new Some(str2), None$.MODULE$, false), this.clusterLinkConfig(), this.clusterLinkPersistentProps());
        }, ClassTag$.MODULE$.apply(ClusterLinkExistsException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 105));
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        Assertions.assertTrue(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, time().milliseconds()), "Topic not added to metadata");
        Assertions.assertTrue(clusterLinkClientManager.getTopics().contains("testTopic"), "Topic not added to client manager");
        Assertions.assertFalse(clusterLinkFetcherManager.isEmpty(), "Fetcher not recording active topic");
        LeaderAndIsrRequestData.LeaderAndIsrPartitionState leaderAndIsrPartitionState = (LeaderAndIsrRequestData.LeaderAndIsrPartitionState) EasyMock.mock(LeaderAndIsrRequestData.LeaderAndIsrPartitionState.class);
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkId()).andReturn(randomUUID.toString()).anyTimes();
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkTopicState()).andReturn("Mirror").anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderAndIsrPartitionState.linkedLeaderEpoch())).andReturn(BoxesRunTime.boxToInteger(1)).anyTimes();
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkSourceTopicId()).andReturn(Uuid.randomUuid()).anyTimes();
        EasyMock.replay(new Object[]{leaderAndIsrPartitionState});
        clusterLinkManager().removePartitions((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), leaderAndIsrPartitionState)})));
        Assertions.assertTrue(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, time().milliseconds()), "Topic removed from metadata");
        Assertions.assertFalse(clusterLinkClientManager.getTopics().contains("testTopic"), "Topic not removed from client manager");
        EasyMock.reset(new Object[]{leaderAndIsrPartitionState});
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkId()).andReturn((Object) null).anyTimes();
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkSourceTopicId()).andReturn(Uuid.randomUuid()).anyTimes();
        EasyMock.replay(new Object[]{leaderAndIsrPartitionState});
        clusterLinkManager().removePartitions((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), leaderAndIsrPartitionState)})));
        Assertions.assertFalse(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, time().milliseconds()), "Topic not removed from metadata");
        Assertions.assertFalse(clusterLinkClientManager.getTopics().contains("testTopic"), "Topic should not be in client manager");
        EasyMock.reset(new Object[]{leaderAndIsrPartitionState});
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkId()).andReturn(randomUUID.toString()).anyTimes();
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkSourceTopicId()).andReturn(Uuid.randomUuid()).anyTimes();
        EasyMock.expect(leaderAndIsrPartitionState.clusterLinkTopicState()).andReturn("FailedMirror").anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderAndIsrPartitionState.linkedLeaderEpoch())).andReturn(BoxesRunTime.boxToInteger(-1)).anyTimes();
        EasyMock.replay(new Object[]{leaderAndIsrPartitionState});
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        clusterLinkManager().removePartitions((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition), leaderAndIsrPartitionState)})));
        Assertions.assertFalse(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, time().milliseconds()), "Topic not removed from metadata for failed mirror");
        Assertions.assertFalse(clusterLinkClientManager.getTopics().contains("testTopic"), "Topic should not be in client manager for failed mirror");
        TopicPartition topicPartition2 = new TopicPartition("testTopic", 1);
        Partition partition2 = (Partition) EasyMock.createNiceMock(Partition.class);
        setupMock(partition2, topicPartition2, new Some(randomUUID));
        Assertions.assertEquals(1, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition2}))));
        Assertions.assertTrue(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, time().milliseconds()), "Topic not added to metadata");
        Assertions.assertFalse(clusterLinkClientManager.getTopics().contains("testTopic"), "Topic should not be added to client manager");
        clusterLinkManager().removePartitionsAndMetadata((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition2})));
        Assertions.assertFalse(clusterLinkFetcherManager.currentMetadata().retainTopic("testTopic", false, time().milliseconds()), "Topic not removed from metadata");
        Assertions.assertFalse(clusterLinkClientManager.getTopics().contains("testTopic"), "Topic should not be in to client manager");
        Object obj = clusterLinkManager().fetcherManager(randomUUID).get();
        Assertions.assertTrue(obj != null && obj.equals(clusterLinkFetcherManager), "Unexpected fetcher manager");
        Object obj2 = clusterLinkManager().clientManager(randomUUID).get();
        Assertions.assertTrue(obj2 != null && obj2.equals(clusterLinkClientManager), "Unexpected client manager");
        EasyMock.reset(new Object[]{zkClient()});
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(true)).times(2);
        zkClient().setClusterLink(new ClusterLinkData("testLink", randomUUID, new Some("testClusterId"), None$.MODULE$, true));
        EasyMock.expect(BoxedUnit.UNIT);
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().deleteClusterLink("testLink", randomUUID);
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().resolveLinkId("testLink"));
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().deleteClusterLink(str, randomUUID);
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 177));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testClusterLinks$4(this, randomUUID)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail($anonfun$testClusterLinks$5(randomUUID));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    @Test
    public void testReconfigure() {
        String str = "testLink";
        UUID randomUUID = UUID.randomUUID();
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().updateClusterLinkConfig(str, properties -> {
                return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$2(properties));
            });
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 191));
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(false)).times(1);
        EasyMock.expect(zkClient().getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{randomUUID})))).andReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(randomUUID), new ClusterLinkData("testLink", randomUUID, None$.MODULE$, None$.MODULE$, false))}))).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(randomUUID));
        clusterLinkManager().createClusterLink(new ClusterLinkData("testLink", randomUUID, None$.MODULE$, None$.MODULE$, false), clusterLinkConfig(), clusterLinkPersistentProps());
        ClusterLinkFactory.FetcherManager fetcherManager = (ClusterLinkFactory.FetcherManager) clusterLinkManager().fetcherManager(randomUUID).get();
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), fetcherManager.currentConfig().bootstrapServers());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        clusterLinkManager().processClusterLinkChanges(randomUUID, properties);
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), fetcherManager.currentConfig().bootstrapServers());
        EasyMock.reset(new Object[]{zkClient()});
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(true)).times(1);
        EasyMock.expect(zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), randomUUID.toString())).andReturn(properties).times(1);
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().updateClusterLinkConfig("testLink", properties2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$3(properties2));
        });
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), fetcherManager.currentConfig().bootstrapServers());
        EasyMock.reset(new Object[]{zkClient()});
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(true)).times(1);
        EasyMock.expect(zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), randomUUID.toString())).andReturn(properties).times(1);
        EasyMock.expect(zkClient().getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{randomUUID})))).andReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(randomUUID), new ClusterLinkData("testLink", randomUUID, None$.MODULE$, None$.MODULE$, false))}))).times(1);
        Capture newCapture = EasyMock.newCapture();
        zkClient().setOrCreateEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(randomUUID.toString()), (Properties) EasyMock.capture(newCapture));
        EasyMock.expect(BoxedUnit.UNIT);
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().updateClusterLinkConfig("testLink", properties3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$4(properties3));
        });
        clusterLinkManager().processClusterLinkChanges(randomUUID, (Properties) newCapture.getValue());
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), fetcherManager.currentConfig().bootstrapServers());
    }

    @Test
    public void testTopicConfigSyncIncludeCompatibility() {
        VerifyDefault();
        VerifyZKHasMalformedConfigs();
        VerifyZKHasUnknownConfigs();
        VerifyZKMissesAlwaysConfigs();
        VerifyZKHasIndependentConfigs();
    }

    public void VerifyDefault() {
        UUID randomUUID = UUID.randomUUID();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-default", randomUUID, clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        clusterLinkManager().processClusterLinkChanges(randomUUID, properties);
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(CollectionConverters$.MODULE$.ListHasAsScala(MirrorTopicConfigSyncRules$.MODULE$.SyncIncludeDefault()).asScala().toSet(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        EasyMock.reset(new Object[]{zkClient()});
    }

    public void VerifyZKHasMalformedConfigs() {
        UUID randomUUID = UUID.randomUUID();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-malformed", randomUUID, clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), new StringBuilder(22).append(topicConfigSyncIncludeDefault()).append(" min.compaction.lag.ms").toString());
        clusterLinkManager().processClusterLinkChanges(randomUUID, properties);
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(CollectionConverters$.MODULE$.ListHasAsScala(MirrorTopicConfigSyncRules$.MODULE$.SyncIncludeDefault()).asScala().toSet(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        EasyMock.reset(new Object[]{zkClient()});
    }

    public void VerifyZKHasUnknownConfigs() {
        UUID randomUUID = UUID.randomUUID();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-unknown", randomUUID, clusterLinkPersistentProps());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), new StringBuilder(21).append(topicConfigSyncIncludeDefault()).append(",unknown.topic.config").toString());
        clusterLinkManager().processClusterLinkChanges(randomUUID, properties);
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(CollectionConverters$.MODULE$.ListHasAsScala(MirrorTopicConfigSyncRules$.MODULE$.SyncIncludeDefault()).asScala().toSet(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        EasyMock.reset(new Object[]{zkClient()});
    }

    public void VerifyZKMissesAlwaysConfigs() {
        UUID randomUUID = UUID.randomUUID();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-always", randomUUID, clusterLinkPersistentProps());
        Properties properties = new Properties();
        String MinCompactionLagMsProp = LogConfig$.MODULE$.MinCompactionLagMsProp();
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), MinCompactionLagMsProp);
        clusterLinkManager().processClusterLinkChanges(randomUUID, properties);
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().$plus$plus((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{LogConfig$.MODULE$.MinCompactionLagMsProp()}))), createClusterLink.currentConfig().topicConfigSyncRules().include());
        EasyMock.reset(new Object[]{zkClient()});
    }

    public void VerifyZKHasIndependentConfigs() {
        UUID randomUUID = UUID.randomUUID();
        ClusterLinkFactory.FetcherManager createClusterLink = createClusterLink("test-independent", randomUUID, clusterLinkPersistentProps());
        Properties properties = new Properties();
        String mkString = MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs().$plus$plus((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{LogConfig$.MODULE$.TierEnableProp()}))).mkString(",");
        properties.put("bootstrap.servers", "localhost:5678");
        properties.put(ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp(), mkString);
        clusterLinkManager().processClusterLinkChanges(randomUUID, properties);
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), createClusterLink.currentConfig().bootstrapServers());
        Assertions.assertEquals(MirrorTopicConfigSyncRules$.MODULE$.AlwaysConfigs(), createClusterLink.currentConfig().topicConfigSyncRules().include());
        EasyMock.reset(new Object[]{zkClient()});
    }

    public ClusterLinkFactory.FetcherManager createClusterLink(String str, UUID uuid, Properties properties) {
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().updateClusterLinkConfig(str, properties2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$createClusterLink$2(properties2));
            });
        }, ClassTag$.MODULE$.apply(ClusterLinkNotFoundException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 362));
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(uuid))).andReturn(BoxesRunTime.boxToBoolean(false)).times(1);
        EasyMock.expect(zkClient().getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{uuid})))).andReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uuid), new ClusterLinkData(str, uuid, None$.MODULE$, None$.MODULE$, false))}))).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(uuid));
        clusterLinkManager().createClusterLink(new ClusterLinkData(str, uuid, None$.MODULE$, None$.MODULE$, false), clusterLinkConfig(), properties);
        return (ClusterLinkFactory.FetcherManager) clusterLinkManager().fetcherManager(uuid).get();
    }

    @Test
    public void testFailedAddClusterLink() {
        UUID randomUUID = UUID.randomUUID();
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUUID, new Some("testClusterId"), None$.MODULE$, false);
        Assertions.assertTrue(clusterLinkManager().resolveLinkId("testLink").isEmpty());
        Assertions.assertTrue(clusterLinkManager().listClusterLinks().isEmpty());
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(false)).times(1);
        zkClient().createClusterLink(clusterLinkData);
        EasyMock.expect(BoxedUnit.UNIT).andThrow(new RuntimeException("")).times(1);
        EasyMock.replay(new Object[]{zkClient()});
        Assertions$.MODULE$.intercept(() -> {
            this.clusterLinkManager().createClusterLink(clusterLinkData, this.clusterLinkConfig(), this.clusterLinkPersistentProps());
        }, ClassTag$.MODULE$.apply(RuntimeException.class), new Position("ClusterLinkManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 395));
        Assertions.assertTrue(clusterLinkManager().resolveLinkId("testLink").isEmpty());
        Assertions.assertTrue(clusterLinkManager().listClusterLinks().isEmpty());
        EasyMock.reset(new Object[]{zkClient()});
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(false)).times(1);
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().createClusterLink(clusterLinkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Assertions.assertEquals(new Some(randomUUID), clusterLinkManager().resolveLinkId("testLink"));
        Assertions.assertEquals(package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ClusterLinkData[]{clusterLinkData})), clusterLinkManager().listClusterLinks());
    }

    @Test
    public void testReconfigureFailure() {
        UUID randomUUID = UUID.randomUUID();
        EasyMock.expect(zkClient().getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{randomUUID})))).andReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(randomUUID), new ClusterLinkData("testLink", randomUUID, new Some("testClusterId"), None$.MODULE$, false))}))).once();
        EasyMock.replay(new Object[]{zkClient()});
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "");
        clusterLinkManager().processClusterLinkChanges(randomUUID, properties);
        Assertions.assertEquals(FailedClusterLink$.MODULE$, clusterLinkManager().linkState("testLink"));
    }

    @Test
    public void testDynamicFetchSize() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        FetchResponseSize fetchResponseSize = new FetchResponseSize(Predef$.MODULE$.Integer2int(clusterLinkConfig().replicaFetchMaxBytes()), Predef$.MODULE$.Integer2int(clusterLinkConfig().replicaFetchResponseMaxBytes()));
        Assertions.assertEquals(fetchResponseSize, clusterLinkManager().fetchResponseSize(clusterLinkConfig()));
        UUID randomUUID = UUID.randomUUID();
        EasyMock.expect(BoxesRunTime.boxToBoolean(zkClient().clusterLinkExists(randomUUID))).andReturn(BoxesRunTime.boxToBoolean(false)).times(1);
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().createClusterLink(new ClusterLinkData("link", randomUUID, None$.MODULE$, None$.MODULE$, false), clusterLinkConfig(), clusterLinkPersistentProps());
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) clusterLinkManager().fetcherManager(randomUUID).get();
        Assertions.assertEquals(0, clusterLinkFetcherManager.fetcherThreadCount());
        verifyFetchSize$1(fetchResponseSize, createBrokerConfig);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", "10000");
        verifyFetchSize$1(new FetchResponseSize(5000, 10000), createBrokerConfig);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.min.bytes", "6000");
        verifyFetchSize$1(new FetchResponseSize(6000, 10000), createBrokerConfig);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", Integer.toString(Integer.MAX_VALUE));
        verifyFetchSize$1(fetchResponseSize, createBrokerConfig);
        ClusterLinkFetcherThread clusterLinkFetcherThread = (ClusterLinkFetcherThread) EasyMock.createNiceMock(ClusterLinkFetcherThread.class);
        IntRef create = IntRef.create(0);
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", Integer.toString(Predef$.MODULE$.Integer2int(clusterLinkConfig().replicaFetchResponseMaxBytes()) * 6));
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 5).foreach$mVc$sp(i -> {
            create.elem++;
            clusterLinkFetcherManager.fetcherThreadMap().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new FetcherTag(0, create.elem, FetcherPool$Default$.MODULE$)), clusterLinkFetcherThread));
            this.verifyFetchSize$1(new FetchResponseSize(Predef$.MODULE$.Integer2int(this.clusterLinkConfig().replicaFetchMaxBytes()), Predef$.MODULE$.Integer2int(this.clusterLinkConfig().replicaFetchResponseMaxBytes())), createBrokerConfig);
        });
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.total.bytes", "10000");
        createBrokerConfig.setProperty("confluent.cluster.link.fetch.response.min.bytes", "1");
        verifyFetchSize$1(new FetchResponseSize(1000, 2000), createBrokerConfig);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 5).foreach$mVc$sp(i2 -> {
            addFetcherThread$1(create, clusterLinkFetcherManager, clusterLinkFetcherThread);
        });
        verifyFetchSize$1(new FetchResponseSize(500, 1000), createBrokerConfig);
        clusterLinkFetcherManager.fetcherThreadMap().clear();
        verifyFetchSize$1(new FetchResponseSize(5000, 10000), createBrokerConfig);
    }

    @Test
    public void testClusterLinkConfigReencryption() {
        UUID randomUUID = UUID.randomUUID();
        ObjectRef create = ObjectRef.create((Object) null);
        KafkaException kafkaException = new KafkaException("Test exception");
        setupZkClient$1(randomUUID, "link1");
        zkClient().transformEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(randomUUID.toString()), (Function1) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).andThrow(kafkaException).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().shutdown();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig.put("confluent.password.encoder.old.secret.ttl.ms", Long.toString(Long.MAX_VALUE));
        clusterLinkManager_$eq(createClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig)));
        create.elem = (ScheduledThreadPoolExecutor) TestUtils.fieldValue(clusterLinkManager().scheduler(), KafkaScheduler.class, "executor");
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$1(create)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail("Retry not scheduled after failure");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        setupZkClient$1(randomUUID, "link1");
        zkClient().transformEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(randomUUID.toString()), (Function1) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().shutdown();
        TestUtils$ testUtils$17 = TestUtils$.MODULE$;
        TestUtils$ testUtils$18 = TestUtils$.MODULE$;
        TestUtils$ testUtils$19 = TestUtils$.MODULE$;
        int RandomPort5 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$20 = TestUtils$.MODULE$;
        None$ none$5 = None$.MODULE$;
        TestUtils$ testUtils$21 = TestUtils$.MODULE$;
        None$ none$6 = None$.MODULE$;
        TestUtils$ testUtils$22 = TestUtils$.MODULE$;
        None$ none$7 = None$.MODULE$;
        TestUtils$ testUtils$23 = TestUtils$.MODULE$;
        TestUtils$ testUtils$24 = TestUtils$.MODULE$;
        int RandomPort6 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$25 = TestUtils$.MODULE$;
        int RandomPort7 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$26 = TestUtils$.MODULE$;
        int RandomPort8 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$27 = TestUtils$.MODULE$;
        None$ none$8 = None$.MODULE$;
        TestUtils$ testUtils$28 = TestUtils$.MODULE$;
        TestUtils$ testUtils$29 = TestUtils$.MODULE$;
        TestUtils$ testUtils$30 = TestUtils$.MODULE$;
        TestUtils$ testUtils$31 = TestUtils$.MODULE$;
        Properties createBrokerConfig2 = testUtils$17.createBrokerConfig(1, "localhost:1234", true, true, RandomPort5, none$5, none$6, none$7, true, false, RandomPort6, false, RandomPort7, false, RandomPort8, none$8, 1, false, 1, (short) 1);
        createBrokerConfig2.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig2.put("confluent.password.encoder.old.secret.ttl.ms", Long.toString(Long.MAX_VALUE));
        clusterLinkManager_$eq(createClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig2)));
        create.elem = (ScheduledThreadPoolExecutor) TestUtils.fieldValue(clusterLinkManager().scheduler(), KafkaScheduler.class, "executor");
        TestUtils$ testUtils$32 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$3(create)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail("Unnecessary retry scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        setupZkClient$1(randomUUID, "link1");
        zkClient().transformEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(randomUUID.toString()), (Function1) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().shutdown();
        TestUtils$ testUtils$33 = TestUtils$.MODULE$;
        TestUtils$ testUtils$34 = TestUtils$.MODULE$;
        TestUtils$ testUtils$35 = TestUtils$.MODULE$;
        int RandomPort9 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$36 = TestUtils$.MODULE$;
        None$ none$9 = None$.MODULE$;
        TestUtils$ testUtils$37 = TestUtils$.MODULE$;
        None$ none$10 = None$.MODULE$;
        TestUtils$ testUtils$38 = TestUtils$.MODULE$;
        None$ none$11 = None$.MODULE$;
        TestUtils$ testUtils$39 = TestUtils$.MODULE$;
        TestUtils$ testUtils$40 = TestUtils$.MODULE$;
        int RandomPort10 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$41 = TestUtils$.MODULE$;
        int RandomPort11 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$42 = TestUtils$.MODULE$;
        int RandomPort12 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$43 = TestUtils$.MODULE$;
        None$ none$12 = None$.MODULE$;
        TestUtils$ testUtils$44 = TestUtils$.MODULE$;
        TestUtils$ testUtils$45 = TestUtils$.MODULE$;
        TestUtils$ testUtils$46 = TestUtils$.MODULE$;
        TestUtils$ testUtils$47 = TestUtils$.MODULE$;
        Properties createBrokerConfig3 = testUtils$33.createBrokerConfig(1, "localhost:1234", true, true, RandomPort9, none$9, none$10, none$11, true, false, RandomPort10, false, RandomPort11, false, RandomPort12, none$12, 1, false, 1, (short) 1);
        createBrokerConfig3.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig3.put("confluent.password.encoder.old.secret.ttl.ms", Long.toString(30000L));
        clusterLinkManager_$eq(createClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig3)));
        create.elem = (ScheduledThreadPoolExecutor) TestUtils.fieldValue(clusterLinkManager().scheduler(), KafkaScheduler.class, "executor");
        TestUtils$ testUtils$48 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$5(create)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail("Old encoder delete not scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        setupZkClient$1(randomUUID, "link1");
        zkClient().transformEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(randomUUID.toString()), (Function1) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().shutdown();
        TestUtils$ testUtils$49 = TestUtils$.MODULE$;
        TestUtils$ testUtils$50 = TestUtils$.MODULE$;
        TestUtils$ testUtils$51 = TestUtils$.MODULE$;
        int RandomPort13 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$52 = TestUtils$.MODULE$;
        None$ none$13 = None$.MODULE$;
        TestUtils$ testUtils$53 = TestUtils$.MODULE$;
        None$ none$14 = None$.MODULE$;
        TestUtils$ testUtils$54 = TestUtils$.MODULE$;
        None$ none$15 = None$.MODULE$;
        TestUtils$ testUtils$55 = TestUtils$.MODULE$;
        TestUtils$ testUtils$56 = TestUtils$.MODULE$;
        int RandomPort14 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$57 = TestUtils$.MODULE$;
        int RandomPort15 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$58 = TestUtils$.MODULE$;
        int RandomPort16 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$59 = TestUtils$.MODULE$;
        None$ none$16 = None$.MODULE$;
        TestUtils$ testUtils$60 = TestUtils$.MODULE$;
        TestUtils$ testUtils$61 = TestUtils$.MODULE$;
        TestUtils$ testUtils$62 = TestUtils$.MODULE$;
        TestUtils$ testUtils$63 = TestUtils$.MODULE$;
        Properties createBrokerConfig4 = testUtils$49.createBrokerConfig(1, "localhost:1234", true, true, RandomPort13, none$13, none$14, none$15, true, false, RandomPort14, false, RandomPort15, false, RandomPort16, none$16, 1, false, 1, (short) 1);
        createBrokerConfig4.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig4.put("confluent.password.encoder.old.secret.ttl.ms", Long.toString(1L));
        clusterLinkManager_$eq(createClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig4)));
        create.elem = (ScheduledThreadPoolExecutor) TestUtils.fieldValue(clusterLinkManager().scheduler(), KafkaScheduler.class, "executor");
        TestUtils$ testUtils$64 = TestUtils$.MODULE$;
        long waitUntilTrue$default$34 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$44 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long currentTimeMillis4 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$7(create)) {
            if (System.currentTimeMillis() > currentTimeMillis4 + waitUntilTrue$default$34) {
                Assertions.fail("Unnecessary delete retry scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$34), waitUntilTrue$default$44));
        }
        setupZkClient$1(randomUUID, "link1");
        zkClient().transformEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(randomUUID.toString()), (Function1) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).once();
        zkClient().transformEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(randomUUID.toString()), (Function1) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).andThrow(kafkaException).anyTimes();
        EasyMock.replay(new Object[]{zkClient()});
        clusterLinkManager().shutdown();
        TestUtils$ testUtils$65 = TestUtils$.MODULE$;
        TestUtils$ testUtils$66 = TestUtils$.MODULE$;
        TestUtils$ testUtils$67 = TestUtils$.MODULE$;
        int RandomPort17 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$68 = TestUtils$.MODULE$;
        None$ none$17 = None$.MODULE$;
        TestUtils$ testUtils$69 = TestUtils$.MODULE$;
        None$ none$18 = None$.MODULE$;
        TestUtils$ testUtils$70 = TestUtils$.MODULE$;
        None$ none$19 = None$.MODULE$;
        TestUtils$ testUtils$71 = TestUtils$.MODULE$;
        TestUtils$ testUtils$72 = TestUtils$.MODULE$;
        int RandomPort18 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$73 = TestUtils$.MODULE$;
        int RandomPort19 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$74 = TestUtils$.MODULE$;
        int RandomPort20 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$75 = TestUtils$.MODULE$;
        None$ none$20 = None$.MODULE$;
        TestUtils$ testUtils$76 = TestUtils$.MODULE$;
        TestUtils$ testUtils$77 = TestUtils$.MODULE$;
        TestUtils$ testUtils$78 = TestUtils$.MODULE$;
        TestUtils$ testUtils$79 = TestUtils$.MODULE$;
        Properties createBrokerConfig5 = testUtils$65.createBrokerConfig(1, "localhost:1234", true, true, RandomPort17, none$17, none$18, none$19, true, false, RandomPort18, false, RandomPort19, false, RandomPort20, none$20, 1, false, 1, (short) 1);
        createBrokerConfig5.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig5.put("confluent.password.encoder.old.secret.ttl.ms", Long.toString(1L));
        clusterLinkManager_$eq(createClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig5)));
        create.elem = (ScheduledThreadPoolExecutor) TestUtils.fieldValue(clusterLinkManager().scheduler(), KafkaScheduler.class, "executor");
        TestUtils$ testUtils$80 = TestUtils$.MODULE$;
        long waitUntilTrue$default$35 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$45 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long currentTimeMillis5 = System.currentTimeMillis();
        while (!$anonfun$testClusterLinkConfigReencryption$9(create)) {
            if (System.currentTimeMillis() > currentTimeMillis5 + waitUntilTrue$default$35) {
                Assertions.fail("Delete retry not scheduled");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$35), waitUntilTrue$default$45));
        }
    }

    @Test
    public void testLeaderAndIsrBeforeLinkUpdate() {
        UUID randomUUID = UUID.randomUUID();
        ClusterLinkData clusterLinkData = new ClusterLinkData("testLink", randomUUID, new Some("testClusterId"), None$.MODULE$, false);
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        Partition partition = (Partition) EasyMock.createNiceMock(Partition.class);
        setupMock(partition, topicPartition, new Some(randomUUID));
        EasyMock.reset(new Object[]{replicaManager()});
        EasyMock.expect(replicaManager().metadataCache()).andReturn(metadataCache()).anyTimes();
        EasyMock.expect(replicaManager().zkClient()).andReturn(new Some(zkClient())).anyTimes();
        EasyMock.expect(replicaManager().leaderPartitionsIterator()).andReturn(((IterableOnce) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))).iterator()).anyTimes();
        EasyMock.replay(new Object[]{replicaManager()});
        Assertions.assertEquals(0, clusterLinkManager().addPartitions((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition}))));
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) EasyMock.mock(ClusterLinkFetcherManager.class);
        clusterLinkFetcherManager.addLinkedFetcherForPartitions((Iterable) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Partition[]{partition})));
        EasyMock.expect(BoxedUnit.UNIT).once();
        clusterLinkFetcherManager.shutdown();
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.replay(new Object[]{clusterLinkFetcherManager});
        ClusterLinkDestConnectionManager clusterLinkDestConnectionManager = (ClusterLinkDestConnectionManager) EasyMock.mock(ClusterLinkDestConnectionManager.class);
        EasyMock.expect(clusterLinkDestConnectionManager.currentConfig()).andReturn(clusterLinkConfig()).anyTimes();
        clusterLinkDestConnectionManager.shutdown();
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.replay(new Object[]{clusterLinkDestConnectionManager});
        clusterLinkManager().commitAddClusterLink(clusterLinkData, new ClusterLinkManager.Managers(new Some(clusterLinkFetcherManager), None$.MODULE$, clusterLinkDestConnectionManager, (ClusterLinkMetrics) null), LinkMode$Destination$.MODULE$);
        EasyMock.verify(new Object[]{clusterLinkFetcherManager});
    }

    private KafkaConfig createBrokerConfig() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
    }

    private Properties clusterLinkPersistentProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:1234");
        return properties;
    }

    private ClusterLinkConfig clusterLinkConfig() {
        return ClusterLinkConfig$.MODULE$.create(clusterLinkPersistentProps());
    }

    private void setupMock(Partition partition, TopicPartition topicPartition, Option<UUID> option) {
        EasyMock.reset(new Object[]{partition});
        EasyMock.expect(partition.topicPartition()).andReturn(topicPartition).anyTimes();
        EasyMock.expect(partition.getClusterLinkId()).andReturn(option).anyTimes();
        EasyMock.expect(BoxesRunTime.boxToBoolean(partition.isActiveLinkDestinationLeader())).andReturn(BoxesRunTime.boxToBoolean(option.nonEmpty())).anyTimes();
        EasyMock.expect(partition.getLinkedLeaderEpoch()).andReturn(new Some(BoxesRunTime.boxToInteger(1))).anyTimes();
        IsrState isrState = (IsrState) EasyMock.mock(IsrState.class);
        EasyMock.expect(isrState.clusterLink()).andReturn(option.map(uuid -> {
            return new ClusterLinkState(uuid, TopicLinkMirror$.MODULE$, (PartitionLinkState) null);
        })).anyTimes();
        EasyMock.replay(new Object[]{isrState});
        EasyMock.expect(partition.isrState()).andReturn(isrState).anyTimes();
        EasyMock.replay(new Object[]{partition});
    }

    private ClusterLinkManager createClusterLinkManager(KafkaConfig kafkaConfig) {
        ClusterLinkFactory$ clusterLinkFactory$ = ClusterLinkFactory$.MODULE$;
        QuotaFactory$UnboundedQuota$ quotaFactory$UnboundedQuota$ = QuotaFactory$UnboundedQuota$.MODULE$;
        KafkaZkClient zkClient = zkClient();
        Metrics metrics = metrics();
        MockTime time = time();
        ClusterLinkFactory$ clusterLinkFactory$2 = ClusterLinkFactory$.MODULE$;
        ClusterLinkManager createLinkManager = clusterLinkFactory$.createLinkManager(kafkaConfig, "clusterId", quotaFactory$UnboundedQuota$, zkClient, metrics, time, None$.MODULE$);
        Endpoint endpoint = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 1234);
        AuthorizerServerInfo authorizerServerInfo = (AuthorizerServerInfo) EasyMock.mock(AuthorizerServerInfo.class);
        EasyMock.expect(authorizerServerInfo.interBrokerEndpoint()).andReturn(endpoint).anyTimes();
        EasyMock.replay(new Object[]{authorizerServerInfo});
        createLinkManager.startup(authorizerServerInfo, replicaManager(), (ZkAdminManager) null, controller(), (SocketServer) null, None$.MODULE$);
        return createLinkManager;
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinks$4(ClusterLinkManagerTest clusterLinkManagerTest, UUID uuid) {
        return clusterLinkManagerTest.clusterLinkManager().fetcherManager(uuid).isEmpty() && clusterLinkManagerTest.clusterLinkManager().clientManager(uuid).isEmpty();
    }

    public static final /* synthetic */ String $anonfun$testClusterLinks$5(UUID uuid) {
        return new StringBuilder(38).append("Linked fetcher/client for ").append(uuid).append(" not removed").toString();
    }

    public static final /* synthetic */ boolean $anonfun$testReconfigure$2(Properties properties) {
        return false;
    }

    public static final /* synthetic */ boolean $anonfun$testReconfigure$3(Properties properties) {
        properties.put("bootstrap.servers", "localhost:1234");
        return false;
    }

    public static final /* synthetic */ boolean $anonfun$testReconfigure$4(Properties properties) {
        properties.put("bootstrap.servers", "localhost:1234");
        return true;
    }

    public static final /* synthetic */ boolean $anonfun$createClusterLink$2(Properties properties) {
        return false;
    }

    private final void verifyFetchSize$1(FetchResponseSize fetchResponseSize, Properties properties) {
        brokerConfig().updateCurrentConfig(KafkaConfig$.MODULE$.fromProps(properties));
        clusterLinkManager().updateDynamicFetchSize();
        Assertions.assertEquals(fetchResponseSize, clusterLinkManager().fetchResponseSize(clusterLinkConfig()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void addFetcherThread$1(IntRef intRef, ClusterLinkFetcherManager clusterLinkFetcherManager, ClusterLinkFetcherThread clusterLinkFetcherThread) {
        intRef.elem++;
        clusterLinkFetcherManager.fetcherThreadMap().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new FetcherTag(0, intRef.elem, FetcherPool$Default$.MODULE$)), clusterLinkFetcherThread));
    }

    private final void setupZkClient$1(UUID uuid, String str) {
        EasyMock.reset(new Object[]{zkClient()});
        EasyMock.expect(zkClient().getChildren("/cluster_links")).andReturn(package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{uuid.toString()}))).anyTimes();
        EasyMock.expect(zkClient().getClusterLinks((scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new UUID[]{uuid})))).andReturn(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uuid), new ClusterLinkData(str, uuid, None$.MODULE$, None$.MODULE$, false))}))).anyTimes();
    }

    private final void recreateClusterLinkManager$1(long j, ObjectRef objectRef) {
        clusterLinkManager().shutdown();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        Properties createBrokerConfig = testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1);
        createBrokerConfig.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        createBrokerConfig.put("confluent.password.encoder.old.secret.ttl.ms", Long.toString(j));
        clusterLinkManager_$eq(createClusterLinkManager(KafkaConfig$.MODULE$.fromProps(createBrokerConfig)));
        objectRef.elem = (ScheduledThreadPoolExecutor) TestUtils.fieldValue(clusterLinkManager().scheduler(), KafkaScheduler.class, "executor");
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$1(ObjectRef objectRef) {
        return ((ScheduledThreadPoolExecutor) objectRef.elem).getTaskCount() > 0;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$2() {
        return "Retry not scheduled after failure";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$3(ObjectRef objectRef) {
        return ((ScheduledThreadPoolExecutor) objectRef.elem).getTaskCount() == 0;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$4() {
        return "Unnecessary retry scheduled";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$5(ObjectRef objectRef) {
        return ((ScheduledThreadPoolExecutor) objectRef.elem).getTaskCount() > 0;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$6() {
        return "Old encoder delete not scheduled";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$7(ObjectRef objectRef) {
        return ((ScheduledThreadPoolExecutor) objectRef.elem).getTaskCount() == 1;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$8() {
        return "Unnecessary delete retry scheduled";
    }

    public static final /* synthetic */ boolean $anonfun$testClusterLinkConfigReencryption$9(ObjectRef objectRef) {
        return ((ScheduledThreadPoolExecutor) objectRef.elem).getTaskCount() > 1;
    }

    public static final /* synthetic */ String $anonfun$testClusterLinkConfigReencryption$10() {
        return "Delete retry not scheduled";
    }

    public ClusterLinkManagerTest() {
        MetadataCache$ metadataCache$ = MetadataCache$.MODULE$;
        MetadataCache$ metadataCache$2 = MetadataCache$.MODULE$;
        this.metadataCache = new ZkMetadataCache(0, false);
        this.topicConfigSyncIncludeDefault = CollectionConverters$.MODULE$.ListHasAsScala(ClusterLinkConfigDefaults$.MODULE$.TopicConfigSyncIncludeDefault()).asScala().mkString(",");
    }
}
