package kafka.server.link;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import kafka.controller.KafkaController;
import kafka.server.KafkaConfig;
import kafka.utils.Implicits;
import kafka.utils.Implicits$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.errors.ClusterLinkPausedException;
import org.apache.kafka.server.authorizer.Authorizer;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Set;
import scala.collection.StringOps$;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkClientManagerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u0005d\u0001B\r\u001b\u0001\u0005BQ\u0001\u000b\u0001\u0005\u0002%Bq\u0001\f\u0001C\u0002\u0013\u0005Q\u0006\u0003\u00042\u0001\u0001\u0006IA\f\u0005\be\u0001\u0011\r\u0011\"\u00014\u0011\u0019Q\u0004\u0001)A\u0005i!I1\b\u0001a\u0001\u0002\u0004%\t\u0001\u0010\u0005\n\u0001\u0002\u0001\r\u00111A\u0005\u0002\u0005C\u0011b\u0012\u0001A\u0002\u0003\u0005\u000b\u0015B\u001f\t\u000f!\u0003!\u0019!C\u0001\u0013\"1Q\u000b\u0001Q\u0001\n)CqA\u0016\u0001C\u0002\u0013\u0005q\u000b\u0003\u0004^\u0001\u0001\u0006I\u0001\u0017\u0005\b=\u0002\u0011\r\u0011\"\u0001`\u0011\u0019A\u0007\u0001)A\u0005A\"9\u0011\u000e\u0001b\u0001\n\u0003Q\u0007B\u00028\u0001A\u0003%1\u000eC\u0003p\u0001\u0011\u0005\u0001\u000fC\u0003}\u0001\u0011\u0005\u0001\u000fC\u0003\u007f\u0001\u0011\u0005\u0001\u000f\u0003\u0004\u0002\u0002\u0001!\t\u0001\u001d\u0005\u0007\u0003\u000b\u0001A\u0011\u00019\t\u000f\u0005%\u0001\u0001\"\u0003\u0002\f!9\u00111\n\u0001\u0005\n\u00055\u0003bBA*\u0001\u0011%\u0011Q\u000b\u0002\u001d\u00072,8\u000f^3s\u0019&t7n\u00117jK:$X*\u00198bO\u0016\u0014H+Z:u\u0015\tYB$\u0001\u0003mS:\\'BA\u000f\u001f\u0003\u0019\u0019XM\u001d<fe*\tq$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\u0011\u0003CA\u0012'\u001b\u0005!#\"A\u0013\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u001d\"#AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002UA\u00111\u0006A\u0007\u00025\u0005I1o\u00195fIVdWM]\u000b\u0002]A\u00111fL\u0005\u0003ai\u0011Ac\u00117vgR,'\u000fT5oWN\u001b\u0007.\u001a3vY\u0016\u0014\u0018AC:dQ\u0016$W\u000f\\3sA\u0005A!p[\"mS\u0016tG/F\u00015!\t)\u0004(D\u00017\u0015\t9d$\u0001\u0002{W&\u0011\u0011H\u000e\u0002\u000e\u0017\u000647.\u0019.l\u00072LWM\u001c;\u0002\u0013i\\7\t\\5f]R\u0004\u0013!D2mS\u0016tG/T1oC\u001e,'/F\u0001>!\tYc(\u0003\u0002@5\tA2\t\\;ti\u0016\u0014H*\u001b8l\u00072LWM\u001c;NC:\fw-\u001a:\u0002#\rd\u0017.\u001a8u\u001b\u0006t\u0017mZ3s?\u0012*\u0017\u000f\u0006\u0002C\u000bB\u00111eQ\u0005\u0003\t\u0012\u0012A!\u00168ji\"9aiBA\
u0001\u0002\u0004i\u0014a\u0001=%c\u0005q1\r\\5f]Rl\u0015M\\1hKJ\u0004\u0013AC1vi\"|'/\u001b>feV\t!\n\u0005\u0002L'6\tAJ\u0003\u0002I\u001b*\u0011QD\u0014\u0006\u0003?=S!\u0001U)\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0011\u0016aA8sO&\u0011A\u000b\u0014\u0002\u000b\u0003V$\bn\u001c:ju\u0016\u0014\u0018aC1vi\"|'/\u001b>fe\u0002\n!bY8oiJ|G\u000e\\3s+\u0005A\u0006CA-\\\u001b\u0005Q&B\u0001,\u001f\u0013\ta&LA\bLC\u001a\\\u0017mQ8oiJ|G\u000e\\3s\u0003-\u0019wN\u001c;s_2dWM\u001d\u0011\u0002\u0013\u0011,7\u000f^!e[&tW#\u00011\u0011\u0005\u00054W\"\u00012\u000b\u0005\r$\u0017!B1e[&t'BA3O\u0003\u001d\u0019G.[3oiNL!a\u001a2\u0003\u000b\u0005#W.\u001b8\u0002\u0015\u0011,7\u000f^!e[&t\u0007%A\u0004nKR\u0014\u0018nY:\u0016\u0003-\u0004\"a\u000b7\n\u00055T\"AE\"mkN$XM\u001d'j].lU\r\u001e:jGN\f\u0001\"\\3ue&\u001c7\u000fI\u0001\u0010i\u0016\u001cHOU3d_:4\u0017nZ;sKR\t!\t\u000b\u0002\u0012eB\u00111O_\u0007\u0002i*\u0011QO^\u0001\u0004CBL'BA<y\u0003\u001dQW\u000f]5uKJT!!_)\u0002\u000b),h.\u001b;\n\u0005m$(\u0001\u0002+fgR\f!\u0002^3tiR{\u0007/[2tQ\t\u0011\"/\u0001\fuKN$\u0018i\u00197Ts:\u001cG+Y:l'R\f'\u000f^;qQ\t\u0019\"/\u0001\u0014uKN$\u0018i\u00197Ts:\u001cG+Y:l'R\f'\u000f^;q/&$\bNT8BkRDwN]5{KJD#\u0001\u0006:\u00021Q,7\u000f^!dYNKhn\u0019+bg.tun\u0015;beR,\b\u000f\u000b\u0002\u0016e\u0006\u0001b.Z<DY&,g\u000e^'b]\u0006<WM\u001d\u000b\f{\u00055\u0011qEA\u0019\u0003\u0003\nI\u0005C\u0004\u0002\u0010Y\u0001\r!!\u0005\u0002\u00111Lgn\u001b(b[\u0016\u0004B!a\u0005\u0002\"9!\u0011QCA\u000f!\r\t9\u0002J\u0007\u0003\u00033Q1!a\u0007!\u0003\u0019a$o\\8u}%\u0019\u0011q\u0004\u0013\u0002\rA\u0013X\rZ3g\u0013\u0011\t\u0019#!\n\u0003\rM#(/\u001b8h\u0015\r\ty\u0002\n\u0005\b\u0003S1\u0002\u0019AA\u0016\u0003\u0019\u0019wN\u001c4jOB\u00191&!\f\n\u0007\u0005=\"DA\tDYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001eDq!a\r\u0017\u0001\u0004\t)$\u0001\u0007bI6LgNR1di>\u0014\u0018\u0010E\u0004$\u0003o\tY#a\u000f\n\u0007\u0005eBEA\u0005Gk:\u001cG/[8ocA\u00191&!\u0010\n\u0007\u0005}\"D\u0001\fDYV\u001
cH/\u001a:MS:\\\u0017\tZ7j]\u000ec\u0017.\u001a8u\u0011\u0019Ae\u00031\u0001\u0002DA!1%!\u0012K\u0013\r\t9\u0005\n\u0002\u0007\u001fB$\u0018n\u001c8\t\u000bY3\u0002\u0019\u0001-\u0002\u001b\rdwn]3NC:\fw-\u001a:t)\r\u0011\u0015q\n\u0005\u0007\u0003#:\u0002\u0019A\u001f\u0002#\rd\u0017.\u001a8u\u0019&t7.T1oC\u001e,'/A\u0005oK^\u001cuN\u001c4jOR!\u00111FA,\u0011\u001d\tI\u0006\u0007a\u0001\u00037\nqaY8oM&<7\u000f\u0005\u0005\u0002\u0014\u0005u\u0013\u0011CA\t\u0013\u0011\ty&!\n\u0003\u00075\u000b\u0007\u000f")
/* loaded from: input_file:kafka/server/link/ClusterLinkClientManagerTest.class */
public class ClusterLinkClientManagerTest {
    // Mutable slot for a client manager; only written through clientManager_$eq in the visible code.
    private ClusterLinkClientManager clientManager;
    // Collaborators below are EasyMock "nice" mocks created once per test-class instance.
    // The scheduler mock is additionally primed and replayed by newClientManager.
    private final ClusterLinkScheduler scheduler = (ClusterLinkScheduler) EasyMock.createNiceMock(ClusterLinkScheduler.class);
    private final KafkaZkClient zkClient = (KafkaZkClient) EasyMock.createNiceMock(KafkaZkClient.class);
    private final Authorizer authorizer = (Authorizer) EasyMock.createNiceMock(Authorizer.class);
    private final KafkaController controller = (KafkaController) EasyMock.createNiceMock(KafkaController.class);
    // Handed to the client manager through a supplier lambda (see newClientManager).
    private final Admin destAdmin = (Admin) EasyMock.createNiceMock(Admin.class);
    private final ClusterLinkMetrics metrics = (ClusterLinkMetrics) EasyMock.createNiceMock(ClusterLinkMetrics.class);

    /** Returns the mocked {@link ClusterLinkScheduler} shared by the tests of this instance. */
    public ClusterLinkScheduler scheduler() {
        return scheduler;
    }

    /** Returns the mocked {@link KafkaZkClient} passed to every client manager built here. */
    public KafkaZkClient zkClient() {
        return zkClient;
    }

    /** Returns the client manager stored in this test instance, or {@code null} if none was set. */
    public ClusterLinkClientManager clientManager() {
        return clientManager;
    }

    /**
     * Scala-style setter ({@code clientManager_=}) for the stored client manager.
     *
     * @param clusterLinkClientManager the manager to store; replaces any previous value
     */
    public void clientManager_$eq(ClusterLinkClientManager clusterLinkClientManager) {
        clientManager = clusterLinkClientManager;
    }

    /** Returns the mocked {@link Authorizer} used by tests that supply an authorizer. */
    public Authorizer authorizer() {
        return authorizer;
    }

    /** Returns the mocked {@link KafkaController} passed to every client manager built here. */
    public KafkaController controller() {
        return controller;
    }

    /** Returns the mocked destination-side {@link Admin} supplied lazily to the client manager. */
    public Admin destAdmin() {
        return destAdmin;
    }

    /** Returns the mocked {@link ClusterLinkMetrics} passed to every client manager built here. */
    public ClusterLinkMetrics metrics() {
        return metrics;
    }

    /**
     * Checks when {@code reconfigure} causes the client manager to obtain a new admin client.
     *
     * <p>The scenario, as pinned by the assertions below:
     * <ol>
     *   <li>No admin is created before {@code startup()}; exactly one afterwards.</li>
     *   <li>Changing {@code bootstrap.servers} yields a second admin.</li>
     *   <li>Changing only the number of fetchers does not create another admin.</li>
     *   <li>Pausing the link creates no admin and makes {@code getAdmin()} throw
     *       {@link ClusterLinkPausedException}; a bootstrap change while paused behaves the same.</li>
     *   <li>Un-pausing yields a third admin.</li>
     *   <li>After {@code closeManagers}, {@code getAdmin()} throws {@link IllegalStateException}
     *       and no further admin is created.</li>
     * </ol>
     *
     * <p>The client manager is built through {@code newClientManager} instead of repeating that
     * helper's body inline, and identity checks use {@code assertSame} so that a failure reports
     * both objects.
     */
    @Test
    public void testReconfigure() {
        // Scala closure cells shared with adminFactory$1, which is defined outside this view.
        // NOTE(review): the assertions treat adminsCreated as "number of factory invocations" and
        // currentAdmin.elem as the instance the factory hands out - confirm against adminFactory$1.
        IntRef adminsCreated = IntRef.create(0);
        ObjectRef currentConfig = ObjectRef.create(newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:1234")}))));
        ObjectRef currentAdmin = ObjectRef.create((KafkaAdminClient) EasyMock.createNiceMock(KafkaAdminClient.class));
        ClusterLinkConfig initialConfig = (ClusterLinkConfig) currentConfig.elem;
        Function1 adminFactory = config -> {
            return adminFactory$1(config, adminsCreated, currentConfig, currentAdmin);
        };
        ClusterLinkClientManager manager =
            newClientManager("test-link", initialConfig, adminFactory, new Some<>(authorizer()), controller());

        Assertions.assertEquals(0, adminsCreated.elem);
        manager.startup();
        try {
            Assertions.assertEquals(1, adminsCreated.elem);
            Assertions.assertSame((KafkaAdminClient) currentAdmin.elem, manager.getAdmin());

            // Bootstrap servers changed: a fresh admin is expected.
            currentAdmin.elem = (KafkaAdminClient) EasyMock.createNiceMock(KafkaAdminClient.class);
            currentConfig.elem = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:2345")})));
            manager.reconfigure((ClusterLinkConfig) currentConfig.elem,
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"bootstrap.servers"})));
            Assertions.assertEquals(2, adminsCreated.elem);
            Assertions.assertSame((KafkaAdminClient) currentAdmin.elem, manager.getAdmin());

            // Only the fetcher count changed: the existing admin must be kept.
            currentConfig.elem = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:2345"),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp()), "5")})));
            manager.reconfigure((ClusterLinkConfig) currentConfig.elem,
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp()})));
            Assertions.assertEquals(2, adminsCreated.elem);

            // Link paused (together with a bootstrap change): no admin is created and none is available.
            currentConfig.elem = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:3456"),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})));
            manager.reconfigure((ClusterLinkConfig) currentConfig.elem,
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.NumClusterLinkFetchersProp(), "bootstrap.servers", ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
            Assertions.assertEquals(2, adminsCreated.elem);
            Assertions$.MODULE$.intercept(() -> {
                return manager.getAdmin();
            }, ClassTag$.MODULE$.apply(ClusterLinkPausedException.class), new Position("ClusterLinkClientManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 76));

            // Bootstrap change while still paused: still no admin.
            currentConfig.elem = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:4567"),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()), "true")})));
            manager.reconfigure((ClusterLinkConfig) currentConfig.elem,
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"bootstrap.servers"})));
            Assertions.assertEquals(2, adminsCreated.elem);
            Assertions$.MODULE$.intercept(() -> {
                return manager.getAdmin();
            }, ClassTag$.MODULE$.apply(ClusterLinkPausedException.class), new Position("ClusterLinkClientManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 85));

            // Link un-paused: a third admin is expected.
            currentAdmin.elem = (KafkaAdminClient) EasyMock.createNiceMock(KafkaAdminClient.class);
            currentConfig.elem = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:4567")})));
            manager.reconfigure((ClusterLinkConfig) currentConfig.elem,
                (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp()})));
            Assertions.assertEquals(3, adminsCreated.elem);
            Assertions.assertSame((KafkaAdminClient) currentAdmin.elem, manager.getAdmin());

            // Once closed, the manager must refuse to hand out an admin.
            closeManagers(manager);
            Assertions.assertEquals(3, adminsCreated.elem);
            Assertions$.MODULE$.intercept(() -> {
                return manager.getAdmin();
            }, ClassTag$.MODULE$.apply(IllegalStateException.class), new Position("ClusterLinkClientManagerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 102));
        } catch (Throwable th) {
            // Clean up on any failure, then let the original failure propagate.
            closeManagers(manager);
            throw th;
        }
    }

    /**
     * Checks that {@code addTopics}/{@code removeTopics} maintain the set reported by
     * {@code getTopics()}: it starts empty, adding overlapping sets yields their union, and
     * removing names (including one that was never added) removes only the known ones.
     *
     * <p>The client manager is built through {@code newClientManager} instead of repeating that
     * helper's body inline.
     */
    @Test
    public void testTopics() {
        ClusterLinkConfig linkConfig = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:1234")})));
        Function1 adminFactory = config -> {
            return adminFactory$2(config);
        };
        ClusterLinkClientManager manager =
            newClientManager("test-link", linkConfig, adminFactory, new Some<>(authorizer()), controller());
        String[] topics = {"topic0", "topic1", "topic2"};
        manager.startup();
        try {
            Assertions.assertEquals(Predef$.MODULE$.Set().empty(), manager.getTopics());

            manager.addTopics(topicSet(topics[0], topics[1]));
            Assertions.assertEquals(topicSet(topics[0], topics[1]), manager.getTopics());

            // Adding an already-present topic must not duplicate it.
            manager.addTopics(topicSet(topics[1], topics[2]));
            Assertions.assertEquals(topicSet(topics[0], topics[1], topics[2]), manager.getTopics());

            manager.removeTopics(topicSet(topics[2], topics[0]));
            Assertions.assertEquals(topicSet(topics[1]), manager.getTopics());

            // Removing a name that was never added is tolerated.
            manager.removeTopics(topicSet(topics[1], "unknown"));
            Assertions.assertEquals(Predef$.MODULE$.Set().empty(), manager.getTopics());
        } finally {
            closeManagers(manager);
        }
    }

    /** Builds a Scala {@code Set} (via {@code Predef.Set}) containing the given topic names. */
    private static Set topicSet(String... names) {
        return (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(names));
    }

    @Test
    public void testAclSyncTaskStartup() {
        ClusterLinkConfig newConfig = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:1234"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), "true"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclFiltersProp()), StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("{\n        | \"aclFilters\": [{\n        |  \"resourceFilter\": {\n        |      \"resourceType\": \"any\",\n        |      \"patternType\": \"any\"\n        |    },\n        |  \"accessFilter\": {\n        |     \"operation\": \"any\",\n        |     \"permissionType\": \"any\"\n        |    }\n        |  }]\n        | }")))})));
        Function1 function1 = clusterLinkConfig -> {
            return adminFactory$3(clusterLinkConfig);
        };
        Some some = new Some(authorizer());
        KafkaController controller = controller();
        EasyMock.expect(scheduler().schedule(EasyMock.anyString(), (Function0) EasyMock.anyObject(), EasyMock.anyLong(), EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject())).andReturn((Object) null).anyTimes();
        EasyMock.replay(new Object[]{scheduler()});
        ClusterLinkData clusterLinkData = new ClusterLinkData("test-link", UUID.randomUUID(), None$.MODULE$, None$.MODULE$, false);
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) EasyMock.createNiceMock(ClusterLinkFetcherManager.class);
        ClusterLinkManager createClusterLinkManager = ClusterLinkTestUtils$.MODULE$.createClusterLinkManager();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        ClusterLinkClientManager clusterLinkClientManager = new ClusterLinkClientManager(createClusterLinkManager, new KafkaConfig(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1)), clusterLinkData, clusterLinkFetcherManager, scheduler(), zkClient(), newConfig, some, controller, None$.MODULE$, metrics(), function1, () -> {
            return this.destAdmin();
        });
        clusterLinkClientManager.startup();
        try {
            Predef$.MODULE$.assert(clusterLinkClientManager.getSyncAclTask().isDefined());
        } finally {
            closeManagers(clusterLinkClientManager);
        }
    }

    /**
     * Checks the error reported when ACL sync is enabled but no authorizer is supplied: if
     * {@code startup()} throws {@link IllegalArgumentException}, its message must be the
     * "authorizer.class.name is not set" text asserted below.
     *
     * <p>The message is read from the caught exception; the previous code called
     * {@code getMessage()} on the client manager itself. The expected text is passed as the
     * first argument of {@code assertEquals} so failure output labels the values correctly.
     */
    @Test
    public void testAclSyncTaskStartupWithNoAuthorizer() {
        // Filter JSON matching any resource and any access; '|' margins are stripped Scala-style.
        String aclFiltersJson = StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString("{\n        | \"aclFilters\": [{\n        |  \"resourceFilter\": {\n        |      \"resourceType\": \"any\",\n        |      \"patternType\": \"any\"\n        |    },\n        |  \"accessFilter\": {\n        |     \"operation\": \"any\",\n        |     \"permissionType\": \"any\"\n        |    }\n        |  }]\n        | }"));
        ClusterLinkConfig linkConfig = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:1234"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), "true"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclFiltersProp()), aclFiltersJson)})));
        Function1 adminFactory = config -> {
            return adminFactory$4(config);
        };
        // Option.empty() is the typed equivalent of passing None for the authorizer.
        ClusterLinkClientManager manager =
            newClientManager("test-link", linkConfig, adminFactory, Option.empty(), controller());
        try {
            // NOTE(review): as in the original, the test still passes if startup() does not throw;
            // consider failing explicitly if the exception is mandatory.
            manager.startup();
        } catch (IllegalArgumentException e) {
            Assertions.assertEquals("ACL migration is enabled but authorizer.class.name is not set. Please set authorizer.class.name to proceed with ACL migration.", e.getMessage());
        } finally {
            closeManagers(manager);
        }
    }

    @Test
    public void testAclSyncTaskNoStartup() {
        ClusterLinkConfig newConfig = newConfig((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), "localhost:1234"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), "false")})));
        Function1 function1 = clusterLinkConfig -> {
            return adminFactory$5(clusterLinkConfig);
        };
        Some some = new Some(authorizer());
        KafkaController controller = controller();
        EasyMock.expect(scheduler().schedule(EasyMock.anyString(), (Function0) EasyMock.anyObject(), EasyMock.anyLong(), EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject())).andReturn((Object) null).anyTimes();
        EasyMock.replay(new Object[]{scheduler()});
        ClusterLinkData clusterLinkData = new ClusterLinkData("test-link", UUID.randomUUID(), None$.MODULE$, None$.MODULE$, false);
        ClusterLinkFetcherManager clusterLinkFetcherManager = (ClusterLinkFetcherManager) EasyMock.createNiceMock(ClusterLinkFetcherManager.class);
        ClusterLinkManager createClusterLinkManager = ClusterLinkTestUtils$.MODULE$.createClusterLinkManager();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        ClusterLinkClientManager clusterLinkClientManager = new ClusterLinkClientManager(createClusterLinkManager, new KafkaConfig(testUtils$.createBrokerConfig(1, "localhost:1234", true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1)), clusterLinkData, clusterLinkFetcherManager, scheduler(), zkClient(), newConfig, some, controller, None$.MODULE$, metrics(), function1, () -> {
            return this.destAdmin();
        });
        clusterLinkClientManager.startup();
        try {
            Predef$.MODULE$.assert(clusterLinkClientManager.getSyncAclTask().isEmpty());
        } finally {
            closeManagers(clusterLinkClientManager);
        }
    }

    /**
     * Builds a {@code ClusterLinkClientManager} wired to this test's mocks.
     *
     * <p>Side effect: records a catch-all {@code schedule(...)} expectation (returning
     * {@code null}, any number of times) on the scheduler mock and switches it to replay state.
     *
     * @param str               the link name stored in the generated {@code ClusterLinkData}
     * @param clusterLinkConfig the link configuration handed to the manager
     * @param function1         factory the manager uses to create its admin client from a config
     * @param option            the authorizer to pass to the manager, if any
     * @param kafkaController   the controller to pass to the manager
     * @return a new, not yet started, client manager
     */
    private ClusterLinkClientManager newClientManager(String str, ClusterLinkConfig clusterLinkConfig, Function1<ClusterLinkConfig, ClusterLinkAdminClient> function1, Option<Authorizer> option, KafkaController kafkaController) {
        // Accept any scheduling request so the manager can register periodic tasks against the mock.
        EasyMock.expect(scheduler().schedule(EasyMock.anyString(), (Function0) EasyMock.anyObject(), EasyMock.anyLong(), EasyMock.anyLong(), (TimeUnit) EasyMock.anyObject())).andReturn((Object) null).anyTimes();
        EasyMock.replay(new Object[]{scheduler()});

        ClusterLinkData linkData = new ClusterLinkData(str, UUID.randomUUID(), None$.MODULE$, None$.MODULE$, false);
        ClusterLinkFetcherManager fetcherManager = (ClusterLinkFetcherManager) EasyMock.createNiceMock(ClusterLinkFetcherManager.class);
        ClusterLinkManager linkManager = ClusterLinkTestUtils$.MODULE$.createClusterLinkManager();

        // Broker config for node 1 with "localhost:1234" as the ZooKeeper connect string; the
        // remaining positional arguments are passed through to TestUtils.createBrokerConfig as-is.
        KafkaConfig brokerConfig = new KafkaConfig(TestUtils$.MODULE$.createBrokerConfig(
            1, "localhost:1234", true, true,
            TestUtils$.MODULE$.RandomPort(), None$.MODULE$, None$.MODULE$, None$.MODULE$,
            true, false, TestUtils$.MODULE$.RandomPort(),
            false, TestUtils$.MODULE$.RandomPort(),
            false, TestUtils$.MODULE$.RandomPort(),
            None$.MODULE$, 1, false, 1, (short) 1));

        return new ClusterLinkClientManager(linkManager, brokerConfig, linkData, fetcherManager, scheduler(), zkClient(), clusterLinkConfig, option, kafkaController, None$.MODULE$, metrics(), function1, () -> {
            return this.destAdmin();
        });
    }

    /**
     * Shuts down the given client manager, then its cluster link manager, then its scheduler.
     *
     * <p>The calls are chained through {@code finally} blocks so that an exception from an
     * earlier shutdown does not prevent the later ones from being attempted. If more than one
     * shutdown throws, the exception from the last failing call is the one that propagates
     * (normal {@code try/finally} semantics).
     *
     * @param clusterLinkClientManager the manager whose resources should be released
     */
    private void closeManagers(ClusterLinkClientManager clusterLinkClientManager) {
        try {
            clusterLinkClientManager.shutdown();
        } finally {
            try {
                clusterLinkClientManager.clusterLinkManager().shutdown();
            } finally {
                clusterLinkClientManager.scheduler().shutdown();
            }
        }
    }

    /**
     * Creates a {@link ClusterLinkConfig} from the given key/value pairs.
     *
     * <p>The entries are copied into a fresh {@link Properties} via the Scala
     * {@code PropertiesOps.++=} helper, and the result is passed to
     * {@code ClusterLinkConfig.create}.
     *
     * @param map configuration entries to load
     * @return the config created from those entries
     */
    private ClusterLinkConfig newConfig(Map<String, String> map) {
        final Properties linkProps = new Properties();
        final Implicits.PropertiesOps propsOps = new Implicits.PropertiesOps(linkProps);
        propsOps.$plus$plus$eq(map);
        return ClusterLinkConfig$.MODULE$.create(linkProps);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Admin-client factory used by a test to observe how the manager invokes it.
     *
     * <p>Each call increments {@code intRef}, verifies that the config passed in is the very same
     * instance currently held in {@code objectRef}, and returns a new
     * {@link ClusterLinkAdminClient} wrapping the admin client held in {@code objectRef2}.
     * The identity check uses {@code assertSame} so a mismatch reports both instances instead of
     * a bare "expected true", and a wrongly typed reference fails the assertion rather than
     * throwing {@link ClassCastException}.
     *
     * @param clusterLinkConfig config the caller passed to the factory
     * @param intRef            invocation counter, incremented on every call
     * @param objectRef         holder of the config instance the factory expects to receive
     * @param objectRef2        holder of the {@link KafkaAdminClient} to wrap
     * @return a new admin client around {@code objectRef2.elem}, with null metadata manager and
     *         network client and an empty string as the last argument
     */
    public static final ClusterLinkAdminClient adminFactory$1(ClusterLinkConfig clusterLinkConfig, IntRef intRef, ObjectRef objectRef, ObjectRef objectRef2) {
        intRef.elem++;
        // Expected instance first, actual second, per the JUnit 5 argument order.
        Assertions.assertSame(objectRef.elem, clusterLinkConfig);
        return new ClusterLinkAdminClient((KafkaAdminClient) objectRef2.elem, (AdminMetadataManager) null, (NetworkClient) null, "");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Admin-client factory that ignores the supplied config and returns a fresh EasyMock nice
     * mock of {@link ClusterLinkAdminClient} on every call.
     *
     * @param clusterLinkConfig unused
     * @return a new nice mock admin client
     */
    public static final ClusterLinkAdminClient adminFactory$2(ClusterLinkConfig clusterLinkConfig) {
        final ClusterLinkAdminClient niceMock = EasyMock.createNiceMock(ClusterLinkAdminClient.class);
        return niceMock;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Admin-client factory that ignores the supplied config and returns a fresh EasyMock nice
     * mock of {@link ClusterLinkAdminClient} on every call.
     *
     * @param clusterLinkConfig unused
     * @return a new nice mock admin client
     */
    public static final ClusterLinkAdminClient adminFactory$3(ClusterLinkConfig clusterLinkConfig) {
        final ClusterLinkAdminClient niceMock = EasyMock.createNiceMock(ClusterLinkAdminClient.class);
        return niceMock;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Admin-client factory that ignores the supplied config and returns a fresh EasyMock nice
     * mock of {@link ClusterLinkAdminClient} on every call.
     *
     * @param clusterLinkConfig unused
     * @return a new nice mock admin client
     */
    public static final ClusterLinkAdminClient adminFactory$4(ClusterLinkConfig clusterLinkConfig) {
        final ClusterLinkAdminClient niceMock = EasyMock.createNiceMock(ClusterLinkAdminClient.class);
        return niceMock;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Admin-client factory that ignores the supplied config and returns a fresh EasyMock nice
     * mock of {@link ClusterLinkAdminClient} on every call.
     *
     * @param clusterLinkConfig unused
     * @return a new nice mock admin client
     */
    public static final ClusterLinkAdminClient adminFactory$5(ClusterLinkConfig clusterLinkConfig) {
        final ClusterLinkAdminClient niceMock = EasyMock.createNiceMock(ClusterLinkAdminClient.class);
        return niceMock;
    }
}
