package kafka.server.link;

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import kafka.cluster.Partition;
import kafka.controller.KafkaController;
import kafka.network.SocketServer;
import kafka.server.ConfigType$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.MetadataCache$;
import kafka.server.QuotaFactory$UnboundedQuota$;
import kafka.server.ReplicaManager;
import kafka.server.ZkAdminManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterLinkExistsException;
import org.apache.kafka.common.errors.ClusterLinkNotFoundException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001de\u0001\u0002\u0010 \u0001\u0019BQ!\f\u0001\u0005\u00029Bq!\r\u0001C\u0002\u0013%!\u0007\u0003\u00048\u0001\u0001\u0006Ia\r\u0005\bq\u0001\u0011\r\u0011\"\u0003:\u0011\u00191\u0005\u0001)A\u0005u!9q\t\u0001b\u0001\n\u0013A\u0005BB(\u0001A\u0003%\u0011\nC\u0004Q\u0001\t\u0007I\u0011B)\t\rU\u0003\u0001\u0015!\u0003S\u0011\u001d1\u0006A1A\u0005\n]Ca!\u0018\u0001!\u0002\u0013A\u0006b\u00020\u0001\u0005\u0004%Ia\u0018\u0005\u0007M\u0002\u0001\u000b\u0011\u00021\t\u000f\u001d\u0004!\u0019!C\u0005Q\"1A\u000e\u0001Q\u0001\n%D\u0011\"\u001c\u0001A\u0002\u0003\u0007I\u0011\u00028\t\u0013I\u0004\u0001\u0019!a\u0001\n\u0013\u0019\b\"C=\u0001\u0001\u0004\u0005\t\u0015)\u0003p\u0011\u0015Q\b\u0001\"\u0001|\u0011\u0019\ty\u0001\u0001C\u0001w\"1\u0011\u0011\u0004\u0001\u0005\u0002mDa!a\t\u0001\t\u0003Y\bBBA\u0014\u0001\u0011\u00051\u0010\u0003\u0004\u0002,\u0001!\ta\u001f\u0005\b\u0003_\u0001A\u0011BA\u0019\u0011\u001d\t\u0019\u0004\u0001C\u0005\u0003kAq!a\u0012\u0001\t\u0013\tI\u0005C\u0004\u0002R\u0001!I!a\u0015\t\u000f\u0005\u0005\u0005\u0001\"\u0003\u0002\u0004\n12\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0006t\u0017mZ3s)\u0016\u001cHO\u0003\u0002!C\u0005!A.\u001b8l\u0015\t\u00113%\u0001\u0004tKJ4XM\u001d\u0006\u0002I\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001(!\tA3&D\u0001*\u0015\u0005Q\u0013!B:dC2\f\u0017B\u0001\u0017*\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012a\f\t\u0003a\u0001i\u0011aH\u0001\rEJ|7.\u001a:D_:4\u0017nZ\u000b\u0002gA\u0011A'N\u0007\u0002C%\u0011a'\t\u0002\f\u0017\u000647.Y\"p]\u001aLw-A\u0007ce>\\WM]\"p]\u001aLw\rI\u0001\b[\u0016$(/[2t+\u0005Q\u0004CA\u001eE\u001b\u0005a$B\u0001\u001d>\u0015\tqt(\u0001\u0004d_6lwN\u001c\u0006\u0003I\u0001S!!\u0011\"\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0019\u0015aA8sO&\u0011Q\t\u0010\u0002\b\u001b\u0016$(/[2t\u0003!iW\r\u001e:jGN\u0004\u0013\u0001\u0002;j[\u0016,\u0012!\u0013\t\u0003\u00156k\u0011a\u0013\u0006\u0003\u0019v\nQ!\u001e;jYNL!AT&\u0003\u00115{7m\u001b
+j[\u0016\fQ\u0001^5nK\u0002\naB]3qY&\u001c\u0017-T1oC\u001e,'/F\u0001S!\t!4+\u0003\u0002UC\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\u0018a\u0004:fa2L7-Y'b]\u0006<WM\u001d\u0011\u0002\u0015\r|g\u000e\u001e:pY2,'/F\u0001Y!\tI6,D\u0001[\u0015\t16%\u0003\u0002]5\ny1*\u00194lC\u000e{g\u000e\u001e:pY2,'/A\u0006d_:$(o\u001c7mKJ\u0004\u0013\u0001\u0003>l\u00072LWM\u001c;\u0016\u0003\u0001\u0004\"!\u00193\u000e\u0003\tT!aY\u0012\u0002\u0005i\\\u0017BA3c\u00055Y\u0015MZ6b5.\u001cE.[3oi\u0006I!p[\"mS\u0016tG\u000fI\u0001\u000e[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3\u0016\u0003%\u0004\"\u0001\u000e6\n\u0005-\f#!D'fi\u0006$\u0017\r^1DC\u000eDW-\u0001\bnKR\fG-\u0019;b\u0007\u0006\u001c\u0007.\u001a\u0011\u0002%\rdWo\u001d;fe2Kgn['b]\u0006<WM]\u000b\u0002_B\u0011\u0001\u0007]\u0005\u0003c~\u0011!c\u00117vgR,'\u000fT5oW6\u000bg.Y4fe\u000612\r\\;ti\u0016\u0014H*\u001b8l\u001b\u0006t\u0017mZ3s?\u0012*\u0017\u000f\u0006\u0002uoB\u0011\u0001&^\u0005\u0003m&\u0012A!\u00168ji\"9\u00010EA\u0001\u0002\u0004y\u0017a\u0001=%c\u0005\u00192\r\\;ti\u0016\u0014H*\u001b8l\u001b\u0006t\u0017mZ3sA\u0005)1/\u001a;VaR\tA\u000f\u000b\u0002\u0014{B\u0019a0a\u0003\u000e\u0003}TA!!\u0001\u0002\u0004\u0005\u0019\u0011\r]5\u000b\t\u0005\u0015\u0011qA\u0001\bUV\u0004\u0018\u000e^3s\u0015\r\tIAQ\u0001\u0006UVt\u0017\u000e^\u0005\u0004\u0003\u001by(A\u0003\"fM>\u0014X-R1dQ\u0006AA/Z1s\t><h\u000eK\u0002\u0015\u0003'\u00012A`A\u000b\u0013\r\t9b 
\u0002\n\u0003\u001a$XM]#bG\"\f\u0001\u0003^3ti\u000ecWo\u001d;fe2Kgn[:)\u0007U\ti\u0002E\u0002\u007f\u0003?I1!!\t��\u0005\u0011!Vm\u001d;\u0002\u001fQ,7\u000f\u001e*fG>tg-[4ve\u0016D3AFA\u000f\u0003a!Xm\u001d;GC&dW\rZ!eI\u000ecWo\u001d;fe2Kgn\u001b\u0015\u0004/\u0005u\u0011A\u0006;fgR\u0014VmY8oM&<WO]3GC&dWO]3)\u0007a\ti\"\u0001\nde\u0016\fG/\u001a\"s_.,'oQ8oM&<G#A\u001a\u00025\rdWo\u001d;fe2Kgn\u001b)feNL7\u000f^3oiB\u0013x\u000e]:\u0016\u0005\u0005]\u0002\u0003BA\u001d\u0003\u0007j!!a\u000f\u000b\t\u0005u\u0012qH\u0001\u0005kRLGN\u0003\u0002\u0002B\u0005!!.\u0019<b\u0013\u0011\t)%a\u000f\u0003\u0015A\u0013x\u000e]3si&,7/A\tdYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e,\"!a\u0013\u0011\u0007A\ni%C\u0002\u0002P}\u0011\u0011c\u00117vgR,'\u000fT5oW\u000e{gNZ5h\u0003%\u0019X\r^;q\u001b>\u001c7\u000eF\u0004u\u0003+\n)'!\u001d\t\u000f\u0005]C\u00041\u0001\u0002Z\u0005I\u0001/\u0019:uSRLwN\u001c\t\u0005\u00037\n\t'\u0004\u0002\u0002^)\u0019\u0011qL\u0012\u0002\u000f\rdWo\u001d;fe&!\u00111MA/\u0005%\u0001\u0016M\u001d;ji&|g\u000eC\u0004\u0002hq\u0001\r!!\u001b\u0002\u0005Q\u0004\b\u0003BA6\u0003[j\u0011!P\u0005\u0004\u0003_j$A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\u001c\u0005\b\u0003gb\u0002\u0019AA;\u0003\u0019a\u0017N\\6JIB)\u0001&a\u001e\u0002|%\u0019\u0011\u0011P\u0015\u0003\r=\u0003H/[8o!\u0011\tI$! \n\t\u0005}\u00141\b\u0002\u0005+VKE)\u0001\rde\u0016\fG/Z\"mkN$XM\u001d'j].l\u0015M\\1hKJ$2a\\AC\u0011\u0015\tT\u00041\u00014\u0001")
/* loaded from: input_file:kafka/server/link/ClusterLinkManagerTest.class */
public class ClusterLinkManagerTest {
    // Broker configuration built by createBrokerConfig(); evaluated once per test instance.
    private final KafkaConfig brokerConfig = createBrokerConfig();
    // Metrics registry handed to the link manager; closed in tearDown().
    private final Metrics metrics = new Metrics();
    // Mock clock passed to the link manager and used for the retainTopic() timestamps.
    private final MockTime time = new MockTime();
    // Strict mocks: expectations for these two are recorded in setUp().
    private final ReplicaManager replicaManager = EasyMock.mock(ReplicaManager.class);
    private final KafkaController controller = EasyMock.mock(KafkaController.class);
    // Nice mock: individual tests record only the ZooKeeper calls they care about.
    private final KafkaZkClient zkClient = EasyMock.createNiceMock(KafkaZkClient.class);
    // ZK-backed metadata cache for broker id 0, using the factory's default second argument.
    private final MetadataCache metadataCache = MetadataCache$.MODULE$.zkMetadataCache(0, MetadataCache$.MODULE$.zkMetadataCache$default$2());
    // Manager under test; assigned in setUp() and shut down in tearDown().
    private ClusterLinkManager clusterLinkManager;

    /** Returns the broker configuration shared by all tests in this instance. */
    private KafkaConfig brokerConfig() {
        return brokerConfig;
    }

    /** Returns the metrics registry used by the manager under test. */
    private Metrics metrics() {
        return metrics;
    }

    /** Returns the mock clock used by the manager under test. */
    private MockTime time() {
        return time;
    }

    /** Returns the mocked replica manager. */
    private ReplicaManager replicaManager() {
        return replicaManager;
    }

    /** Returns the mocked controller. */
    private KafkaController controller() {
        return controller;
    }

    /** Returns the mocked ZooKeeper client. */
    private KafkaZkClient zkClient() {
        return zkClient;
    }

    /** Returns the metadata cache that the mocked replica manager hands out. */
    private MetadataCache metadataCache() {
        return metadataCache;
    }

    /** Returns the manager under test; {@code null} until {@code setUp()} has assigned it. */
    private ClusterLinkManager clusterLinkManager() {
        return clusterLinkManager;
    }

    /**
     * Scala-style setter for the manager under test.
     *
     * @param clusterLinkManager the manager instance to store in the field
     */
    private void clusterLinkManager_$eq(final ClusterLinkManager clusterLinkManager) {
        this.clusterLinkManager = clusterLinkManager;
    }

    /**
     * Records the baseline expectations on the replica manager and controller mocks,
     * then creates and starts the manager under test.
     */
    @BeforeEach
    public void setUp() {
        // The replica manager hands out the test metadata cache and the mocked ZooKeeper client.
        final ReplicaManager replicaManagerMock = replicaManager();
        EasyMock.expect(replicaManagerMock.metadataCache()).andReturn(metadataCache()).anyTimes();
        EasyMock.expect(replicaManagerMock.zkClient()).andReturn(new Some(zkClient())).anyTimes();
        EasyMock.replay(replicaManagerMock);

        // The controller always reports itself as active.
        final KafkaController controllerMock = controller();
        EasyMock.expect(controllerMock.isActive()).andReturn(true).anyTimes();
        EasyMock.replay(controllerMock);

        clusterLinkManager_$eq(createClusterLinkManager(brokerConfig()));
    }

    /**
     * Shuts down the manager under test and closes the metrics registry.
     *
     * <p>The manager field is only assigned as the last step of {@code setUp()}, so it can
     * still be {@code null} here if set-up failed part-way; the null guard avoids an
     * unrelated NullPointerException in that case. The metrics registry is closed in a
     * {@code finally} block so that it is released even when {@code shutdown()} throws.
     */
    @AfterEach
    public void tearDown() {
        try {
            final ClusterLinkManager manager = clusterLinkManager();
            if (manager != null) {
                manager.shutdown();
            }
        } finally {
            metrics().close();
        }
    }

    /**
     * Walks one cluster link through its lifecycle on the manager under test:
     * lookups before the link exists, link creation, duplicate-name rejection, adding and
     * removing partitions with different LeaderAndIsr link states, link deletion, and
     * finally waiting until the fetcher and client managers for the link are gone.
     *
     * <p>Expected failures are checked with JUnit's {@code assertThrows}.
     *
     * @throws InterruptedException if the thread is interrupted while polling for the
     *                              fetcher/client managers to be removed
     */
    @Test
    public void testClusterLinks() throws InterruptedException {
        final String linkName = "testLink";
        final UUID linkId = UUID.randomUUID();
        final String clusterId = "testClusterId";
        final String topic = "testTopic";
        final ClusterLinkData linkData = new ClusterLinkData(linkName, linkId, new Some(clusterId), None$.MODULE$, false);
        final TopicPartition tp0 = new TopicPartition(topic, 0);
        final Partition partition0 = EasyMock.createNiceMock(Partition.class);

        // Nothing is known about the link before it is created.
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(linkId));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().clientManager(linkId));
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().resolveLinkId(linkName));
        Assertions.assertThrows(ClusterLinkNotFoundException.class, () -> clusterLinkManager().resolveLinkIdOrThrow(linkName));
        clusterLinkManager().ensureLinkNameDoesntExist(linkName);
        Assertions.assertEquals(Nil$.MODULE$, clusterLinkManager().listClusterLinks());

        // A partition without a link id is accepted; one that references the unknown link is rejected.
        setupMock(partition0, tp0, None$.MODULE$);
        clusterLinkManager().addPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition0})));
        setupMock(partition0, tp0, new Some(linkId));
        Assertions.assertThrows(ClusterLinkNotFoundException.class, () -> clusterLinkManager().addPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition0}))));
        setupMock(partition0, tp0, new Some(linkId));

        // Create the link; ZooKeeper reports that it does not exist yet.
        EasyMock.expect(zkClient().clusterLinkExists(linkId)).andReturn(false).times(1);
        EasyMock.replay(zkClient());
        clusterLinkManager().createClusterLink(linkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Assertions.assertNotEquals(None$.MODULE$, clusterLinkManager().fetcherManager(linkId));
        Assertions.assertNotEquals(None$.MODULE$, clusterLinkManager().clientManager(linkId));
        Assertions.assertEquals(new Some(linkId), clusterLinkManager().resolveLinkId(linkName));
        Assertions.assertEquals(new $colon.colon(linkData, Nil$.MODULE$), clusterLinkManager().listClusterLinks());
        final ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) clusterLinkManager().fetcherManager(linkId).get();
        final ClusterLinkClientManager clientManager = (ClusterLinkClientManager) clusterLinkManager().clientManager(linkId).get();

        // A second link with the same name but a different id must be rejected.
        Assertions.assertThrows(ClusterLinkExistsException.class, () -> clusterLinkManager().createClusterLink(new ClusterLinkData(linkName, UUID.randomUUID(), new Some(clusterId), None$.MODULE$, false), clusterLinkConfig(), clusterLinkPersistentProps()));

        // Adding the linked partition registers the topic with both managers.
        clusterLinkManager().addPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition0})));
        Assertions.assertTrue(fetcherManager.currentMetadata().retainTopic(topic, false, time().milliseconds()), "Topic not added to metadata");
        Assertions.assertTrue(clientManager.getTopics().contains(topic), "Topic not added to client manager");
        Assertions.assertFalse(fetcherManager.isEmpty(), "Fetcher not recording active topic");

        // Remove with a state that still carries the link id and the "Mirror" topic state.
        final LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = EasyMock.mock(LeaderAndIsrRequestData.LeaderAndIsrPartitionState.class);
        EasyMock.expect(partitionState.clusterLinkId()).andReturn(linkId.toString()).anyTimes();
        EasyMock.expect(partitionState.clusterLinkTopicState()).andReturn("Mirror").anyTimes();
        EasyMock.expect(partitionState.linkedLeaderEpoch()).andReturn(1).anyTimes();
        EasyMock.replay(partitionState);
        clusterLinkManager().removePartitions(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition0), partitionState)})));
        Assertions.assertTrue(fetcherManager.currentMetadata().retainTopic(topic, false, time().milliseconds()), "Topic removed from metadata");
        Assertions.assertFalse(clientManager.getTopics().contains(topic), "Topic not removed from client manager");

        // Remove with a state that no longer carries a link id.
        EasyMock.reset(partitionState);
        EasyMock.expect(partitionState.clusterLinkId()).andReturn(null).anyTimes();
        EasyMock.replay(partitionState);
        clusterLinkManager().removePartitions(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition0), partitionState)})));
        Assertions.assertFalse(fetcherManager.currentMetadata().retainTopic(topic, false, time().milliseconds()), "Topic not removed from metadata");
        Assertions.assertFalse(clientManager.getTopics().contains(topic), "Topic should not be in client manager");

        // Re-add, then remove with the "FailedMirror" topic state and a linked leader epoch of -1.
        EasyMock.reset(partitionState);
        EasyMock.expect(partitionState.clusterLinkId()).andReturn(linkId.toString()).anyTimes();
        EasyMock.expect(partitionState.clusterLinkTopicState()).andReturn("FailedMirror").anyTimes();
        EasyMock.expect(partitionState.linkedLeaderEpoch()).andReturn(-1).anyTimes();
        EasyMock.replay(partitionState);
        clusterLinkManager().addPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition0})));
        clusterLinkManager().removePartitions(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(partition0), partitionState)})));
        Assertions.assertFalse(fetcherManager.currentMetadata().retainTopic(topic, false, time().milliseconds()), "Topic not removed from metadata for failed mirror");
        Assertions.assertFalse(clientManager.getTopics().contains(topic), "Topic should not be in client manager for failed mirror");

        // A non-zero partition of the same topic affects the metadata but not the client manager.
        final TopicPartition tp1 = new TopicPartition(topic, 1);
        final Partition partition1 = EasyMock.createNiceMock(Partition.class);
        setupMock(partition1, tp1, new Some(linkId));
        clusterLinkManager().addPartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new Partition[]{partition1})));
        Assertions.assertTrue(fetcherManager.currentMetadata().retainTopic(topic, false, time().milliseconds()), "Topic not added to metadata");
        Assertions.assertFalse(clientManager.getTopics().contains(topic), "Topic should not be added to client manager");
        clusterLinkManager().removePartitionsAndMetadata(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{tp1})));
        Assertions.assertFalse(fetcherManager.currentMetadata().retainTopic(topic, false, time().milliseconds()), "Topic not removed from metadata");
        Assertions.assertFalse(clientManager.getTopics().contains(topic), "Topic should not be in to client manager");

        // The manager must still hand out the same fetcher/client manager instances.
        final Object currentFetcherManager = clusterLinkManager().fetcherManager(linkId).get();
        Assertions.assertTrue(currentFetcherManager != null && currentFetcherManager.equals(fetcherManager), "Unexpected fetcher manager");
        final Object currentClientManager = clusterLinkManager().clientManager(linkId).get();
        Assertions.assertTrue(currentClientManager != null && currentClientManager.equals(clientManager), "Unexpected client manager");

        // Delete the link: ZooKeeper now reports it as existing and receives the updated link data.
        EasyMock.reset(zkClient());
        EasyMock.expect(zkClient().clusterLinkExists(linkId)).andReturn(true).times(2);
        zkClient().setClusterLink(new ClusterLinkData(linkName, linkId, new Some(clusterId), None$.MODULE$, true));
        EasyMock.expectLastCall();
        EasyMock.replay(zkClient());
        clusterLinkManager().deleteClusterLink(linkName, linkId);
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().resolveLinkId(linkName));
        Assertions.assertThrows(ClusterLinkNotFoundException.class, () -> clusterLinkManager().deleteClusterLink(linkName, linkId));

        // Poll until both managers for the link are gone.
        // NOTE(review): this looks like an inlined TestUtils.waitUntilTrue using its default
        // timeout and pause values - confirm against the Scala original.
        final long waitTimeMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long startMs = System.currentTimeMillis();
        while (!$anonfun$testClusterLinks$5(this, linkId)) {
            if (System.currentTimeMillis() > startMs + waitTimeMs) {
                Assertions.fail($anonfun$testClusterLinks$6(linkId));
            }
            Thread.sleep(Math.min(waitTimeMs, pauseMs));
        }
    }

    /**
     * Exercises link reconfiguration: updating an unknown link fails, a config change
     * delivered via {@code processClusterLinkChanges} is reflected in the fetcher manager's
     * current config, an update callback that returns {@code false} leaves the config
     * unchanged, and an update callback that returns {@code true} results in a write to
     * ZooKeeper whose captured properties, once processed, restore the original bootstrap
     * server.
     *
     * <p>The expected failure is checked with JUnit's {@code assertThrows}, and the
     * capture is typed as {@code Capture<Properties>} so no cast is needed on read-back.
     */
    @Test
    public void testReconfigure() {
        final String linkName = "testLink";
        final UUID linkId = UUID.randomUUID();

        // Updating a link that was never created must fail.
        Assertions.assertThrows(ClusterLinkNotFoundException.class, () -> clusterLinkManager().updateClusterLinkConfig(linkName, props -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$2(props));
        }));

        // Create the link with the default persistent properties (localhost:1234).
        EasyMock.expect(zkClient().clusterLinkExists(linkId)).andReturn(false).times(1);
        EasyMock.expect(zkClient().getClusterLinks(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new UUID[]{linkId})))).andReturn(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(linkId), new ClusterLinkData(linkName, linkId, None$.MODULE$, None$.MODULE$, false))}))).anyTimes();
        EasyMock.replay(zkClient());
        Assertions.assertEquals(None$.MODULE$, clusterLinkManager().fetcherManager(linkId));
        clusterLinkManager().createClusterLink(new ClusterLinkData(linkName, linkId, None$.MODULE$, None$.MODULE$, false), clusterLinkConfig(), clusterLinkPersistentProps());
        final ClusterLinkFactory.FetcherManager fetcherManager = (ClusterLinkFactory.FetcherManager) clusterLinkManager().fetcherManager(linkId).get();
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), fetcherManager.currentConfig().bootstrapServers());

        // A processed config change is picked up by the fetcher manager.
        final Properties updatedProps = new Properties();
        updatedProps.put("bootstrap.servers", "localhost:5678");
        clusterLinkManager().processClusterLinkChanges(linkId, updatedProps);
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), fetcherManager.currentConfig().bootstrapServers());

        // The update callback returns false: the fetcher config must stay at localhost:5678.
        EasyMock.reset(zkClient());
        EasyMock.expect(zkClient().clusterLinkExists(linkId)).andReturn(true).times(1);
        EasyMock.expect(zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), linkId.toString())).andReturn(updatedProps).times(1);
        EasyMock.replay(zkClient());
        clusterLinkManager().updateClusterLinkConfig(linkName, props -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$3(props));
        });
        Assertions.assertEquals(Collections.singletonList("localhost:5678"), fetcherManager.currentConfig().bootstrapServers());

        // The update callback returns true: capture what is written to ZooKeeper and process it.
        EasyMock.reset(zkClient());
        EasyMock.expect(zkClient().clusterLinkExists(linkId)).andReturn(true).times(1);
        EasyMock.expect(zkClient().getEntityConfigs(ConfigType$.MODULE$.ClusterLink(), linkId.toString())).andReturn(updatedProps).times(1);
        EasyMock.expect(zkClient().getClusterLinks(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new UUID[]{linkId})))).andReturn(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(linkId), new ClusterLinkData(linkName, linkId, None$.MODULE$, None$.MODULE$, false))}))).times(1);
        final Capture<Properties> writtenProps = EasyMock.newCapture();
        zkClient().setOrCreateEntityConfigs((String) EasyMock.eq(ConfigType$.MODULE$.ClusterLink()), (String) EasyMock.eq(linkId.toString()), EasyMock.capture(writtenProps));
        EasyMock.expectLastCall();
        EasyMock.replay(zkClient());
        clusterLinkManager().updateClusterLinkConfig(linkName, props -> {
            return BoxesRunTime.boxToBoolean($anonfun$testReconfigure$4(props));
        });
        clusterLinkManager().processClusterLinkChanges(linkId, writtenProps.getValue());
        Assertions.assertEquals(Collections.singletonList("localhost:1234"), fetcherManager.currentConfig().bootstrapServers());
    }

    /**
     * Verifies that a link whose creation fails in ZooKeeper is not left registered in the
     * manager, and that the same link can be created successfully afterwards.
     *
     * <p>The expected failure is checked with JUnit's {@code assertThrows}, and the void
     * ZooKeeper call is stubbed through {@code EasyMock.expectLastCall()}.
     */
    @Test
    public void testFailedAddClusterLink() {
        final String linkName = "testLink";
        final UUID linkId = UUID.randomUUID();
        final ClusterLinkData linkData = new ClusterLinkData(linkName, linkId, new Some("testClusterId"), None$.MODULE$, false);
        Assertions.assertTrue(clusterLinkManager().resolveLinkId(linkName).isEmpty());
        Assertions.assertTrue(clusterLinkManager().listClusterLinks().isEmpty());

        // First attempt: persisting the link to ZooKeeper throws.
        EasyMock.expect(zkClient().clusterLinkExists(linkId)).andReturn(false).times(1);
        zkClient().createClusterLink(linkData);
        EasyMock.expectLastCall().andThrow(new RuntimeException("")).times(1);
        EasyMock.replay(zkClient());
        Assertions.assertThrows(RuntimeException.class, () -> clusterLinkManager().createClusterLink(linkData, clusterLinkConfig(), clusterLinkPersistentProps()));
        Assertions.assertTrue(clusterLinkManager().resolveLinkId(linkName).isEmpty());
        Assertions.assertTrue(clusterLinkManager().listClusterLinks().isEmpty());

        // Second attempt: the nice mock now accepts createClusterLink, so creation succeeds.
        EasyMock.reset(zkClient());
        EasyMock.expect(zkClient().clusterLinkExists(linkId)).andReturn(false).times(1);
        EasyMock.replay(zkClient());
        clusterLinkManager().createClusterLink(linkData, clusterLinkConfig(), clusterLinkPersistentProps());
        Assertions.assertEquals(new Some(linkId), clusterLinkManager().resolveLinkId(linkName));
        Assertions.assertEquals(new $colon.colon(linkData, Nil$.MODULE$), clusterLinkManager().listClusterLinks());
    }

    /**
     * Processes a config change with an empty {@code bootstrap.servers} value for a link
     * that ZooKeeper knows about, and expects the link to end up in the failed state.
     */
    @Test
    public void testReconfigureFailure() {
        final UUID linkId = UUID.randomUUID();

        // ZooKeeper returns the link's data exactly once when asked for this id.
        EasyMock.expect(zkClient().getClusterLinks(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new UUID[]{linkId})))).andReturn(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(linkId), new ClusterLinkData("testLink", linkId, new Some("testClusterId"), None$.MODULE$, false))}))).once();
        EasyMock.replay(zkClient());

        final Properties emptyBootstrapProps = new Properties();
        emptyBootstrapProps.setProperty("bootstrap.servers", "");
        clusterLinkManager().processClusterLinkChanges(linkId, emptyBootstrapProps);

        Assertions.assertEquals(FailedClusterLink$.MODULE$, clusterLinkManager().linkState("testLink"));
    }

    /**
     * Builds the broker configuration used by the tests: broker id 1 with
     * {@code "localhost:1234"} as the second argument, every remaining
     * {@code TestUtils.createBrokerConfig} parameter left at its default, and the
     * cluster-link enable property set to {@code "true"}.
     *
     * <p>Called from a field initializer, so it must not rely on other instance fields.
     *
     * @return the parsed broker configuration
     */
    private KafkaConfig createBrokerConfig() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        final Properties brokerProps = testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20());
        brokerProps.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(brokerProps);
    }

    /**
     * Creates a fresh set of link properties containing only
     * {@code bootstrap.servers=localhost:1234}.
     *
     * @return a new, mutable {@link Properties} instance on every call
     */
    private Properties clusterLinkPersistentProps() {
        final Properties linkProps = new Properties();
        linkProps.setProperty("bootstrap.servers", "localhost:1234");
        return linkProps;
    }

    /**
     * Builds a {@code ClusterLinkConfig} from a fresh copy of the persistent link properties.
     *
     * @return the config created from {@code clusterLinkPersistentProps()}
     */
    private ClusterLinkConfig clusterLinkConfig() {
        final Properties persistentProps = clusterLinkPersistentProps();
        return ClusterLinkConfig$.MODULE$.create(persistentProps);
    }

    /**
     * Resets the given partition mock and re-records its expectations, then replays it.
     *
     * @param partition      the partition mock to (re)configure
     * @param topicPartition value returned by {@code partition.topicPartition()}
     * @param option         value returned by {@code partition.getClusterLinkId()}; the mock
     *                       reports itself as active link destination leader exactly when
     *                       this option is non-empty
     */
    private void setupMock(Partition partition, TopicPartition topicPartition, Option<UUID> option) {
        EasyMock.reset(partition);

        EasyMock.expect(partition.topicPartition())
                .andReturn(topicPartition)
                .anyTimes();
        EasyMock.expect(partition.getClusterLinkId())
                .andReturn(option)
                .anyTimes();
        EasyMock.expect(partition.isActiveLinkDestinationLeader())
                .andReturn(option.nonEmpty())
                .anyTimes();
        // The linked leader epoch is always reported as Some(1).
        EasyMock.expect(partition.getLinkedLeaderEpoch())
                .andReturn(new Some(1))
                .anyTimes();

        EasyMock.replay(partition);
    }

    /**
     * Creates a link manager through {@code ClusterLinkFactory} and starts it.
     *
     * <p>The manager is started with a mocked {@code AuthorizerServerInfo} whose
     * inter-broker endpoint is PLAINTEXT on localhost:1234, the mocked controller, and
     * {@code null} for the replica manager, admin manager and socket server arguments.
     *
     * @param kafkaConfig broker configuration passed to the factory
     * @return the started manager
     */
    private ClusterLinkManager createClusterLinkManager(KafkaConfig kafkaConfig) {
        final ClusterLinkFactory$ factory = ClusterLinkFactory$.MODULE$;
        final ClusterLinkManager manager = factory.createLinkManager(kafkaConfig, "clusterId", QuotaFactory$UnboundedQuota$.MODULE$, zkClient(), metrics(), time(), factory.createLinkManager$default$7(), None$.MODULE$);

        final Endpoint interBrokerEndpoint = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 1234);
        final AuthorizerServerInfo serverInfo = EasyMock.mock(AuthorizerServerInfo.class);
        EasyMock.expect(serverInfo.interBrokerEndpoint()).andReturn(interBrokerEndpoint).anyTimes();
        EasyMock.replay(serverInfo);

        manager.startup(serverInfo, (ReplicaManager) null, (ZkAdminManager) null, controller(), (SocketServer) null, None$.MODULE$);
        return manager;
    }

    /**
     * Wait condition used by {@code testClusterLinks}: true once the manager no longer has
     * a fetcher manager and no longer has a client manager for the given link id.
     */
    public static final /* synthetic */ boolean $anonfun$testClusterLinks$5(ClusterLinkManagerTest clusterLinkManagerTest, UUID uuid) {
        if (!clusterLinkManagerTest.clusterLinkManager().fetcherManager(uuid).isEmpty()) {
            return false;
        }
        return clusterLinkManagerTest.clusterLinkManager().clientManager(uuid).isEmpty();
    }

    /**
     * Builds the failure message reported when the fetcher/client managers for a link are
     * still present after the wait in {@code testClusterLinks} times out.
     *
     * @param uuid id of the link, rendered via its string form
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testClusterLinks$6(UUID uuid) {
        return "Linked fetcher/client for " + uuid + " not removed";
    }

    /**
     * Update callback used against a non-existent link in {@code testReconfigure}: leaves
     * the supplied properties untouched and always returns {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testReconfigure$2(Properties properties) {
        final boolean callbackResult = false;
        return callbackResult;
    }

    /**
     * Update callback used in {@code testReconfigure}: sets {@code bootstrap.servers} to
     * {@code localhost:1234} on the supplied properties but returns {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testReconfigure$3(Properties properties) {
        properties.setProperty("bootstrap.servers", "localhost:1234");
        final boolean callbackResult = false;
        return callbackResult;
    }

    /**
     * Update callback used in {@code testReconfigure}: sets {@code bootstrap.servers} to
     * {@code localhost:1234} on the supplied properties and returns {@code true}.
     */
    public static final /* synthetic */ boolean $anonfun$testReconfigure$4(Properties properties) {
        properties.setProperty("bootstrap.servers", "localhost:1234");
        final boolean callbackResult = true;
        return callbackResult;
    }
}
