/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.link.integration;

import com.google.common.collect.Lists;
import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.UserMetadata;
import io.confluent.kafka.multitenant.integration.test.IntegrationTestHarness;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.security.auth.plain.DynamicPlainLoginCallbackHandler;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthUtils;
import io.confluent.kafka.server.plugins.policy.AlterConfigPolicy;
import io.confluent.kafka.server.plugins.policy.ClusterLinkPolicyConfig;
import io.confluent.kafka.server.plugins.policy.CreateClusterLinkPolicy;
import io.confluent.kafka.server.plugins.policy.CreateTopicPolicy;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.security.auth.login.LoginException;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkCCloudToCCloudChannelBuilder;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkDestClientManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.MirrorTopicConfigSyncRules;
import kafka.utils.TestInfoUtils;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateClusterLinksOptions;
import org.apache.kafka.clients.admin.CreateClusterLinksResult;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DeleteClusterLinksOptions;
import org.apache.kafka.clients.admin.DescribeClusterLinksOptions;
import org.apache.kafka.clients.admin.DescribeMirrorsOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewClusterLink;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.interceptor.ConfluentCloudBrokerInterceptor;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Option;

@Tag(value="integration")
public class MultiTenantClusterLinkTest {
    // ACL filter JSON whose resource type, pattern type, operation and permission type are all "any".
    private static final String SYNC_ALL_ACL_FILTER = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" }}]}";
    // NOTE(review): presumably the certificate common name used for SSL listeners - confirm against usages.
    public static final String SSL_KAFKA_CN = "kafka";
    // Source and destination cluster fixtures; created in setUp() and shut down in tearDown().
    private MultiTenantCluster sourceCluster;
    private MultiTenantCluster destCluster;
    // Default cluster link name and topic name used by the tests.
    private final String linkName = "tenantLink";
    private final String topic = "linkedTopic";
    // NOTE(review): presumably the partition count for created topics and a running index for produced
    // messages - both are consumed outside the visible part of the file, confirm there.
    private int numPartitions = 2;
    private int nextMessageIndex = 0;
    // JUnit test metadata captured in setUp(); its display name is inspected by testRunsWithLinkCoordinator().
    private TestInfo testInfo;
    // Same JSON text as SYNC_ALL_ACL_FILTER.
    private final String localAclsFilter = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\" }}]}";
    // ACL filter JSON restricted to the literal topic "app1-topic", principal "User:app1-developer",
    // operation "write", host "*", permission "allow" and the single cluster link id Uuid.ZERO_UUID.
    private final String subsetAclsFilter = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"topic\", \"name\": \"app1-topic\", \"patternType\": \"literal\" },\"accessFilter\": { \"principal\": \"User:app1-developer\", \"operation\": \"write\", \"host\": \"*\", \"permissionType\": \"allow\", \"clusterLinkIds\" : [\"" + Uuid.ZERO_UUID + "\"]  }}]}";
    // Names shared by the ACL-filter tests.
    private static final String DEVELOPER1 = "app1-developer";
    private static final String APP1_TOPIC = "app1-topic";
    private static final String APP1_CONSUMER_GROUP = "app1-consumer-group";
    // Principal that all expected ACL bindings below are granted to.
    private final KafkaPrincipal principal = new KafkaPrincipal("User", "app1-developer");
    // WRITE and READ on the literal topic "app1-topic" plus ALL on the group "app1-consumer-group".
    private final Collection<AclBinding> allExpectedAcls = Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "app1-topic", PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.TOPIC, "app1-topic", PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, "app1-consumer-group", PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
    // WRITE on the topic plus ALL on the group (no topic READ binding).
    private final Collection<AclBinding> localExpectedAcls = Arrays.asList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "app1-topic", PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)), new AclBinding(new ResourcePattern(ResourceType.GROUP, "app1-consumer-group", PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
    // Only the WRITE binding on the topic; this is the binding matched by subsetAclsFilter.
    private final Collection<AclBinding> subsetExpectedAcls = Collections.singletonList(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "app1-topic", PatternType.LITERAL), new AccessControlEntry(this.principal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)));

    /**
     * Supplies the (quorum, coordinator) argument pairs referenced by the
     * {@code @MethodSource("allCombinations")} tests in this class.
     *
     * <p>The combinations are zk/true, zk/false and kraft/true. Both values are passed as strings,
     * exactly as before; the test methods declare the second parameter as {@code boolean}.
     *
     * @return one {@link Arguments} instance per combination
     */
    public static Stream<Arguments> allCombinations() {
        return Stream.of(
            Arguments.of("zk", "true"),
            Arguments.of("zk", "false"),
            Arguments.of("kraft", "true"));
    }

    /**
     * Builds a fresh source and destination cluster fixture for the upcoming test and keeps the
     * JUnit {@link TestInfo} so later helpers can inspect the display name.
     *
     * @param testInfo metadata of the test about to run
     * @throws Exception if a cluster fixture cannot be constructed
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        // Each fixture is assigned as soon as it is built so tearDown() can see whatever was created.
        sourceCluster = new MultiTenantCluster(testInfo);
        destCluster = new MultiTenantCluster(testInfo);
        this.testInfo = testInfo;
    }

    /**
     * Shuts down both cluster fixtures and then clears security configs and verifies thread cleanup.
     *
     * <p>The destination cluster is shut down even when shutting down the source cluster throws, so a
     * failure in one fixture does not leak the other. A fixture that was never assigned (for example
     * because {@code setUp} failed part-way) is skipped instead of raising a
     * {@link NullPointerException} that would mask the original failure.
     *
     * @throws Exception if shutting down a cluster or verifying cleanup fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            try {
                if (this.sourceCluster != null) {
                    this.sourceCluster.shutdown();
                }
            } finally {
                // Runs regardless of the outcome of the source shutdown above.
                if (this.destCluster != null) {
                    this.destCluster.shutdown();
                }
            }
        } finally {
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
        }
    }

    /**
     * Starts both clusters using a destination ACL limit of 1000.
     *
     * @param useSourceInitiatedLink value stored on both cluster fixtures
     * @param useSslForSourceListener forwarded to the source cluster start-up
     * @param isSourceConfluentCloud forwarded to {@code brokerProps} for the source cluster
     * @throws Exception if either cluster fails to start
     */
    private void setUpClusters(boolean useSourceInitiatedLink, boolean useSslForSourceListener, boolean isSourceConfluentCloud) throws Exception {
        final int defaultDestAclLimit = 1000;
        setUpClusters(useSourceInitiatedLink, useSslForSourceListener, isSourceConfluentCloud, defaultDestAclLimit);
    }

    /**
     * Starts both clusters, overriding {@code confluent.max.acls.per.tenant} on the destination
     * cluster with {@code destAclLimit}. The source cluster gets no overrides.
     *
     * @param useSourceInitiatedLink value stored on both cluster fixtures
     * @param useSslForSourceListener forwarded to the source cluster start-up
     * @param isSourceConfluentCloud forwarded to {@code brokerProps} for the source cluster
     * @param destAclLimit per-tenant ACL limit configured on the destination cluster
     * @throws Exception if either cluster fails to start
     */
    private void setUpClusters(boolean useSourceInitiatedLink, boolean useSslForSourceListener, boolean isSourceConfluentCloud, int destAclLimit) throws Exception {
        // The override map is only copied into the destination Properties, so an immutable map suffices.
        Map<String, String> destOverrides =
            Collections.singletonMap("confluent.max.acls.per.tenant", String.valueOf(destAclLimit));
        Map<String, String> noSourceOverrides = Collections.emptyMap();
        setUpClusters(useSourceInitiatedLink, useSslForSourceListener, isSourceConfluentCloud, noSourceOverrides, destOverrides);
    }

    /**
     * Starts the source cluster (with no explicit client listener) and then the destination cluster.
     *
     * @param useSourceInitiatedLink value stored on both cluster fixtures
     * @param useSslForSourceListener forwarded to the source cluster start-up
     * @param isSourceConfluentCloud forwarded to {@code brokerProps} for the source cluster
     * @param sourceBrokerAndControllerOverrides config overrides applied to the source cluster
     * @param destBrokerAndControllerOverrides config overrides applied to the destination cluster
     * @throws Exception if either cluster fails to start
     */
    private void setUpClusters(boolean useSourceInitiatedLink, boolean useSslForSourceListener, boolean isSourceConfluentCloud, Map<String, String> sourceBrokerAndControllerOverrides, Map<String, String> destBrokerAndControllerOverrides) throws Exception {
        Optional<String> noClientListener = Optional.empty();
        setupSourceCluster(useSourceInitiatedLink, useSslForSourceListener, isSourceConfluentCloud, sourceBrokerAndControllerOverrides, noClientListener);
        setupDestCluster(useSourceInitiatedLink, destBrokerAndControllerOverrides);
    }

    /**
     * Starts the source cluster, applying the same overrides to both the broker and the controller
     * properties.
     *
     * @param useSourceInitiatedLink value stored on the source cluster fixture
     * @param useSslForSourceListener forwarded to the cluster start-up
     * @param isSourceConfluentCloud forwarded to {@code brokerProps}
     * @param brokerAndControllerOverrides overrides used for both brokers and controllers
     * @param clientListener forwarded unchanged to the cluster start-up
     * @throws Exception if the cluster fails to start
     */
    private void setupSourceCluster(boolean useSourceInitiatedLink, boolean useSslForSourceListener, boolean isSourceConfluentCloud, Map<String, String> brokerAndControllerOverrides, Optional<String> clientListener) throws Exception {
        Map<String, String> brokerOverrides = brokerAndControllerOverrides;
        Map<String, String> controllerOverrides = brokerAndControllerOverrides;
        setupSourceCluster(useSourceInitiatedLink, useSslForSourceListener, isSourceConfluentCloud, brokerOverrides, controllerOverrides, clientListener);
    }

    /**
     * Starts the source cluster as logical cluster {@code "sourceLogicalCluster"} and then adds ACLs
     * for the cluster's user.
     *
     * <p>Broker and controller properties are each built from a separate {@code brokerProps} call
     * and then layered with their respective overrides.
     *
     * @param useSourceInitiatedLink value stored on the source cluster fixture
     * @param useSslForSourceListener forwarded to the cluster start-up
     * @param isSourceConfluentCloud forwarded to {@code brokerProps}
     * @param sourceBrokerOverrides overrides applied on top of the broker properties
     * @param sourceControllerOverrides overrides applied on top of the controller properties
     * @param clientListener forwarded unchanged to the cluster start-up
     * @throws Exception if the cluster fails to start or ACL creation fails
     */
    private void setupSourceCluster(boolean useSourceInitiatedLink, boolean useSslForSourceListener, boolean isSourceConfluentCloud, Map<String, String> sourceBrokerOverrides, Map<String, String> sourceControllerOverrides, Optional<String> clientListener) throws Exception {
        sourceCluster.useSourceInitiatedLink = useSourceInitiatedLink;

        Properties brokerConfig = brokerProps(isSourceConfluentCloud);
        brokerConfig.putAll(sourceBrokerOverrides);

        Properties controllerConfig = brokerProps(isSourceConfluentCloud);
        controllerConfig.putAll(sourceControllerOverrides);

        sourceCluster.startCluster(brokerConfig, controllerConfig, "sourceLogicalCluster", 1, useSslForSourceListener, clientListener);
        addAcls((Admin) sourceCluster.admin, sourceCluster.user, new String[0]);
    }

    /**
     * Starts the destination cluster as logical cluster {@code "destLogicalCluster"} and then adds
     * ACLs for the cluster's user.
     *
     * @param useSourceInitiatedLink value stored on the destination cluster fixture
     * @param destBrokerOverrides overrides applied on top of the default broker properties
     * @throws Exception if the cluster fails to start or ACL creation fails
     */
    private void setupDestCluster(boolean useSourceInitiatedLink, Map<String, String> destBrokerOverrides) throws Exception {
        destCluster.useSourceInitiatedLink = useSourceInitiatedLink;

        Properties brokerConfig = brokerProps();
        brokerConfig.putAll(destBrokerOverrides);

        destCluster.startCluster(brokerConfig, "destLogicalCluster", 11, false);
        addAcls((Admin) destCluster.admin, destCluster.user, new String[0]);
    }

    /**
     * Reports whether the current test's display name contains {@code "coordinator=true"}.
     *
     * @return {@code true} if the display name captured in {@code setUp} contains that marker
     */
    private boolean testRunsWithLinkCoordinator() {
        String displayName = testInfo.getDisplayName();
        return displayName.contains("coordinator=true");
    }

    /**
     * Runs the basic end-to-end linking scenario without a link prefix and without ACL sync, while
     * exercising both the add-source-partitions and change-source-config steps.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkAddSourcePartitionsAndChangeSourceConfig(String quorum, boolean coordinator) throws Throwable {
        final String noPrefix = "";
        final boolean enableSyncAcl = false;
        final boolean addSourcePartitions = true;
        final boolean changeSourceConfig = true;
        testBasicEndToEndClusterLinking(noPrefix, enableSyncAcl, addSourcePartitions, changeSourceConfig);
    }

    /**
     * Runs the basic end-to-end linking scenario without a link prefix and with ACL sync enabled;
     * the add-source-partitions and change-source-config steps are skipped.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkSyncAcls(String quorum, boolean coordinator) throws Throwable {
        final String noPrefix = "";
        final boolean enableSyncAcl = true;
        final boolean addSourcePartitions = false;
        final boolean changeSourceConfig = false;
        testBasicEndToEndClusterLinking(noPrefix, enableSyncAcl, addSourcePartitions, changeSourceConfig);
    }

    /**
     * Creates a link named {@code "tenantLink"} in BIDIRECTIONAL mode on each cluster (sharing one
     * link id), mirrors the test topic in both directions under different prefixes, verifies listing
     * and mirroring both ways, then stops mirroring and deletes both links.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testBidirectionalClusterLink(String quorum, boolean coordinator) throws Throwable {
        setUpClusters(false, false, false);

        String westPrefix = "west_";
        String eastPrefix = "east_";
        addAcls((Admin) destCluster.admin, destCluster.user, westPrefix);
        addAcls((Admin) sourceCluster.admin, sourceCluster.user, eastPrefix);

        // First half of the link: created on the destination cluster with the "west_" prefix.
        HashMap<String, String> bidirectionalConfigs = new HashMap<String, String>();
        bidirectionalConfigs.put(ClusterLinkConfig.ClusterLinkPrefixProp(), westPrefix);
        bidirectionalConfigs.put(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        destCluster.createDestClusterLink(destCluster.admin, linkName, sourceCluster, 1001, bidirectionalConfigs, false);
        MultiTenantClusterLinkTest.waitFor(() -> destCluster.linkIdExists(linkName), true, "First link was not created");
        Uuid sharedLinkId = destCluster.linkId(linkName);

        // Second half: same config map with the prefix switched to "east_", created on the source
        // cluster with the link id obtained from the destination cluster.
        bidirectionalConfigs.put(ClusterLinkConfig.ClusterLinkPrefixProp(), eastPrefix);
        verifyMismatchedLinkIdFailure(bidirectionalConfigs);
        sourceCluster.createDestClusterLinkResult(sourceCluster.admin, linkName, destCluster, 1003, "EXTERNAL", false, bidirectionalConfigs, true, "", sharedLinkId);
        MultiTenantClusterLinkTest.waitFor(() -> sourceCluster.linkIdExists(linkName), true, "Second link was not created");
        Assertions.assertEquals((Object) sharedLinkId, (Object) sourceCluster.linkId(linkName));

        createTopic(sourceCluster);
        createTopic(destCluster);
        createMirrorTopic(destCluster.admin, westPrefix);
        createMirrorTopic(sourceCluster.admin, eastPrefix);

        verifyTopicListing(westPrefix, destCluster);
        verifyTopicListing(eastPrefix, sourceCluster);
        verifyTopicMirroring(westPrefix, sourceCluster, destCluster);
        verifyTopicMirroring(eastPrefix, destCluster, sourceCluster);

        stopMirroring(destCluster, westPrefix + topic);
        stopMirroring(sourceCluster, eastPrefix + topic);
        destCluster.deleteClusterLink(destCluster.admin, linkName);
        sourceCluster.deleteClusterLink(sourceCluster.admin, linkName);
    }

    /**
     * Walks through a sequence of ACL and link changes and, after each step, waits until the
     * destination cluster holds exactly the expected set of ACL bindings (including the cluster link
     * ids attached to each binding).
     *
     * <p>Steps: create a link with ACL filters for Alice and Bob; create ACLs on both clusters; delete
     * one ACL on the source; narrow the link's ACL filter to Bob; create a second link; delete the
     * second link; delete the first link.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkSyncAclsSemantics(String quorum, boolean coordinator) throws Throwable {
        // Clusters are started directly here rather than through setUpClusters(), so the addAcls(...)
        // calls made by setupSourceCluster/setupDestCluster do not happen in this test.
        this.sourceCluster.useSourceInitiatedLink = false;
        this.destCluster.useSourceInitiatedLink = false;
        this.sourceCluster.startCluster(this.brokerProps(), "sourceLogicalCluster", 1, false);
        Properties props = this.brokerProps();
        props.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        this.destCluster.startCluster(props, "destLogicalCluster", 11, false);
        // Principals and resource patterns used to build the ACL bindings below.
        KafkaPrincipal alice = new KafkaPrincipal("User", "Alice");
        KafkaPrincipal bob = new KafkaPrincipal("User", "Bob");
        KafkaPrincipal sourceOperator = new KafkaPrincipal("User", "sourceOperator");
        KafkaPrincipal destOperator = new KafkaPrincipal("User", "destOperator");
        ResourcePattern topic1 = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL);
        ResourcePattern topicPrefix = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.PREFIXED);
        ResourcePattern allTopics = new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL);
        // Step 1: create the first link on the destination with ACL filters built for Alice and Bob.
        // NOTE(review): the boolean `true` argument presumably enables ACL sync - confirm against
        // createDestClusterLinkResult.
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, this.aclFilters(alice, bob)).all().get();
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        Uuid linkId = this.destCluster.linkId("tenantLink");
        ConfluentAdmin sourceAdmin = this.sourceCluster.admin;
        AclBinding aliceWrite = new AclBinding(topic1, new AccessControlEntry(alice.toString(), "", AclOperation.WRITE, AclPermissionType.ALLOW));
        AclBinding aliceRead = new AclBinding(topicPrefix, new AccessControlEntry(alice.toString(), "", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding bobRead = new AclBinding(topic1, new AccessControlEntry(bob.toString(), "", AclOperation.READ, AclPermissionType.ALLOW));
        AclBinding bobDescribe = new AclBinding(topicPrefix, new AccessControlEntry(bob.toString(), "", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
        AclBinding sourceOperatorAcl = new AclBinding(allTopics, new AccessControlEntry(sourceOperator.toString(), "", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding destOperatorAcl = new AclBinding(allTopics, new AccessControlEntry(destOperator.toString(), "", AclOperation.ALL, AclPermissionType.ALLOW));
        // Step 2: create ACLs on the source (Alice, Bob, source operator) and on the destination
        // (aliceWrite and the destination operator). aliceWrite therefore exists on both clusters.
        sourceAdmin.createAcls(this.aclBindings(Collections.emptySet(), aliceWrite, aliceRead, bobRead, bobDescribe, sourceOperatorAcl)).all().get(15L, TimeUnit.SECONDS);
        ConfluentAdmin destAdmin = this.destCluster.admin;
        destAdmin.createAcls(this.aclBindings(Collections.emptySet(), aliceWrite, destOperatorAcl)).all().get(15L, TimeUnit.SECONDS);
        // Expected on the destination: destOperatorAcl with no link ids; aliceRead/bobRead/bobDescribe
        // tagged with linkId; aliceWrite tagged with both Uuid.ZERO_UUID and linkId.
        // NOTE(review): Uuid.ZERO_UUID presumably marks a locally created binding - confirm.
        HashSet<AclBinding> expectedAcls = new HashSet<AclBinding>();
        expectedAcls.addAll(this.aclBindings(Collections.emptySet(), destOperatorAcl));
        expectedAcls.addAll(this.aclBindings(Collections.singleton(linkId), aliceRead, bobRead, bobDescribe));
        expectedAcls.addAll(this.aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId), aliceWrite));
        this.waitForDestAcls(expectedAcls);
        // Step 3: delete aliceWrite on the source; the destination copy is then expected with an empty
        // link id set.
        this.deleteAcls(sourceAdmin, aliceWrite);
        expectedAcls.removeAll(this.aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId), aliceWrite));
        expectedAcls.addAll(this.aclBindings(Collections.emptySet(), aliceWrite));
        this.waitForDestAcls(expectedAcls);
        // Step 4: replace the link's ACL filter config with one built for Bob only; aliceRead is then
        // expected with an empty link id set.
        destAdmin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.AclFiltersProp(), this.aclFilters(bob)), AlterConfigOp.OpType.SET)))).all().get(15L, TimeUnit.SECONDS);
        expectedAcls.removeAll(this.aclBindings(Collections.singleton(linkId), aliceRead));
        expectedAcls.addAll(this.aclBindings(Collections.emptySet(), aliceRead));
        this.waitForDestAcls(expectedAcls);
        // Step 5: create a second link whose ACL filters are again built for Alice and Bob.
        String linkName2 = "tenantLink2";
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, linkName2, this.sourceCluster, 1002, "EXTERNAL", Collections.emptyMap(), true, this.aclFilters(alice, bob)).all().get();
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists(linkName2), true, "Link was not created");
        Uuid linkId2 = this.destCluster.linkId(linkName2);
        // Expected now: Bob's bindings tagged with both link ids; aliceRead tagged with ZERO_UUID and
        // linkId2; aliceWrite and destOperatorAcl with no link ids.
        expectedAcls.clear();
        expectedAcls.addAll(this.aclBindings(Arrays.asList(linkId, linkId2), bobRead, bobDescribe));
        expectedAcls.addAll(this.aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId2), aliceRead));
        expectedAcls.addAll(this.aclBindings(Collections.emptySet(), aliceWrite, destOperatorAcl));
        this.waitForDestAcls(expectedAcls);
        // Step 6: delete the second link; Bob's bindings are then expected tagged with ZERO_UUID and
        // linkId, and all other bindings with no link ids.
        destAdmin.deleteClusterLinks(Collections.singleton("tenantLink2"), new DeleteClusterLinksOptions()).all().get(15L, TimeUnit.SECONDS);
        expectedAcls.clear();
        expectedAcls.addAll(this.aclBindings(Arrays.asList(Uuid.ZERO_UUID, linkId), bobRead, bobDescribe));
        expectedAcls.addAll(this.aclBindings(Collections.emptySet(), aliceWrite, aliceRead, destOperatorAcl));
        this.waitForDestAcls(expectedAcls);
        // Step 7: delete the first link; every remaining binding is expected with no link ids.
        destAdmin.deleteClusterLinks(Collections.singleton("tenantLink"), new DeleteClusterLinksOptions()).all().get(15L, TimeUnit.SECONDS);
        expectedAcls.clear();
        expectedAcls.addAll(this.aclBindings(Collections.emptySet(), aliceWrite, aliceRead, bobRead, bobDescribe, destOperatorAcl));
        this.waitForDestAcls(expectedAcls);
    }

    /**
     * Builds an ACL filter JSON string that uses "any" for resource type, pattern type, operation
     * and permission type, and lists two cluster link ids: {@code Uuid.ZERO_UUID} and {@code linkId}.
     *
     * @param linkId second cluster link id to embed in the filter
     * @return the filter JSON text
     */
    private String allAclsFilter(Uuid linkId) {
        final String head = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\", \"clusterLinkIds\" : [\"";
        final String separator = "\", \"";
        final String tail = "\"]  }}]}";
        return head + Uuid.ZERO_UUID + separator + linkId + tail;
    }

    /**
     * Builds an ACL filter JSON string that uses "any" for resource type, pattern type, operation
     * and permission type, and lists {@code linkId} as the only cluster link id.
     *
     * @param linkId cluster link id to embed in the filter
     * @return the filter JSON text
     */
    private final String sourceLinkAclsFilter(Uuid linkId) {
        final String head = "{ \"aclFilters\": [{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" },\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\", \"clusterLinkIds\" : [\"";
        final String tail = "\"]  }}]}";
        return head + linkId + tail;
    }

    /**
     * Creates the destination link with the filter from {@code allAclsFilter} (listing
     * {@code Uuid.ZERO_UUID} and the id of a link created on the source) and verifies that
     * {@code allExpectedAcls} plus the source link user's ACLs are migrated.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkAllAclFilters(String quorum, boolean coordinator) throws Throwable {
        sourceCluster.useSourceInitiatedLink = false;
        destCluster.useSourceInitiatedLink = false;
        sourceCluster.startCluster(brokerProps(), "sourceLogicalCluster", 1, false);

        Properties destBrokerProps = brokerProps();
        destBrokerProps.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        destCluster.startCluster(destBrokerProps, "destLogicalCluster", 11, false);

        // Source-side ACLs: producer ACLs without a link id, consumer ACLs tied to a source-side link.
        KafkaTestUtils.addProducerAcls(sourceCluster.admin, principal, APP1_TOPIC, PatternType.LITERAL);
        Uuid sourceSideLinkId = addAclsOnSourceWithLinkId(id -> consumerAcls(principal, id));

        destCluster.createDestClusterLinkResult(destCluster.admin, linkName, sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, allAclsFilter(sourceSideLinkId)).all().get();

        // Raw type kept as in the original: linkAcls(...) is declared outside this view, so its
        // element type cannot be confirmed here.
        ArrayList linkUserAcls = new ArrayList(sourceCluster.linkAcls(sourceCluster.linkUser));
        ArrayList<AclBinding> expectedOnDest = new ArrayList<AclBinding>(allExpectedAcls);
        expectedOnDest.addAll(linkUserAcls);
        verifyAclMigration(expectedOnDest, subsetExpectedAcls, linkName);
    }

    /**
     * Creates the destination link with {@code SYNC_ALL_ACL_FILTER} (which names no cluster link
     * ids) and verifies that {@code localExpectedAcls} plus the source link user's ACLs are migrated.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkLocalAclFilters(String quorum, boolean coordinator) throws Throwable {
        sourceCluster.useSourceInitiatedLink = false;
        destCluster.useSourceInitiatedLink = false;
        sourceCluster.startCluster(brokerProps(), "sourceLogicalCluster", 1, false);

        Properties destBrokerProps = brokerProps();
        destBrokerProps.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        destCluster.startCluster(destBrokerProps, "destLogicalCluster", 11, false);

        // Source-side ACLs: producer ACLs without a link id, consumer ACLs tied to a source-side link.
        KafkaTestUtils.addProducerAcls(sourceCluster.admin, principal, APP1_TOPIC, PatternType.LITERAL);
        addAclsOnSourceWithLinkId(id -> consumerAcls(principal, id));

        destCluster.createDestClusterLinkResult(destCluster.admin, linkName, sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, SYNC_ALL_ACL_FILTER).all().get();

        // Raw type kept as in the original: linkAcls(...) is declared outside this view, so its
        // element type cannot be confirmed here.
        ArrayList linkUserAcls = new ArrayList(sourceCluster.linkAcls(sourceCluster.linkUser));
        ArrayList<AclBinding> expectedOnDest = new ArrayList<AclBinding>(localExpectedAcls);
        expectedOnDest.addAll(linkUserAcls);
        verifyAclMigration(expectedOnDest, subsetExpectedAcls, linkName);
    }

    /**
     * Creates the destination link with {@code subsetAclsFilter} and verifies that only
     * {@code subsetExpectedAcls} (the topic WRITE binding) is migrated.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkSubsetAclFilters(String quorum, boolean coordinator) throws Throwable {
        sourceCluster.useSourceInitiatedLink = false;
        destCluster.useSourceInitiatedLink = false;
        sourceCluster.startCluster(brokerProps(), "sourceLogicalCluster", 1, false);

        Properties destBrokerProps = brokerProps();
        destBrokerProps.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        destCluster.startCluster(destBrokerProps, "destLogicalCluster", 11, false);

        // Source-side ACLs: producer ACLs without a link id, consumer ACLs tied to a source-side link.
        KafkaTestUtils.addProducerAcls(sourceCluster.admin, principal, APP1_TOPIC, PatternType.LITERAL);
        addAclsOnSourceWithLinkId(id -> consumerAcls(principal, id));

        destCluster.createDestClusterLinkResult(destCluster.admin, linkName, sourceCluster, 1001, "EXTERNAL", Collections.emptyMap(), true, subsetAclsFilter).all().get();
        verifyAclMigration(subsetExpectedAcls, subsetExpectedAcls, linkName);
    }

    /**
     * Waits for the named link to exist on the destination, waits until the destination holds
     * {@code expectedAcls} tagged with that link's id, deletes {@code subsetExpectedAcls} on the
     * source, waits until the destination holds only the remaining bindings, and finally checks the
     * link's ACL metrics.
     *
     * @param expectedAcls bindings expected on the destination before the deletion
     * @param subsetExpectedAcls bindings deleted from the source and expected to disappear
     * @param linkName name of the destination cluster link
     * @throws Throwable if any wait or verification step fails
     */
    private void verifyAclMigration(Collection<AclBinding> expectedAcls, Collection<AclBinding> subsetExpectedAcls, String linkName) throws Throwable {
        MultiTenantClusterLinkTest.waitFor(() -> destCluster.linkIdExists(linkName), true, "Link was not created");
        Uuid linkId = destCluster.linkId(linkName);

        AclBinding[] beforeDeletion = expectedAcls.toArray(new AclBinding[0]);
        waitForDestAcls(aclBindings(Collections.singleton(linkId), beforeDeletion));

        deleteProducerAcls(sourceCluster.admin, subsetExpectedAcls);

        // Everything in expectedAcls that is not part of the deleted subset must remain.
        AclBinding[] afterDeletion = expectedAcls.stream()
            .filter(binding -> !subsetExpectedAcls.contains(binding))
            .toArray(AclBinding[]::new);
        waitForDestAcls(aclBindings(Collections.singleton(linkId), afterDeletion));

        verifyAclMetrics(linkName);
    }

    /**
     * Checks that at least one ACL metric (as defined by {@code isAclMetric}) exists for the link
     * and waits for each such metric to report a value greater than zero.
     *
     * <p>In KRaft mode the metrics come from the first broker that reports itself as link
     * coordinator for {@code linkName}; otherwise they come from the controller broker server.
     *
     * @param linkName name of the cluster link whose metrics are checked
     * @throws Exception if a metric does not become positive in time
     */
    private void verifyAclMetrics(String linkName) throws Exception {
        Metrics metrics;
        if (this.destCluster.isKraft()) {
            KafkaBroker coordinatorBroker = (KafkaBroker) this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers().stream()
                .filter(b -> b.clusterLinkManager().clusterLinkMetadataManager().isDefined()
                    && ((ClusterLinkMetadataManager) b.clusterLinkManager().clusterLinkMetadataManager().get()).isLinkCoordinator(linkName))
                .collect(Collectors.toList())
                .get(0);
            metrics = coordinatorBroker.metrics();
        } else {
            metrics = this.destCluster.physicalCluster.kafkaCluster().controllerBrokerServer().metrics();
        }

        Map<MetricName, KafkaMetric> aclMetrics = metrics.metrics().entrySet().stream()
            .filter(candidate -> isAclMetric((MetricName) candidate.getKey(), linkName))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        Assertions.assertTrue(aclMetrics.size() > 0);

        for (Map.Entry<MetricName, KafkaMetric> aclMetric : aclMetrics.entrySet()) {
            String failureMessage = "Metric not updated: " + aclMetric.getKey().name();
            TestUtils.waitForCondition(() -> (Double) ((KafkaMetric) aclMetric.getValue()).metricValue() > 0.0, failureMessage);
        }
    }

    /**
     * Tells whether a metric is one of the four ACL sync metrics
     * ({@code acls-deleted-rate}, {@code acls-deleted-total}, {@code acls-added-rate},
     * {@code acls-added-total}) in group {@code cluster-link-metrics} whose {@code link-name} tag
     * equals {@code linkName}.
     *
     * @param metricName metric to inspect
     * @param linkName expected value of the {@code link-name} tag
     * @return {@code true} if name, group and tag all match
     */
    private boolean isAclMetric(MetricName metricName, String linkName) {
        boolean hasAclMetricName = metricName.name().equals("acls-deleted-rate")
            || metricName.name().equals("acls-deleted-total")
            || metricName.name().equals("acls-added-rate")
            || metricName.name().equals("acls-added-total");
        if (!hasAclMetricName) {
            return false;
        }
        if (!metricName.group().equals("cluster-link-metrics")) {
            return false;
        }
        return Objects.equals(metricName.tags().get("link-name"), linkName);
    }

    /**
     * Requests deletion of the given ACL bindings by converting each one to its filter.
     *
     * <p>The deletion result is not awaited here, matching the existing behaviour; the visible
     * caller ({@code verifyAclMigration}) polls the destination ACLs afterwards.
     *
     * @param adminClient admin client of the cluster to delete the ACLs from
     * @param subsetExpectedAcls bindings to delete
     */
    private void deleteProducerAcls(ConfluentAdmin adminClient, Collection<AclBinding> subsetExpectedAcls) {
        // Typed collection instead of a raw Collection so the deleteAcls call is checked by the compiler.
        Set<AclBindingFilter> aclsToDelete = subsetExpectedAcls.stream()
            .map(AclBinding::toFilter)
            .collect(Collectors.toCollection(HashSet::new));
        adminClient.deleteAcls(aclsToDelete);
    }

    /**
     * Creates a link named {@code "linkInSource"} on the source cluster (pointing at the destination
     * cluster), deletes the destination link user's ACLs, and then creates on the source the ACL
     * bindings produced by {@code aclBindings} for the new link's id.
     *
     * @param aclBindings produces the bindings to create, given the id of the source-side link
     * @return the id of the link created on the source cluster
     * @throws Throwable if the link does not appear or ACL creation fails or times out
     */
    private Uuid addAclsOnSourceWithLinkId(Function<Uuid, List<AclBinding>> aclBindings) throws Throwable {
        final String sourceLinkName = "linkInSource";
        sourceCluster.createDestClusterLinkResult(sourceCluster.admin, sourceLinkName, destCluster, 1001, "EXTERNAL", Collections.emptyMap(), false, false);
        MultiTenantClusterLinkTest.waitFor(() -> sourceCluster.linkIdExists(sourceLinkName), true, "Link was not created");
        Uuid sourceSideLinkId = sourceCluster.linkId(sourceLinkName);

        destCluster.deleteLinkAcls(destCluster.linkUser);

        List<AclBinding> bindingsForLink = aclBindings.apply(sourceSideLinkId);
        sourceCluster.admin.createAcls(bindingsForLink).all().get(15L, TimeUnit.SECONDS);
        return sourceSideLinkId;
    }

    /**
     * Builds three ALLOW bindings for {@code principal} with host {@code "*"}: READ on topic
     * {@code APP1_TOPIC} tagged with {@code linkId}, ALL on group {@code APP1_CONSUMER_GROUP}
     * without a link id, and ALL on the same group tagged with {@code linkId}.
     *
     * @param principal principal the bindings are granted to
     * @param linkId cluster link id attached to the tagged bindings
     * @return the three bindings in the order described above
     */
    private List<AclBinding> consumerAcls(KafkaPrincipal principal, Uuid linkId) {
        String principalName = principal.toString();
        ResourcePattern topicPattern = new ResourcePattern(ResourceType.TOPIC, APP1_TOPIC, PatternType.LITERAL);
        ResourcePattern groupPattern = new ResourcePattern(ResourceType.GROUP, APP1_CONSUMER_GROUP, PatternType.LITERAL);

        AclBinding topicReadWithLinkId = new AclBinding(
            topicPattern,
            new AccessControlEntry(principalName, "*", AclOperation.READ, AclPermissionType.ALLOW, Collections.singleton(linkId)));
        AclBinding groupAllWithoutLinkId = new AclBinding(
            groupPattern,
            new AccessControlEntry(principalName, "*", AclOperation.ALL, AclPermissionType.ALLOW));
        AclBinding groupAllWithLinkId = new AclBinding(
            groupPattern,
            new AccessControlEntry(principalName, "*", AclOperation.ALL, AclPermissionType.ALLOW, Collections.singleton(linkId)));

        return Arrays.asList(topicReadWithLinkId, groupAllWithoutLinkId, groupAllWithLinkId);
    }

    /**
     * Runs the basic end-to-end linking scenario with the link prefix {@code "src_"}, then verifies
     * the link description, deletes the link and re-checks the tenant connection metrics.
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkWithClusterLinkPrefix(String quorum, boolean coordinator) throws Throwable {
        final String prefix = "src_";
        final boolean enableSyncAcl = false;
        final boolean addSourcePartitions = true;
        final boolean changeSourceConfig = true;

        Uuid createdLinkId = testBasicEndToEndClusterLinking(prefix, enableSyncAcl, addSourcePartitions, changeSourceConfig);
        verifyLinkDescription(createdLinkId);

        destCluster.deleteClusterLink(destCluster.admin, linkName);
        verifyTenantConnectionMetrics(false, false);
    }

    /**
     * Shared end-to-end scenario: starts both clusters, creates the destination link
     * {@code "tenantLink"}, creates the source topic and its mirror, verifies listing and mirroring,
     * runs the optional steps selected by the flags, verifies metrics, and finally stops mirroring.
     *
     * @param clusterLinkPrefix prefix for mirror topic names; when empty no link configs are set
     * @param enableSyncAcl passed to link creation; when true ACL and offset migration is verified
     * @param testAddSourcePartitions when true, source partitions are added and the mirror verified
     * @param testChangeSourceConfig when true, the source topic config is changed and the mirror verified
     * @return the id of the created destination link
     * @throws Throwable if any step fails
     */
    private Uuid testBasicEndToEndClusterLinking(String clusterLinkPrefix, Boolean enableSyncAcl, Boolean testAddSourcePartitions, Boolean testChangeSourceConfig) throws Throwable {
        setUpClusters(false, false, false);

        Map<String, String> linkConfigs;
        if (clusterLinkPrefix.isEmpty()) {
            linkConfigs = Collections.emptyMap();
        } else {
            linkConfigs = Collections.singletonMap(ClusterLinkConfig.ClusterLinkPrefixProp(), clusterLinkPrefix);
        }

        destCluster.createDestClusterLink(destCluster.admin, linkName, sourceCluster, 1001, linkConfigs, enableSyncAcl);
        MultiTenantClusterLinkTest.waitFor(() -> destCluster.linkIdExists(linkName), true, "Link was not created");
        Uuid createdLinkId = destCluster.linkId(linkName);

        createSourceTopic();
        createMirrorTopic(destCluster.admin, clusterLinkPrefix);
        verifyTopicListing(clusterLinkPrefix, destCluster);
        verifyTopicMirroring(clusterLinkPrefix, sourceCluster, destCluster);

        // Optional steps; unboxing the Boolean flags behaves like the explicit booleanValue() calls.
        if (enableSyncAcl) {
            verifyAclAndOffsetMigration(clusterLinkPrefix);
        }
        if (testAddSourcePartitions) {
            addSourcePartitionsAndVerifyMirror(4, clusterLinkPrefix);
        }
        if (testChangeSourceConfig) {
            changeSourceTopicConfigAndVerifyMirror(clusterLinkPrefix);
        }

        verifyTopicMirroring(clusterLinkPrefix, sourceCluster, destCluster);
        verifyTenantMetrics(createdLinkId, 1001, clusterLinkPrefix, false);
        verifyTenantConnectionMetrics(false, true);
        verifyMetricsGroups(createdLinkId);

        Assertions.assertEquals((Object) topic, (Object) destCluster.mirrorDescription(clusterLinkPrefix + topic).sourceTopic());
        stopMirroring(destCluster, clusterLinkPrefix + topic);
        return createdLinkId;
    }

    /**
     * Describes {@code "tenantLink"} on the destination cluster and asserts its id, name, modes,
     * cluster ids and error state, then checks that the reported link coordinator matches an
     * {@code EXTERNAL://host:port} advertised listener of one of the destination brokers.
     *
     * @param linkId expected cluster link id
     */
    private void verifyLinkDescription(Uuid linkId) throws Exception {
        DescribeClusterLinksOptions describeOptions = new DescribeClusterLinksOptions().linkNames(Collections.singleton("tenantLink"));
        ClusterLinkDescription described = (ClusterLinkDescription)((Collection)this.destCluster.admin.describeClusterLinks(describeOptions).result().get(15L, TimeUnit.SECONDS)).iterator().next();

        Assertions.assertEquals(linkId, described.clusterLinkId());
        Assertions.assertEquals("tenantLink", described.linkName());
        Assertions.assertEquals(ClusterLinkDescription.LinkMode.DESTINATION, described.linkMode());
        Assertions.assertEquals(ClusterLinkDescription.ConnectionMode.OUTBOUND, described.connectionMode());
        Assertions.assertEquals("sourceLogicalCluster", described.remoteClusterId());
        Assertions.assertEquals("destLogicalCluster", described.localClusterId());
        Assertions.assertEquals(ClusterLinkError.NO_ERROR, described.clusterLinkError());

        // The coordinator is expected to be reachable through some broker's EXTERNAL listener.
        String coordinatorEndpoint = "EXTERNAL://" + described.linkCoordinatorHost() + ":" + described.linkCoordinatorPort();
        List<KafkaBroker> destBrokers = this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers();
        boolean advertisedExternally = destBrokers.stream()
            .anyMatch(broker -> broker.advertisedListeners().exists(endpoint -> endpoint.connectionString().equals(coordinatorEndpoint)));
        String failureMessage = "Invalid coordinator " + coordinatorEndpoint + ", advertisedListeners=" + destBrokers.stream().map(KafkaBroker::advertisedListeners).collect(Collectors.toList());
        Assertions.assertTrue(advertisedExternally, failureMessage);
    }

    /**
     * Verifies that policy-restricted cluster link configs cannot be altered while permitted ones can.
     *
     * <p>Steps: create the destination link and a source topic with {@code retention.ms=60000},
     * mirror it, then
     * <ul>
     *   <li>try to set the replica fetch max bytes config on the link and expect a
     *       {@code PolicyViolationException};</li>
     *   <li>try to add {@code flush.ms} to the topic config sync include list and expect an
     *       {@code ExecutionException} caused by {@code PolicyViolationException};</li>
     *   <li>add {@code retention.ms} to the include list, wait for the link config and the mirror
     *       topic's retention to reflect it, and check {@code flush.ms} on the mirror topic.</li>
     * </ul>
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMultiTenantClusterLinkNonUpdatableConfigPolicyViolationTest(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(false, false, true);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");

        // Source topic carries an explicit retention so that config sync can be observed later.
        Properties sourceTopicProps = new Properties();
        sourceTopicProps.setProperty("retention.ms", "60000");
        this.sourceCluster.physicalCluster.kafkaCluster().kafkas().get(0).createTopic(new ListenerName("INTERNAL"), this.sourceCluster.user.tenantPrefix() + "linkedTopic", this.numPartitions, 1, sourceTopicProps);
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicListing();
        this.verifyTopicMirroring();

        // A plain typed map replaces the former anonymous HashMap subclass (double-brace
        // initialisation), which created an extra class that captured the enclosing test instance.
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        AlterConfigOp setReplicaFetchMaxBytes = new AlterConfigOp(new ConfigEntry(KafkaConfig.ReplicaFetchMaxBytesProp(), "5242882"), AlterConfigOp.OpType.SET);
        Map<ConfigResource, Collection<AlterConfigOp>> alterConfigResource = Collections.singletonMap(configResource, Collections.singletonList(setReplicaFetchMaxBytes));
        AlterConfigsResult alterConfigsResult = this.destCluster.admin.incrementalAlterConfigs(alterConfigResource);
        TestUtils.assertFutureThrows(alterConfigsResult.all(), PolicyViolationException.class);

        // Adding flush.ms to the sync include list must be rejected by the policy.
        ExecutionException e = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules.AlwaysConfigs().mkString(",") + "," + "flush.ms", false));
        Assertions.assertEquals(PolicyViolationException.class, e.getCause().getClass());

        // Adding retention.ms is allowed; wait until the link config and the mirror topic reflect it.
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.TopicConfigSyncIncludeProp(), MirrorTopicConfigSyncRules.AlwaysConfigs().mkString(",") + "," + "retention.ms", false);
        TestUtils.waitForCondition(() -> {
            String value = this.destCluster.linkConfig("tenantLink", ClusterLinkConfig.TopicConfigSyncIncludeProp());
            return value != null && value.contains("retention.ms");
        }, ClusterLinkConfig.TopicConfigSyncIncludeProp() + " not updated");
        TestUtils.waitForCondition(() -> "60000".equals(this.destCluster.topicConfig("linkedTopic", "retention.ms")), "Retention not sync'ed to mirror topic");
        Assertions.assertEquals(String.valueOf(Long.MAX_VALUE), this.destCluster.topicConfig("linkedTopic", "flush.ms"));

        this.verifyTopicMirroring();
        this.stopMirroring(this.destCluster, "linkedTopic");
    }

    /**
     * Exercises SASL-related security configs for cluster links on the destination cluster.
     *
     * <p>A single {@code securityConfigs} map is mutated step by step; after each mutation the test
     * either creates a new link with those configs (expecting success or a specific failure) and/or
     * tries to apply them to the existing {@code "tenantLink"} (expecting a specific failure).
     * Statement order matters because each step builds on the map state left by the previous one.
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testClusterLinkSaslConfigs(String quorum, boolean coordinator) throws Throwable {
        // Destination override listing the login modules allowed by the link policy for this test.
        HashMap<String, String> destOverrides = new HashMap<String, String>();
        destOverrides.put("confluent.plugins.link.policy.sasl.login.module.allowed", OAuthBearerLoginModule.class.getName() + "," + PlainLoginModule.class.getName() + "," + ScramLoginModule.class.getName() + "," + DummyOAuthLoginModule.class.getName());
        this.setUpClusters(false, false, false, Collections.emptyMap(), destOverrides);
        // Base security configs shared (and progressively mutated) by all steps below.
        HashMap<String, String> securityConfigs = new HashMap<String, String>();
        securityConfigs.put("security.protocol", "SASL_PLAINTEXT");
        securityConfigs.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");
        // Baseline link that the update-failure checks below attempt to alter.
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap(), false);
        // Index passed to each create attempt; post-incremented so every attempt gets a new value.
        int linkIndex = 2;
        // GSSAPI mechanism: expected to be rejected on both create and update.
        securityConfigs.put("sasl.mechanism", "GSSAPI");
        this.verifyInvalidLinkSecurityConfigs(linkIndex++, securityConfigs, PolicyViolationException.class, "sasl.mechanism");
        this.verifyLinkUpdateFailure(securityConfigs, PolicyViolationException.class, "sasl.mechanism");
        // SCRAM-SHA-256 and PLAIN mechanisms: link creation is expected to succeed.
        securityConfigs.put("sasl.mechanism", "SCRAM-SHA-256");
        this.verifyValidLinkSecurityConfigs(linkIndex++, securityConfigs);
        securityConfigs.put("sasl.mechanism", "PLAIN");
        this.verifyValidLinkSecurityConfigs(linkIndex++, securityConfigs);
        // Client callback handler: creation is expected to succeed, updating the existing link to fail.
        securityConfigs.put("sasl.client.callback.handler.class", SaslClientCallbackHandler.class.getName());
        this.verifyValidLinkSecurityConfigs(linkIndex++, securityConfigs);
        this.verifyLinkUpdateFailure(securityConfigs, PolicyViolationException.class, "sasl.client.callback.handler.class");
        securityConfigs.remove("sasl.client.callback.handler.class");
        // Login callback handler set to DynamicPlainLoginCallbackHandler: expected policy violation.
        securityConfigs.put("sasl.login.callback.handler.class", DynamicPlainLoginCallbackHandler.class.getName());
        this.verifyInvalidLinkSecurityConfigs(linkIndex++, securityConfigs, PolicyViolationException.class, "sasl.login.callback.handler.class");
        this.verifyLinkUpdateFailure(securityConfigs, PolicyViolationException.class, "sasl.login.callback.handler.class");
        // Unloadable handler class: the expected exception type on update depends on isKraft().
        securityConfigs.put("sasl.login.callback.handler.class", "non.existent.class");
        this.verifyInvalidLinkSecurityConfigs(linkIndex++, securityConfigs, InvalidConfigurationException.class, "sasl.login.callback.handler.class");
        this.verifyLinkUpdateFailure(securityConfigs, this.destCluster.isKraft() ? InvalidConfigurationException.class : PolicyViolationException.class, "sasl.login.callback.handler.class");
        // Switch to OAUTHBEARER with the dummy login module and the OAuth login callback handler.
        securityConfigs.put("sasl.mechanism", "OAUTHBEARER");
        securityConfigs.put("sasl.jaas.config", DummyOAuthLoginModule.class.getName() + " required clientId=\"id\" clientSecret=\"secret\";");
        securityConfigs.put("sasl.login.callback.handler.class", OAuthBearerLoginCallbackHandler.class.getName());
        // Token for user id 1234; only used further down as the content of a temp file.
        String token = new OAuthUtils.Builder(36000, "Confluent", "Test", null).userIds(new Integer[]{1234}).withKid(true).build().userTokens().get(1234);
        // http and https token endpoint URLs: creation is expected to succeed.
        securityConfigs.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:12345");
        this.verifyValidLinkSecurityConfigs(linkIndex++, securityConfigs);
        securityConfigs.put("sasl.oauthbearer.token.endpoint.url", "https://localhost:12345");
        this.verifyValidLinkSecurityConfigs(linkIndex++, securityConfigs);
        // Custom scope/sub claim names: creation is expected to succeed.
        securityConfigs.put("sasl.oauthbearer.scope.claim.name", "myscope");
        securityConfigs.put("sasl.oauthbearer.sub.claim.name", "mysub");
        this.verifyValidLinkSecurityConfigs(linkIndex++, securityConfigs);
        // Token endpoint given as a file URI: expected to be rejected as an unsupported protocol.
        securityConfigs.put("sasl.oauthbearer.token.endpoint.url", TestUtils.tempFile((String)token).toURI().toString());
        this.verifyInvalidLinkSecurityConfigs(linkIndex++, securityConfigs, PolicyViolationException.class, "unsupported protocol 'file'");
        this.verifyLinkUpdateFailure(securityConfigs, PolicyViolationException.class, "unsupported protocol 'file'");
        // No token endpoint URL at all: creation is expected to fail with an invalid configuration.
        securityConfigs.remove("sasl.oauthbearer.token.endpoint.url");
        this.verifyInvalidLinkSecurityConfigs(linkIndex++, securityConfigs, InvalidConfigurationException.class, "sasl.oauthbearer.token.endpoint.url");
        // Same handler-class checks as above, repeated under the OAUTHBEARER mechanism.
        securityConfigs.put("sasl.login.callback.handler.class", "non.existent.class");
        this.verifyInvalidLinkSecurityConfigs(linkIndex++, securityConfigs, InvalidConfigurationException.class, "sasl.login.callback.handler.class");
        this.verifyLinkUpdateFailure(securityConfigs, this.destCluster.isKraft() ? InvalidConfigurationException.class : PolicyViolationException.class, "sasl.login.callback.handler.class");
        securityConfigs.put("sasl.login.callback.handler.class", DynamicPlainLoginCallbackHandler.class.getName());
        this.verifyInvalidLinkSecurityConfigs(linkIndex++, securityConfigs, PolicyViolationException.class, "sasl.login.callback.handler.class");
        this.verifyLinkUpdateFailure(securityConfigs, PolicyViolationException.class, "sasl.login.callback.handler.class");
        // OAuthBearerLoginModule without an explicit login callback handler: expects "login failure".
        securityConfigs.put("sasl.jaas.config", OAuthBearerLoginModule.class.getName() + " required clientId=\"id\" clientSecret=\"secret\";");
        securityConfigs.remove("sasl.login.callback.handler.class");
        // Last use of linkIndex, hence no increment.
        this.verifyInvalidLinkSecurityConfigs(linkIndex, securityConfigs, InvalidConfigurationException.class, "login failure");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
    }

    /**
     * Creates a link with the given security configs, waits up to 15 seconds for the create
     * request, waits for the link id to exist on the destination cluster, and deletes the link again.
     *
     * @param linkIndex       index used to derive the link to create
     * @param securityConfigs security config overrides expected to be accepted
     */
    private void verifyValidLinkSecurityConfigs(int linkIndex, Map<String, String> securityConfigs) throws Throwable {
        CreateClusterLinksResult creation = this.createLink(linkIndex, securityConfigs);
        // The name is read back from the result rather than recomputed from the index.
        String createdLink = (String)creation.result().keySet().iterator().next();
        creation.all().get(15L, TimeUnit.SECONDS);

        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists(createdLink), true, "Link was not created");
        this.destCluster.deleteClusterLink(this.destCluster.admin, createdLink);
    }

    /**
     * Issues a destination link creation named {@code "link" + linkIndex} over the
     * {@code "INTERNAL"} listener and returns the pending result without waiting on it.
     *
     * @param linkIndex     suffix for the link name; {@code 1000 + linkIndex} is passed as the
     *                      numeric argument (presumably a per-link user id — confirm against
     *                      {@code createDestClusterLinkResult})
     * @param propOverrides link config overrides
     * @return the create result
     */
    private CreateClusterLinksResult createLink(int linkIndex, Map<String, String> propOverrides) throws Throwable {
        final int numericArg = 1000 + linkIndex;
        return this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link" + linkIndex, this.sourceCluster, numericArg, "INTERNAL", propOverrides, false);
    }

    /**
     * Asserts that creating a link with the given configs fails with {@code exceptionClass} and a
     * message containing {@code reason}. Nothing is checked when the destination cluster reports
     * {@code isKraft()}.
     *
     * @param linkIndex      index used to derive the link to create
     * @param invalidConfigs config overrides expected to be rejected
     * @param exceptionClass expected failure type of the create future
     * @param reason         substring expected in the failure message
     */
    private void verifyInvalidLinkSecurityConfigs(int linkIndex, Map<String, String> invalidConfigs, Class<? extends Throwable> exceptionClass, String reason) throws Throwable {
        if (this.destCluster.isKraft()) {
            return;
        }
        Throwable failure = TestUtils.assertFutureThrows((Future)this.createLink(linkIndex, invalidConfigs).all(), exceptionClass);
        String failureText = failure.getMessage();
        Assertions.assertTrue(failureText.contains(reason), "Unexpected failure: " + failure.getMessage());
    }

    /**
     * Attempts to apply every entry of {@code invalidConfigs} as a {@code SET} operation on the
     * existing {@code "tenantLink"} cluster link and asserts that the alter request fails.
     *
     * @param invalidConfigs config name/value pairs expected to be rejected
     * @param cause          expected failure type of the alter future
     * @param reason         substring expected in the failure message
     */
    private void verifyLinkUpdateFailure(Map<String, String> invalidConfigs, Class<? extends Throwable> cause, String reason) throws Throwable {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        // Fully typed collections instead of raw types, so the compiler checks the alter request.
        List<AlterConfigOp> ops = invalidConfigs.entrySet().stream()
            .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET))
            .collect(Collectors.toList());
        Map<ConfigResource, Collection<AlterConfigOp>> newConfigs = new HashMap<>();
        newConfigs.put(configResource, ops);

        AlterConfigsResult alterConfigsResult = this.destCluster.admin.incrementalAlterConfigs(newConfigs);
        Throwable t = TestUtils.assertFutureThrows(alterConfigsResult.all(), cause);
        // A failure without a message is reported as an assertion failure rather than an NPE.
        String message = t.getMessage();
        Assertions.assertTrue(message != null && message.contains(reason), "Unexpected failure: " + message);
    }

    /**
     * Verifies that mirroring keeps working after the link's {@code sasl.jaas.config} is switched
     * to a newly created source link user and the previous link user is deleted.
     *
     * <p>The link is created with the mirror start offset spec set to {@code "latest"}; records
     * are sent to the source topic before the mirror topic is created with
     * {@code OffsetSpec.latest()}. Adding source partitions is only exercised when the test is not
     * running in KRaft mode.
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testClusterLinkSecurityUpdate(String quorum, boolean coordinator) throws Throwable {
        final String link = "tenantLink";

        this.setUpClusters(false, false, true);
        Map<String, String> startFromLatest = Collections.singletonMap(ClusterLinkConfig.MirrorStartOffsetSpecProp(), "latest");
        this.destCluster.createDestClusterLink(this.destCluster.admin, link, this.sourceCluster, 1001, startFromLatest, true);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists(link), true, "Link was not created");

        this.createSourceTopic();
        KafkaTestUtils.sendRecords(this.sourceCluster.getOrCreateProducer(), "linkedTopic", 0, 10);
        this.createMirrorTopic(this.destCluster.admin, "", new NewMirrorTopic(link, "", OffsetSpec.latest()));
        this.verifyTopicListing();
        this.verifyTopicMirroring();
        this.verifyAclAndOffsetMigration("");

        // Rotate the link credentials: point the link at a new user, then drop the old one.
        LogicalClusterUser replacementUser = this.sourceCluster.createLinkUser(2001);
        String replacementJaas = replacementUser.saslJaasConfig();
        this.destCluster.alterClusterLink(this.destCluster.admin, link, "sasl.jaas.config", replacementJaas);
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = replacementUser;

        this.verifyTopicMirroring();
        this.verifyAclAndOffsetMigration("");
        this.verifySocketBufferSizeUpdate();
        boolean kraftMode = TestInfoUtils.isKRaft((TestInfo)this.testInfo);
        if (!kraftMode) {
            this.addSourcePartitionsAndVerifyMirror(4, "");
        }
        this.changeSourceTopicConfigAndVerifyMirror("");
        this.verifyTopicMirroring();
    }

    /**
     * End-to-end scenario for a source-initiated link: the link {@code "tenantLink"} is created on
     * the destination and then on the source, a topic is mirrored, metrics and ACL/offset migration
     * are verified, the source-side local {@code sasl.jaas.config} is switched to a new link user,
     * and mirroring is re-verified before both link ends are deleted.
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testSourceInitiatedLink(String quorum, boolean coordinator) throws Throwable {
        final String link = "tenantLink";

        this.setUpClusters(true, false, true);

        // Destination end first, then the source end of the same link.
        this.destCluster.createDestClusterLink(this.destCluster.admin, link, this.sourceCluster, -1, Collections.emptyMap(), true);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists(link), true, "Link was not created");
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, link, this.destCluster, 1003);
        MultiTenantClusterLinkTest.waitFor(() -> this.sourceCluster.linkIdExists(link), true, "Link was not created");

        this.createSourceTopic();
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicListing();
        this.verifyTopicMirroring();
        this.verifyMetricsGroups(this.destCluster.linkId(link));
        this.verifyTenantMetrics(this.destCluster.linkId(link), 1003, "", true);
        this.verifyTenantConnectionMetrics(true, true);
        this.verifyAclAndOffsetMigration("");

        // Rotate the credentials on the source side of the link and remove the old link user.
        LogicalClusterUser replacementUser = this.sourceCluster.createLinkUser(2001);
        String replacementJaas = replacementUser.saslJaasConfig();
        this.sourceCluster.alterClusterLink(this.sourceCluster.admin, link, ClusterLinkConfig.LocalPrefix() + "sasl.jaas.config", replacementJaas);
        this.sourceCluster.deleteUser(this.sourceCluster.linkUser, true);
        this.sourceCluster.linkUser = replacementUser;

        this.verifyTopicMirroring();
        this.verifyAclAndOffsetMigration("");
        this.verifySocketBufferSizeUpdate();
        this.addSourcePartitionsAndVerifyMirror(4, "");
        this.changeSourceTopicConfigAndVerifyMirror("");
        this.verifyTopicMirroring();

        // Tear down: stop the mirror, delete both ends, then re-check connection metrics.
        this.stopMirroring(this.destCluster, "linkedTopic");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, link);
        this.destCluster.deleteClusterLink(this.destCluster.admin, link);
        this.verifyTenantConnectionMetrics(true, false);
    }

    /**
     * Verifies that a source-initiated link created through the {@code FOO} listener ends up with
     * local listener {@code BAR}, given the reverse-connection listener map
     * {@code FOO:BAR,BAR:BAR,EXTERNAL:EXTERNAL,INTERNAL:INTERNAL} configured on the source cluster.
     *
     * <p>The source cluster is started with additional {@code FOO} and {@code BAR} SASL_PLAINTEXT
     * listeners. After the link is created on both ends, a topic is mirrored and verified before
     * the mirror is stopped and both link ends are deleted.
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testSourceInitiatedLinkMapsLocalReverseConnectionListener(String quorum, boolean coordinator) throws Throwable {
        HashMap<String, String> sourceConfigOverrides = new HashMap<>();
        sourceConfigOverrides.put(KafkaConfig.ListenersProp(), "FOO://localhost:0,BAR://localhost:0,INTERNAL://localhost:0,EXTERNAL://localhost:0");
        sourceConfigOverrides.put(KafkaConfig.ClusterLinkLocalReverseConnectionListenerMapProp(), "FOO:BAR,BAR:BAR,EXTERNAL:EXTERNAL,INTERNAL:INTERNAL");
        sourceConfigOverrides.put(KafkaConfig.ListenerSecurityProtocolMapProp(), "FOO:SASL_PLAINTEXT,BAR:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");

        // Per-listener SCRAM and interceptor settings. "baz" is configured too even though no BAZ
        // listener is declared above; this mirrors the original set of overrides exactly.
        List<String> scramListeners = Arrays.asList("foo", "bar", "baz");
        for (String listener : scramListeners) {
            String listenerPrefix = "listener.name." + listener + ".";
            sourceConfigOverrides.put(listenerPrefix + "scram-sha-256.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");
            sourceConfigOverrides.put(listenerPrefix + "broker.interceptor.class", MultiTenantInterceptor.class.getName());
        }
        if (TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            for (String listener : scramListeners) {
                sourceConfigOverrides.put("listener.name." + listener + ".scram-sha-256.sasl.server.callback.handler.class", PhysicalCluster.KRaftScramIntegrationTestCallbackHandler.class.getName());
            }
        }

        this.setupSourceCluster(true, false, true, sourceConfigOverrides, Collections.emptyMap(), Optional.of("EXTERNAL"));
        this.setupDestCluster(true, Collections.emptyMap());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1, Collections.emptyMap(), true);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");

        // The FOO-listener admin client is only needed for the create call; close it afterwards
        // so the test does not leak the client and its network threads.
        try (ConfluentAdmin fooListenerAdmin = (ConfluentAdmin)KafkaTestUtils.createAdminClient(this.sourceCluster.physicalCluster.bootstrapServers("FOO"), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.sourceCluster.logicalCluster.adminUser().saslJaasConfig())) {
            this.sourceCluster.createSourceClusterLinkResult(fooListenerAdmin, "tenantLink", this.destCluster, 1004, Collections.emptyMap(), "EXTERNAL").all().get();
        }
        MultiTenantClusterLinkTest.waitFor(() -> this.sourceCluster.linkIdExists("tenantLink"), true, "Link was not created");

        // The request arrived on FOO, which the listener map translates to BAR.
        String localListener = this.sourceCluster.linkConfig("tenantLink", ClusterLinkConfig.LocalListenerNameProp());
        Assertions.assertEquals("BAR", localListener);

        this.createSourceTopic();
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicListing();
        this.verifyTopicMirroring(Optional.of("EXTERNAL"));
        this.stopMirroring(this.destCluster, "linkedTopic");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "tenantLink");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
    }

    /**
     * Verifies that {@code local.*} configs of a source-initiated link can be set at creation and
     * altered afterwards, and that pausing/unpausing the link through the internal admin client
     * leaves mirroring working.
     *
     * <p>During the test {@code sourceCluster.admin} is replaced by a client created for the
     * logical cluster's admin user; the previous client is kept as {@code sourceInternalAdmin} and
     * used with the tenant-prefixed link name for the pause/unpause requests.
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testSourceInitiatedLinkLocalConfigs(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(true, true, true);
        ConfluentAdmin sourceInternalAdmin = this.sourceCluster.admin;
        this.sourceCluster.admin = (ConfluentAdmin)this.sourceCluster.createAdminClient(this.sourceCluster.logicalCluster.adminUser());
        this.addAcls((Admin)this.sourceCluster.admin, this.sourceCluster.user, new String[0]);

        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, -1, Collections.emptyMap(), true);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");

        // Source end of the link is created with a local bootstrap override.
        Map<String, String> configOverrides = Collections.singletonMap("local.bootstrap.servers", "somehost:9072");
        this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, "tenantLink", this.destCluster, 1003, configOverrides);
        MultiTenantClusterLinkTest.waitFor(() -> this.sourceCluster.linkIdExists("tenantLink"), true, "Link was not created");

        // Wait for the topic to be created so a failure surfaces here instead of as an obscure
        // mirroring error, and so mirror creation does not race with it.
        this.sourceCluster.admin.createTopics(Collections.singleton(new NewTopic("linkedTopic", Optional.empty(), Optional.of((short)1)))).all().get(15L, TimeUnit.SECONDS);
        this.createMirrorTopic(this.destCluster.admin, "");
        this.verifyTopicMirroring();

        // Alter the local configs through the tenant admin; the map value type is Collection to
        // match the parameter type of incrementalAlterConfigs.
        Map<ConfigResource, Collection<AlterConfigOp>> alterOps = Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Arrays.asList(new AlterConfigOp(new ConfigEntry("local.bootstrap.servers", "somehost:9071"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("local.security.protocol", "PLAINTEXT"), AlterConfigOp.OpType.SET)));
        this.sourceCluster.admin.incrementalAlterConfigs(alterOps).all().get(15L, TimeUnit.SECONDS);
        this.verifyTopicMirroring();

        // Pause and then unpause the link through the internal admin, addressing the link by its
        // tenant-prefixed name.
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, this.sourceCluster.linkUser.tenantPrefix() + "tenantLink");
        alterOps = Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "true"), AlterConfigOp.OpType.SET)));
        sourceInternalAdmin.incrementalAlterConfigs(alterOps).all().get();
        alterOps = Collections.singletonMap(configResource, Collections.singletonList(new AlterConfigOp(new ConfigEntry(ClusterLinkConfig.ClusterLinkPausedProp(), "false"), AlterConfigOp.OpType.SET)));
        sourceInternalAdmin.incrementalAlterConfigs(alterOps).all().get();
        this.verifyTopicMirroring();

        this.stopMirroring(this.destCluster, "linkedTopic");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "tenantLink");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
    }

    /**
     * Verifies the per-tenant limit on destination cluster links.
     *
     * <p>Two links are created and deleted, then five more ({@code link3}..{@code link7}) are
     * created; further DESTINATION and BIDIRECTIONAL creations are expected to fail against a limit
     * of 5. Replacing {@code link7} with a bidirectional link keeps the limit reached. When not in
     * KRaft mode, the limit is raised via {@code changeMaxClusterLinks(6, -1)} and additional
     * creations are expected to succeed.
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMaxClusterLink(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(false, false, true);
        KafkaPrincipal brokerPrincipal = new KafkaPrincipal("User", "broker");
        this.destCluster.addClusterAcls(brokerPrincipal, "All");

        // Deleted links must not count against the limit: create and remove link1 and link2.
        for (int i = 1; i <= 2; i++) {
            this.destCluster.createDestClusterLink(this.destCluster.admin, "link" + i, this.sourceCluster, 1000 + i, Collections.emptyMap(), false);
        }
        for (int i = 1; i <= 2; i++) {
            this.destCluster.deleteClusterLink(this.destCluster.admin, "link" + i);
        }

        // Fill up to the limit with link3..link7.
        for (int i = 3; i <= 7; i++) {
            this.destCluster.createDestClusterLink(this.destCluster.admin, "link" + i, this.sourceCluster, 1000 + i, Collections.emptyMap(), false);
        }
        this.verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.DESTINATION, "link8", 1008, ClusterLinkDescription.LinkMode.DESTINATION, 5);
        this.verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.BIDIRECTIONAL, "link9", 1009, ClusterLinkDescription.LinkMode.DESTINATION, 5);

        // Swap one destination link for a bidirectional one; the limit is still reached.
        Map<String, String> bidirectionalOverrides = Collections.singletonMap(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link7");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link10", this.sourceCluster, 1010, bidirectionalOverrides, false);
        this.verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.DESTINATION, "link11", 1011, ClusterLinkDescription.LinkMode.DESTINATION, 5);
        this.verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.BIDIRECTIONAL, "link12", 1012, ClusterLinkDescription.LinkMode.DESTINATION, 5);

        if (TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            return;
        }
        // Non-KRaft only: raise the limit and confirm further links can be created.
        this.changeMaxClusterLinks(6, -1);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link21", this.sourceCluster, 1021, "EXTERNAL", Collections.emptyMap(), true).all().get(15L, TimeUnit.SECONDS);
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link10");
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "link22", this.sourceCluster, 1022, "EXTERNAL", bidirectionalOverrides, true).all().get(30L, TimeUnit.SECONDS);
    }

    /**
     * Verifies ACL syncing over two links against a destination ACL limit of 30.
     *
     * <p>Source ACLs are added one at a time while the destination stays below the limit, checking
     * that each is synced for both links. Five more are then added to exceed the limit; the test
     * waits for the ACL-add-failed metric to become positive and asserts the destination ACL count
     * does not exceed the limit. Finally both links are deleted: ACLs tagged with each link id must
     * disappear while the total destination ACL count stays the same.
     *
     * @param quorum      quorum parameter supplied by {@code allCombinations}
     * @param coordinator coordinator parameter supplied by {@code allCombinations}
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testTenantAclSyncLimit(String quorum, boolean coordinator) throws Throwable {
        final int destAclLimit = 30;
        this.setUpClusters(false, false, true, destAclLimit);

        final int initialAcls = 4;
        this.waitForAcls(this.sourceCluster, initialAcls, null);
        this.waitForAcls(this.destCluster, initialAcls, null);

        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, 1001, Collections.emptyMap());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, 1002, Collections.emptyMap());
        final Uuid firstLinkId = this.destCluster.linkId("link1");
        final Uuid secondLinkId = this.destCluster.linkId("link2");

        int sourceAcls = 14;
        this.waitForAcls(this.sourceCluster, sourceAcls, null);
        this.waitForAcls(this.destCluster, sourceAcls + initialAcls, null);
        this.waitForAcls(this.destCluster, sourceAcls, firstLinkId);
        this.waitForAcls(this.destCluster, sourceAcls, secondLinkId);
        TestUtils.waitForCondition(() -> this.aclAddFailedMetric() == 0.0, "ACL update failed before reaching limit " + this.aclAddFailedMetric());

        // Below the limit: every added source ACL must show up on the destination for both links.
        while (sourceAcls + initialAcls < destAclLimit - 2) {
            this.addSourceAcl("topic" + sourceAcls);
            sourceAcls++;
            this.waitForAcls(this.sourceCluster, sourceAcls, null);
            this.waitForAcls(this.destCluster, sourceAcls + initialAcls, null);
            this.waitForAcls(this.destCluster, sourceAcls, firstLinkId);
            this.waitForAcls(this.destCluster, sourceAcls, secondLinkId);
        }

        // Push past the limit with five more source ACLs.
        for (int extra = 0; extra < 5; extra++) {
            this.addSourceAcl("topic" + sourceAcls);
            sourceAcls++;
        }
        this.waitForAcls(this.sourceCluster, sourceAcls, null);
        TestUtils.waitForCondition(() -> this.aclAddFailedMetric() > 0.0, "ACL failed metric not updated");
        boolean withinLimit = this.aclCount(this.destCluster, null) <= destAclLimit;
        Assertions.assertTrue(withinLimit, "Too many dest ACLs " + this.aclCount(this.destCluster, null));

        // Deleting the links strips the link ids but keeps the ACLs themselves.
        final int totalAcls = this.aclCount(this.destCluster, null);
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link1");
        TestUtils.waitForCondition(() -> this.aclCount(this.destCluster, firstLinkId) == 0, "Link id not removed on cluster link delete");
        this.destCluster.deleteClusterLink(this.destCluster.admin, "link2");
        TestUtils.waitForCondition(() -> this.aclCount(this.destCluster, secondLinkId) == 0, "Link id not removed on cluster link delete");
        TestUtils.waitForCondition(() -> this.aclCount(this.destCluster, null) == totalAcls, "ACLs not present after cluster link delete");
    }

    /**
     * Exercises the per-tenant cap on source-initiated cluster links.
     * <p>
     * The clusters are configured with a source-link limit of 2 (and a destination-link limit of 10).
     * The test then creates and deletes source links, and checks that creating a further SOURCE or
     * BIDIRECTIONAL link on the source cluster is rejected while two source links exist. In non-KRaft
     * runs it additionally changes the limits at runtime and creates more links.
     *
     * @param quorum      value supplied by {@code allCombinations}; not referenced in the method body
     * @param coordinator value supplied by {@code allCombinations}; not referenced in the method body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMaxSourceInitiatedClusterLink(String quorum, boolean coordinator) throws Throwable {
        if (TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            // KRaft: the limits are passed as config overrides at cluster setup time
            // (the same map is passed twice to setUpClusters).
            HashMap<String, String> overrides = new HashMap<String, String>();
            overrides.put("confluent.plugins.cluster.link.policy.max.destination.links.per.tenant", "10");
            overrides.put("confluent.plugins.cluster.link.policy.max.source.links.per.tenant", "2");
            this.setUpClusters(true, false, true, overrides, overrides);
        } else {
            // Non-KRaft: set up with defaults, then change the limits afterwards.
            // NOTE(review): presumably (maxDestinationLinks, maxSourceLinks), mirroring the overrides above - confirm.
            this.setUpClusters(true, false, true);
            this.changeMaxClusterLinks(10, 2);
        }
        KafkaPrincipal brokerPrincipal = new KafkaPrincipal("User", "broker");
        this.destCluster.addClusterAcls(brokerPrincipal, "All");
        this.sourceCluster.addClusterAcls(brokerPrincipal, "All");
        // Create two dest/source link pairs (link1, link2); this reaches the source limit of 2.
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link1", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link1", this.destCluster, 2001);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link2", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link2", this.destCluster, 2002);
        // Delete both source links, then create two new ones (link3, link4) in their place.
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link1");
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link2");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link3", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link3", this.destCluster, 2003);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link4", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link4", this.destCluster, 2004);
        // With link3 and link4 in place, a third SOURCE link (link5) is expected to be rejected.
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link5", this.sourceCluster, -1, Collections.emptyMap());
        this.verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.SOURCE, "link5", 2005, ClusterLinkDescription.LinkMode.SOURCE, 2);
        // A BIDIRECTIONAL link (link6) is expected to be rejected against the same SOURCE limit of 2.
        Map<String, String> bidirectionalOverrides = Collections.singletonMap(ClusterLinkConfig.LinkModeProp(), ClusterLinkDescription.LinkMode.BIDIRECTIONAL.name());
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link6", this.sourceCluster, -1, bidirectionalOverrides);
        this.verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode.BIDIRECTIONAL, "link6", 2006, ClusterLinkDescription.LinkMode.SOURCE, 2);
        // After deleting link4, a BIDIRECTIONAL link (link7) must be created within 15 seconds.
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link4");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link7", this.sourceCluster, -1, bidirectionalOverrides);
        this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, "link7", this.destCluster, 2007, bidirectionalOverrides).all().get(15L, TimeUnit.SECONDS);
        if (TestInfoUtils.isKRaft((TestInfo)this.testInfo)) {
            // The remaining steps change the limits at runtime and are only run for non-KRaft clusters.
            return;
        }
        // NOTE(review): presumably leaves/sets the destination limit via -1 and raises the source limit to 3 - confirm.
        this.changeMaxClusterLinks(-1, 3);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link21", this.sourceCluster, -1, Collections.emptyMap());
        this.sourceCluster.createSourceClusterLink(this.sourceCluster.admin, "link21", this.destCluster, 2021);
        // Delete link3 and create another BIDIRECTIONAL link (link22) on the source cluster.
        this.sourceCluster.deleteClusterLink(this.sourceCluster.admin, "link3");
        this.destCluster.createDestClusterLink(this.destCluster.admin, "link22", this.sourceCluster, -1, bidirectionalOverrides);
        this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, "link22", this.destCluster, 2022, bidirectionalOverrides).all().get(15L, TimeUnit.SECONDS);
    }

    /**
     * Starts the source cluster with an SSL source listener and a destination cluster whose broker
     * overrides carry every mTLS config (as listed by
     * {@code ClusterLinkCCloudToCCloudChannelBuilder.MTLS_CONFIGS_TO_OVERRIDE()}) that is present in
     * the source cluster's "INTERNAL" listener client configs. Source-initiated links are not used.
     *
     * @throws Exception if either cluster fails to start
     */
    private void setUpClustersForMtlsUsingDestBrokerProps() throws Exception {
        final boolean useSourceInitiatedLink = false;
        final boolean useSslForSourceListener = true;
        setupSourceCluster(useSourceInitiatedLink, useSslForSourceListener, false, Collections.emptyMap(), Optional.empty());

        HashMap<String, String> overrides = new HashMap<>();
        overrides.put("confluent.ccloud.host.suffixes", "localhost");
        overrides.put("confluent.max.acls.per.tenant", String.valueOf(1000));
        // A fresh random network id is generated for every invocation.
        overrides.put("confluent.traffic.network.id", "network-" + UUID.randomUUID());

        // Copy only those mTLS settings that the source's INTERNAL listener actually defines.
        Map<?, ?> internalClientConfigs = this.sourceCluster.clientConfigs("INTERNAL");
        for (Object mtlsConfig : ClusterLinkCCloudToCCloudChannelBuilder.MTLS_CONFIGS_TO_OVERRIDE()) {
            if (!internalClientConfigs.containsKey(mtlsConfig)) {
                continue;
            }
            overrides.put((String) mtlsConfig, (String) internalClientConfigs.get(mtlsConfig));
        }

        setupDestCluster(useSourceInitiatedLink, overrides);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a cluster link over the source cluster's "INTERNAL" listener after setting the clusters
     * up via {@code setUpClustersForMtlsUsingDestBrokerProps()}, then checks that records produced to
     * the source topic over SSL can be consumed from the mirror topic on the destination.
     * <p>
     * The static set {@code ClusterLinkPolicyConfig.ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS} is
     * obtained reflectively and temporarily extended with SSL; its original contents are restored in
     * the {@code finally} block regardless of the test outcome.
     *
     * @param quorum      value supplied by {@code allCombinations}; not referenced in the method body
     * @param coordinator value supplied by {@code allCombinations}; not referenced in the method body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testMtlsClusterLinkUsingDestBrokerProps(String quorum, boolean coordinator) throws Throwable {
        Set allowedSecurityProtocols = (Set)TestUtils.fieldValue(null, ClusterLinkPolicyConfig.class, (String)"ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS");
        // Snapshot of the original contents so the shared set can be restored afterwards.
        HashSet origAllowedSecurityProtocols = new HashSet(allowedSecurityProtocols);
        try {
            allowedSecurityProtocols.add(SecurityProtocol.SSL.name());
            this.setUpClustersForMtlsUsingDestBrokerProps();
            this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "INTERNAL", false, Collections.emptyMap(), true, SYNC_ALL_ACL_FILTER, null).all().get(15L, TimeUnit.SECONDS);
            MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
            // Return value is discarded.
            this.destCluster.linkId("tenantLink");
            NewTopic sourceTopic = new NewTopic("linkedTopic", Optional.empty(), Optional.of((short)1));
            // The creation result is not awaited here; the wait below polls the topic listing instead.
            this.sourceCluster.admin.createTopics(Collections.singleton(sourceTopic));
            MultiTenantClusterLinkTest.waitFor(() -> {
                try {
                    return ((Map)this.sourceCluster.admin.listTopics().namesToListings().get()).containsKey("linkedTopic");
                }
                catch (Exception e) {
                    // Any listing failure is reported as "not yet visible" so that waitFor evaluates the check again.
                    return false;
                }
            }, true, "Failed to list topic");
            this.createMirrorTopic(this.destCluster.admin, "");
            int messageCount = 10;
            // Producer connects to the source's INTERNAL listener over SSL using the source cluster's SSL configs.
            Properties producerProps = KafkaTestUtils.producerProps(this.sourceCluster.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, "", "");
            producerProps.putAll((Map<?, ?>)this.sourceCluster.sslConfigs);
            try (KafkaProducer sourceProducer = new KafkaProducer(producerProps);){
                KafkaTestUtils.sendRecords((KafkaProducer<String, String>)sourceProducer, "linkedTopic", 0, messageCount);
                KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", 0, messageCount);
            }
        }
        finally {
            // Restore the reflectively modified static set to its original contents.
            allowedSecurityProtocols.clear();
            allowedSecurityProtocols.addAll(origAllowedSecurityProtocols);
        }
    }

    /**
     * Creates a cluster link named "tenantLink" over the source cluster's "INTERNAL" listener, creates
     * "linkedTopic" on the source and a mirror of it on the destination, and checks that 10 records
     * produced to the source over SSL can be consumed from the destination.
     *
     * @param quorum      value supplied by {@code allCombinations}; not referenced in the method body
     * @param coordinator value supplied by {@code allCombinations}; not referenced in the method body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testSslClusterLinkExplicitLinkProps(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(false, true, false);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "INTERNAL", Collections.emptyMap(), true).all().get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        // Return value is discarded.
        this.destCluster.linkId("tenantLink");
        NewTopic sourceTopic = new NewTopic("linkedTopic", Optional.empty(), Optional.of((short)1));
        // The creation result is not awaited here; the wait below polls the topic listing instead.
        this.sourceCluster.admin.createTopics(Collections.singleton(sourceTopic));
        MultiTenantClusterLinkTest.waitFor(() -> {
            try {
                return ((Map)this.sourceCluster.admin.listTopics().namesToListings().get()).containsKey("linkedTopic");
            }
            catch (Exception e) {
                // Any listing failure is reported as "not yet visible" so that waitFor evaluates the check again.
                return false;
            }
        }, true, "Failed to list topic");
        this.createMirrorTopic(this.destCluster.admin, "");
        int messageCount = 10;
        // Producer connects to the source's INTERNAL listener over SSL using the source cluster's SSL configs.
        Properties producerProps = KafkaTestUtils.producerProps(this.sourceCluster.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, "", "");
        producerProps.putAll((Map<?, ?>)this.sourceCluster.sslConfigs);
        try (KafkaProducer sourceProducer = new KafkaProducer(producerProps);){
            KafkaTestUtils.sendRecords((KafkaProducer<String, String>)sourceProducer, "linkedTopic", 0, messageCount);
            KafkaTestUtils.consumeRecords(this.destCluster.getOrCreateConsumer("destGroup"), "linkedTopic", 0, messageCount);
        }
    }

    /**
     * Checks that a PLAINTEXT security protocol combined with the bootstrap server
     * "test.confluent.cloud:9071" is rejected with a {@code PolicyViolationException}, both when
     * creating a cluster link and when altering an existing link's configs.
     *
     * @param quorum      value supplied by {@code allCombinations}; not referenced in the method body
     * @param coordinator value supplied by {@code allCombinations}; not referenced in the method body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testValidateSecurityProtocol(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(false, false, true);
        HashMap<String, String> overrides = new HashMap<String, String>();
        overrides.put("bootstrap.servers", "test.confluent.cloud:9071");
        overrides.put("security.protocol", SecurityProtocol.PLAINTEXT.name);
        // 1) Creation with the invalid overrides must fail with the expected policy message.
        CreateClusterLinksResult createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", overrides, false);
        String errMsg = "Invalid security protocol PLAINTEXT for a Confluent Cloud to Confluent Cloud link, it must be SASL_SSL.";
        PolicyViolationException ex = (PolicyViolationException)TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class);
        Assertions.assertTrue((boolean)ex.getMessage().contains(errMsg), (String)ex.getMessage());
        // 2) Create the link without overrides, then try to switch it to the invalid settings via
        //    incrementalAlterConfigs; the alter must fail with the same message.
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1002, Collections.emptyMap(), false);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        ArrayList alterConfigs = Lists.newArrayList((Object[])new AlterConfigOp[]{new AlterConfigOp(new ConfigEntry("bootstrap.servers", "test.confluent.cloud:9071"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("security.protocol", SecurityProtocol.PLAINTEXT.name), AlterConfigOp.OpType.SET)});
        AlterConfigsResult alterConfigsResult = this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), alterConfigs));
        ex = (PolicyViolationException)TestUtils.assertFutureThrows((Future)alterConfigsResult.all(), PolicyViolationException.class);
        Assertions.assertTrue((boolean)ex.getMessage().contains(errMsg), (String)ex.getMessage());
    }

    /**
     * Checks that cluster links whose {@code bootstrap.servers} contain the addresses 10.0.0.3,
     * 10.0.0.4, 127.0.0.1 or localhost (all on port 9071) are rejected with a
     * {@code PolicyViolationException}, both on link creation and when altering an existing link.
     *
     * @param quorum      value supplied by {@code allCombinations}; not referenced in the method body
     * @param coordinator value supplied by {@code allCombinations}; not referenced in the method body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testInternalIpAndPort(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(false, false, true);
        CreateClusterLinksResult createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "10.0.0.3:9071"), false);
        // Format arguments are (host name, IP address); the host part is empty for literal IP addresses.
        String errMsgFmt = "Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [%s/%s:9071]";
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)String.format(errMsgFmt, "", "10.0.0.3"));
        createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1002, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "127.0.0.1:9071"), false);
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)String.format(errMsgFmt, "", "127.0.0.1"));
        createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1003, "EXTERNAL", Collections.singletonMap("bootstrap.servers", "localhost:9071"), false);
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)String.format(errMsgFmt, "localhost", "127.0.0.1"));
        // Mixed list with surrounding whitespace: the expected message names only 10.0.0.3 and 10.0.0.4, not 11.0.0.3.
        createClusterLinksResult = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1004, "EXTERNAL", Collections.singletonMap("bootstrap.servers", " 11.0.0.3:9071 , 10.0.0.3:9071,10.0.0.4:9071"), false);
        String errMsgTwo = "Unable to validate cluster link due to error: Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [/10.0.0.3:9071, /10.0.0.4:9071]";
        TestUtils.assertFutureThrows((Future)createClusterLinksResult.all(), PolicyViolationException.class, (String)errMsgTwo);
        // Create the link without overrides, then verify that altering bootstrap.servers to 10.0.0.3:9071 is rejected too.
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1005, Collections.emptyMap(), false);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        AlterConfigsResult alterConfigsResult = this.destCluster.admin.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink"), Collections.singleton(new AlterConfigOp(new ConfigEntry("bootstrap.servers", "10.0.0.3:9071"), AlterConfigOp.OpType.SET))));
        TestUtils.assertFutureThrows((Future)alterConfigsResult.all(), PolicyViolationException.class, (String)"Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: [/10.0.0.3:9071]");
    }

    /**
     * Creates a cluster link over the source cluster's "INTERNAL" listener and checks that mirror
     * topic creation fails with an {@code ExecutionException}, that the link transitions to the
     * FAILED state, and that the {@code broker-failed-link-count} metric tagged with reason
     * {@code unauthorized_bootstrap} reports exactly 1.
     *
     * @param quorum      value supplied by {@code allCombinations}; not referenced in the method body
     * @param coordinator value supplied by {@code allCombinations}; not referenced in the method body
     * @throws Throwable if cluster setup, link creation or any wait fails
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testCLRequestOnUnauthenticatedListener(String quorum, boolean coordinator) throws Throwable {
        this.setUpClusters(false, false, true);
        this.destCluster.createDestClusterLinkResult(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, "INTERNAL", Collections.emptyMap(), true).all().get(15L, TimeUnit.SECONDS);
        // Set the link's availability-check interval config to 1000.
        this.destCluster.setInternalClusterLinkConfigs("tenantLink", Collections.singletonMap(ClusterLinkConfig.AvailabilityCheckMsProp(), "1000"));
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkIdExists("tenantLink"), true, "Link was not created");
        this.createSourceTopic();
        // Mirror creation is expected to fail on this link.
        Assertions.assertThrows(ExecutionException.class, () -> this.createMirrorTopic(this.destCluster.admin, ""));
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkState("tenantLink"), ClusterLinkDescription.LinkState.FAILED, "Link didn't transition to failed state");
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("link-name", "tenantLink");
        tags.put("tenant", this.destCluster.logicalCluster.logicalClusterId());
        tags.put("reason", "unauthorized_bootstrap");
        // JUnit convention is assertEquals(expected, actual); keeping that order makes a failure
        // report "expected: 1.0 but was: <metric>" instead of the reverse.
        Assertions.assertEquals(1.0, (double)this.metricValue(this.destCluster, "cluster-link-metrics", "broker-failed-link-count", tags, false));
    }

    /**
     * Verifies how destination ACL counts react when a cluster link's ACL filter config is changed
     * (to {@code subsetAclsFilter} and back to {@code SYNC_ALL_ACL_FILTER}), when the per-tenant ACL
     * limit is raised and source ACLs are added, and when the link is deleted.
     * <p>
     * NOTE(review): the third argument of {@code waitForAcls} is a link id filter; the exact meaning
     * of {@code null} versus {@code Uuid.ZERO_UUID} is defined by that helper - confirm there.
     *
     * @param quorum      value supplied by {@code allCombinations}; not referenced in the method body
     * @param coordinator value supplied by {@code allCombinations}; not referenced in the method body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}.coordinator={1}")
    @MethodSource(value={"allCombinations"})
    public void testAclsOnAclFilterUpdate(String quorum, boolean coordinator) throws Throwable {
        // 13 == initialAcls (4) + sourceAcls (9) used below.
        int destAclLimit = 13;
        this.setUpClusters(false, false, true, destAclLimit);
        int initialAcls = 4;
        this.waitForAcls(this.sourceCluster, initialAcls, null);
        this.waitForAcls(this.destCluster, initialAcls, null);
        this.destCluster.createDestClusterLink(this.destCluster.admin, "tenantLink", this.sourceCluster, 1001, Collections.emptyMap());
        int sourceAcls = 9;
        this.waitForAcls(this.sourceCluster, sourceAcls, null);
        this.waitForAcls(this.destCluster, initialAcls + sourceAcls, null);
        // Switch the link's ACL filter to the subset filter; the total destination count must stay the same...
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.AclFiltersProp(), this.subsetAclsFilter);
        this.waitForAcls(this.destCluster, initialAcls + sourceAcls, null);
        // ...and no destination ACL may carry a cluster link id any more.
        MultiTenantClusterLinkTest.waitFor(() -> this.currentAcls(this.destCluster.admin).stream().filter(aclBinding -> !aclBinding.entry().clusterLinkIds().isEmpty()).collect(Collectors.toSet()), Collections.emptySet(), "Destination ACLs still have link id stamped");
        this.waitForAcls(this.destCluster, initialAcls + sourceAcls, Uuid.ZERO_UUID);
        // Switch back to the sync-all filter and wait for the counts under the link id and under ZERO_UUID.
        this.destCluster.alterClusterLink(this.destCluster.admin, "tenantLink", ClusterLinkConfig.AclFiltersProp(), SYNC_ALL_ACL_FILTER);
        Uuid linkId = this.destCluster.linkId("tenantLink");
        this.waitForAcls(this.destCluster, sourceAcls, linkId);
        this.waitForAcls(this.destCluster, initialAcls + sourceAcls, Uuid.ZERO_UUID);
        // Raise the destination ACL limit to 50, then add two source ACLs.
        this.setAclLimit(this.destCluster, 50);
        this.addSourceAcl("test.topic.1");
        this.addSourceAcl("test.topic.2");
        this.waitForAcls(this.destCluster, sourceAcls + 2, linkId);
        this.waitForAcls(this.destCluster, initialAcls + sourceAcls, Uuid.ZERO_UUID);
        // Restore the original limit and delete the link.
        this.setAclLimit(this.destCluster, destAclLimit);
        this.destCluster.deleteClusterLink(this.destCluster.admin, "tenantLink");
        TestUtils.waitForCondition(() -> this.aclCount(this.destCluster, linkId) == 0, (String)"Link id not removed on cluster link delete");
        // destAclLimit + 2 == the 13 earlier ACLs plus the two added while the limit was raised.
        this.waitForAcls(this.destCluster, destAclLimit + 2, Uuid.ZERO_UUID);
    }

    /**
     * Overwrites the {@code maxAclsPerTenant} field on the authorizer of every broker of the given
     * cluster and, when the cluster reports KRaft mode, on the KRaft controller's authorizer as well.
     *
     * @param cluster cluster whose authorizers are updated
     * @param limit   new value for the {@code maxAclsPerTenant} field
     * @throws Exception if a field cannot be set
     */
    private void setAclLimit(MultiTenantCluster cluster, int limit) throws Exception {
        final String limitField = "maxAclsPerTenant";
        final Object newLimit = limit;
        for (KafkaBroker kafkaBroker : cluster.physicalCluster.kafkaCluster().kafkaBrokers()) {
            Object brokerAuthorizer = kafkaBroker.authorizer().get();
            TestUtils.setFieldValue(brokerAuthorizer, limitField, newLimit);
        }
        if (!cluster.isKraft()) {
            return;
        }
        Object controllerAuthorizer = cluster.physicalCluster.kafkaCluster().kraftController().authorizer().get();
        TestUtils.setFieldValue(controllerAuthorizer, limitField, newLimit);
    }

    /**
     * Shorthand for {@code brokerProps(true)}.
     *
     * @return broker properties built with the Confluent Cloud broker flag enabled
     */
    private Properties brokerProps() {
        final boolean isConfluentCloudBroker = true;
        return brokerProps(isConfluentCloudBroker);
    }

    /**
     * Builds the broker properties shared by the test clusters: cluster linking enabled, the
     * multi-tenant authorizer, tenant quota callback, and the topic, alter-config and
     * create-cluster-link policies.
     *
     * @param isConfluentCloudBroker when {@code true}, {@code ConfluentCloudBrokerInterceptor} is
     *                               registered as the broker interceptor of the internal listener
     * @return a new {@link Properties} instance
     */
    private Properties brokerProps(boolean isConfluentCloudBroker) {
        final String metadataTopicPrefix = "confluent.cluster.link.metadata.topic.";
        Properties brokerConfig = new Properties();

        brokerConfig.setProperty("confluent.cluster.link.enable", "true");
        brokerConfig.setProperty(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        brokerConfig.setProperty("confluent.plugins.topic.policy.replication.factor", "1");
        brokerConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        brokerConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        brokerConfig.setProperty(KafkaConfig.PasswordEncoderSecretProp(), "multi-tenant-cluster-link-secret");
        brokerConfig.setProperty(KafkaConfig.CreateTopicPolicyClassNameProp(), CreateTopicPolicy.class.getName());
        brokerConfig.setProperty(KafkaConfig.AlterConfigPolicyClassNameProp(), AlterConfigPolicy.class.getName());
        brokerConfig.setProperty(KafkaConfig.CreateClusterLinkPolicyClassNameProp(), CreateClusterLinkPolicy.class.getName());

        // KRaft runs enable the link metadata topic with 3 partitions.
        if (TestInfoUtils.isKRaft((TestInfo) this.testInfo)) {
            brokerConfig.setProperty(metadataTopicPrefix + "enable", "true");
            brokerConfig.setProperty(metadataTopicPrefix + "replication.factor", "1");
            brokerConfig.setProperty(metadataTopicPrefix + "partitions", "3");
        }

        if (isConfluentCloudBroker) {
            brokerConfig.setProperty("listener.name.internal.broker.interceptor.class", ConfluentCloudBrokerInterceptor.class.getName());
        }

        // Applied last on purpose: link-coordinator runs overwrite the partition count with 1
        // (even when the KRaft settings above were applied) and add a min ISR of 1.
        if (this.testRunsWithLinkCoordinator()) {
            brokerConfig.setProperty(metadataTopicPrefix + "enable", "true");
            brokerConfig.setProperty(metadataTopicPrefix + "partitions", "1");
            brokerConfig.setProperty(metadataTopicPrefix + "replication.factor", "1");
            brokerConfig.setProperty(metadataTopicPrefix + "min.isr", "1");
        }
        return brokerConfig;
    }

    /** Creates the linked topic on the source cluster by delegating to {@code createTopic}. */
    private void createSourceTopic() {
        MultiTenantCluster source = this.sourceCluster;
        createTopic(source);
    }

    /**
     * Creates "linkedTopic", prefixed with the cluster user's tenant prefix, directly on the
     * cluster's physical Kafka cluster with {@code numPartitions} partitions; the last
     * {@code createTopic} argument is 1.
     *
     * @param cluster cluster on which the topic is created
     */
    private void createTopic(MultiTenantCluster cluster) {
        String prefixedTopic = cluster.user.tenantPrefix() + "linkedTopic";
        cluster.physicalCluster.kafkaCluster().createTopic(prefixedTopic, this.numPartitions, 1);
    }

    /**
     * Creates the mirror topic using a {@code NewMirrorTopic} for link "tenantLink" and source topic
     * "linkedTopic".
     *
     * @param admin             admin client used to create the topic
     * @param clusterLinkPrefix prefix forwarded to the three-argument overload
     * @throws Exception if topic creation fails
     */
    private void createMirrorTopic(ConfluentAdmin admin, String clusterLinkPrefix) throws Exception {
        NewMirrorTopic defaultMirror = new NewMirrorTopic("tenantLink", "linkedTopic");
        createMirrorTopic(admin, clusterLinkPrefix, defaultMirror);
    }

    /**
     * Creates the topic {@code clusterLinkPrefix + "linkedTopic"} as a mirror topic described by
     * {@code newMirrorTopic} and blocks until the creation completes.
     * <p>
     * The supplied {@code newMirrorTopic} is used as the mirror definition. Previously the argument
     * was ignored and a hard-coded {@code NewMirrorTopic("tenantLink", "linkedTopic")} was always used.
     *
     * @param admin             admin client used to create the topic
     * @param clusterLinkPrefix prefix prepended to "linkedTopic" to form the mirror topic's name
     * @param newMirrorTopic    mirror definition to attach to the new topic; must not be {@code null}
     * @throws Exception if the topic creation fails
     */
    private void createMirrorTopic(ConfluentAdmin admin, String clusterLinkPrefix, NewMirrorTopic newMirrorTopic) throws Exception {
        NewTopic newTopic = new NewTopic(clusterLinkPrefix + "linkedTopic", Optional.empty(), Optional.of((short)1)).mirror(Optional.of(newMirrorTopic));
        // 5000 is passed as the request timeout in milliseconds.
        CreateTopicsOptions option = new CreateTopicsOptions().timeoutMs(Integer.valueOf(5000));
        admin.createTopics(Collections.singleton(newTopic), option).all().get();
    }

    /**
     * Issues a FAILOVER alter-mirror operation for the topic and waits until its mirror description
     * reports the STOPPED state.
     *
     * @param destCluster cluster that hosts the mirror topic
     * @param topic       name of the mirror topic
     * @throws Throwable if the alter request does not complete within 15 seconds or the wait fails
     */
    private void stopMirroring(MultiTenantCluster destCluster, String topic) throws Throwable {
        Map<String, AlterMirrorOp> failover = Collections.singletonMap(topic, AlterMirrorOp.FAILOVER);
        destCluster.admin.alterMirrors(failover, new AlterMirrorsOptions()).all().get(15L, TimeUnit.SECONDS);
        TestUtils.waitForCondition(
            () -> MirrorTopicDescription.State.STOPPED == destCluster.mirrorDescription(topic).state(),
            "Mirror not stopped");
    }

    /**
     * Verifies the link/topic listing on the default destination cluster with an empty prefix.
     *
     * @throws Exception if the verification does not succeed
     */
    private void verifyTopicListing() throws Exception {
        final String noPrefix = "";
        verifyTopicListing(noPrefix, this.destCluster);
    }

    /**
     * Checks, via {@code TestUtils.retryOnExceptionWithTimeout}, that the destination cluster lists
     * exactly one cluster link named "tenantLink" and that this link reports exactly one topic named
     * {@code clusterLinkPrefix + "linkedTopic"}.
     *
     * @param clusterLinkPrefix prefix expected on the mirror topic name
     * @param destCluster       cluster whose link listing is inspected
     * @throws Exception if the listing never matches
     */
    private void verifyTopicListing(String clusterLinkPrefix, MultiTenantCluster destCluster) throws Exception {
        TestUtils.retryOnExceptionWithTimeout(() -> {
            ListClusterLinksOptions withTopics = new ListClusterLinksOptions().includeTopics(true);
            Collection<?> links = (Collection<?>) destCluster.admin.listClusterLinks(withTopics).result().get();
            Assertions.assertEquals(1, links.size());

            ClusterLinkListing onlyLink = (ClusterLinkListing) links.iterator().next();
            Assertions.assertEquals((Object) "tenantLink", (Object) onlyLink.linkName());
            Assertions.assertTrue(onlyLink.topics().isPresent());

            Collection<?> linkedTopics = (Collection<?>) onlyLink.topics().get();
            Assertions.assertEquals(1, linkedTopics.size());
            Object expectedTopic = clusterLinkPrefix + "linkedTopic";
            Assertions.assertEquals(expectedTopic, (Object) linkedTopics.iterator().next());
        });
    }

    /**
     * Verifies mirroring between the default source and destination clusters with an empty prefix.
     *
     * @param listener optional listener value forwarded to the four-argument overload
     * @throws Throwable if the verification fails
     */
    private void verifyTopicMirroring(Optional<String> listener) throws Throwable {
        final String noPrefix = "";
        verifyTopicMirroring(noPrefix, this.sourceCluster, this.destCluster, listener);
    }

    /**
     * Verifies mirroring between the default source and destination clusters with an empty prefix
     * and no listener.
     *
     * @throws Throwable if the verification fails
     */
    private void verifyTopicMirroring() throws Throwable {
        Optional<String> noListener = Optional.empty();
        verifyTopicMirroring("", this.sourceCluster, this.destCluster, noListener);
    }

    /**
     * Verifies mirroring between the given clusters with no listener.
     *
     * @param clusterLinkPrefix prefix of the mirror topic name on the destination
     * @param sourceCluster     cluster to produce to
     * @param destCluster       cluster to consume from
     * @throws Throwable if the verification fails
     */
    private void verifyTopicMirroring(String clusterLinkPrefix, MultiTenantCluster sourceCluster, MultiTenantCluster destCluster) throws Throwable {
        Optional<String> noListener = Optional.empty();
        verifyTopicMirroring(clusterLinkPrefix, sourceCluster, destCluster, noListener);
    }

    /**
     * Sends a batch of 10 records to "linkedTopic" on the source cluster and consumes the same index
     * range from {@code clusterLinkPrefix + "linkedTopic"} on the destination cluster using group
     * "destGroup". The starting index is taken from {@code nextMessageIndex}, which is advanced by
     * the batch size before sending.
     *
     * @param clusterLinkPrefix prefix of the mirror topic name on the destination
     * @param sourceCluster     cluster to produce to
     * @param destCluster       cluster to consume from
     * @param listener          optional listener value passed to the producer and consumer lookups
     * @throws Throwable if sending or consuming fails
     */
    private void verifyTopicMirroring(String clusterLinkPrefix, MultiTenantCluster sourceCluster, MultiTenantCluster destCluster, Optional<String> listener) throws Throwable {
        final int batchSize = 10;
        final int startIndex = this.nextMessageIndex;
        this.nextMessageIndex = startIndex + batchSize;

        String mirrorTopic = clusterLinkPrefix + "linkedTopic";
        KafkaTestUtils.sendRecords(sourceCluster.getOrCreateProducer(listener), "linkedTopic", startIndex, batchSize);
        KafkaTestUtils.consumeRecords(destCluster.getOrCreateConsumer("destGroup", listener), mirrorTopic, startIndex, batchSize);
    }

    /**
     * Adds ACLs and commits offsets for group "linkedGroup" on the source cluster, then waits until
     * the destination cluster shows the same ACLs stamped with the id of link "tenantLink" and the
     * same committed offsets (with topic names prefixed by {@code clusterLinkPrefix} when it is
     * non-empty).
     *
     * @param clusterLinkPrefix prefix applied to topic names on the destination; may be empty
     * @throws Throwable if ACLs or offsets are not migrated
     */
    private void verifyAclAndOffsetMigration(String clusterLinkPrefix) throws Throwable {
        Set<AclBinding> sourceAcls = this.addAcls((Admin) this.sourceCluster.admin, this.sourceCluster.user, new String[0]);
        this.addBrokerAclsForOffsetMigration(clusterLinkPrefix);

        final String consumerGroup = "linkedGroup";
        Map<TopicPartition, OffsetAndMetadata> committed = this.commitOffsets((Admin) this.sourceCluster.admin, consumerGroup);

        // Expected destination offsets: same values, keyed by the prefixed topic name when a prefix is in use.
        Map<TopicPartition, OffsetAndMetadata> expectedOffsets;
        if (clusterLinkPrefix.isEmpty()) {
            expectedOffsets = committed;
        } else {
            expectedOffsets = committed.entrySet().stream().collect(Collectors.toMap(
                entry -> new TopicPartition(clusterLinkPrefix + entry.getKey().topic(), entry.getKey().partition()),
                Map.Entry::getValue));
        }

        // Expected destination ACLs: each source ACL stamped with the link id (looked up per ACL).
        Set expectedAcls = sourceAcls.stream()
            .map(binding -> SecurityUtils.aclWithClusterLinkIds((AclBinding) binding, Collections.singleton(this.destCluster.linkId("tenantLink"))))
            .collect(Collectors.toSet());

        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.describeAcls(this.sourceCluster.user), expectedAcls, "Acls not migrated");
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.committedOffsets(consumerGroup), expectedOffsets, "Consumer offsets not migrated");
    }

    /**
     * Builds metric tags that identify link "tenantLink" by name.
     *
     * @param tenant  value for the "tenant" tag
     * @param request value for the "request" tag
     * @return a new mutable map with the "tenant", "link-name" and "request" tags
     */
    private Map<String, String> linkMetricTags(String tenant, String request) {
        Map<String, String> metricTags = new HashMap<>();
        metricTags.put("tenant", tenant);
        metricTags.put("link-name", "tenantLink");
        metricTags.put("request", request);
        return metricTags;
    }

    /**
     * Builds metric tags that identify a link by its id.
     *
     * @param linkId  link id; its string form becomes the "link-id" tag
     * @param tenant  value for the "tenant" tag
     * @param request value for the "request" tag
     * @return a new mutable map with the "tenant", "link-id" and "request" tags
     */
    private Map<String, String> linkIdMetricTags(Uuid linkId, String tenant, String request) {
        Map<String, String> metricTags = new HashMap<>();
        metricTags.put("tenant", tenant);
        metricTags.put("link-id", linkId.toString());
        metricTags.put("request", request);
        return metricTags;
    }

    /**
     * Asserts the tenant- and link-level metrics for link "tenantLink":
     * no Fetch "tenant-metrics" for the link user on the source, a sane Fetch response time on the
     * destination, link/mirror-topic/partition counts, zero mirror lag with non-zero mirrored bytes,
     * and Fetch request/byte totals that agree between source and destination within a tolerance.
     *
     * @param linkId            id of the link, used for the source-side link metric tags
     * @param linkUserId        user id used for the source "tenant-metrics" lookup
     * @param clusterLinkPrefix prefix of the mirror topic name on the destination
     * @param isSourceInitiated selects the expected "connection-mode" tag ("inbound" when true,
     *                          "outbound" otherwise)
     */
    private void verifyTenantMetrics(Uuid linkId, int linkUserId, String clusterLinkPrefix, boolean isSourceInitiated) {
        final String linkGroup = "cluster-link-metrics";
        final String destTenantGroup = "cluster-link-dest-tenant-metrics";
        final String sourceLinkGroup = "cluster-link-source-metrics";
        final double responseTimeCeilingNs = (double) TimeUnit.SECONDS.toNanos(15L);

        // Fetches must not show up in the source's tenant-metrics under the link user.
        HashMap<String, String> sourceUserTags = new HashMap<>();
        sourceUserTags.put("tenant", "sourceLogicalCluster");
        sourceUserTags.put("user", String.valueOf(linkUserId));
        sourceUserTags.put("request", "Fetch");
        Assertions.assertFalse((boolean) this.metricsFound(this.sourceCluster, "tenant-metrics", sourceUserTags));

        Map<String, String> destFetchTags = this.linkMetricTags("destLogicalCluster", "Fetch");
        double destResponseTimeNs = this.metricValue(this.destCluster, destTenantGroup, "response-time-ns-max", destFetchTags);
        boolean destResponseTimeOk = destResponseTimeNs > 0.0 && destResponseTimeNs < responseTimeCeilingNs;
        Assertions.assertTrue(destResponseTimeOk, "Invalid response time metric: " + destResponseTimeNs);

        Map<String, String> linkTags = new HashMap<>();
        linkTags.put("tenant", "destLogicalCluster");
        linkTags.put("link-name", "tenantLink");

        Map<String, String> linkCountTags = new HashMap<>(linkTags);
        linkCountTags.put("deployed-link-type", "hybrid");
        linkCountTags.put("connection-mode", isSourceInitiated ? "inbound" : "outbound");
        Assertions.assertEquals(1.0, (double) this.metricValue(this.destCluster, linkGroup, "link-count", linkCountTags), 0.001);

        // Exactly one topic in the Mirror state, none in any other mirror state.
        linkTags.put("state", "Mirror");
        Assertions.assertEquals(1.0, (double) this.metricValue(this.destCluster, linkGroup, "mirror-topic-count", linkTags), 0.001);
        for (String emptyState : Arrays.asList("FailedMirror", "PausedMirror", "PendingStoppedMirror", "StoppedMirror")) {
            linkTags.put("state", emptyState);
            Assertions.assertEquals(0.0, (double) this.metricValue(this.destCluster, linkGroup, "mirror-topic-count", linkTags), 0.001);
        }
        linkTags.remove("state");
        Assertions.assertEquals((double) this.numPartitions, (double) this.metricValue(this.destCluster, linkGroup, "mirror-partition-count", linkTags), 0.001);

        linkTags.put("topic", clusterLinkPrefix + "linkedTopic");
        Assertions.assertEquals(0.0, (double) this.metricValue(this.destCluster, linkGroup, "mirror-topic-lag", linkTags), 0.0);
        boolean mirroredBytes = this.metricValue(this.destCluster, linkGroup, "mirror-topic-byte-total", linkTags) > 0.0;
        Assertions.assertTrue(mirroredBytes, "Invalid mirror topic throughput");
        linkTags.remove("topic");

        // Source-side link metrics are compared with the destination's tenant metrics.
        Map<String, String> sourceFetchTags = this.linkIdMetricTags(linkId, "sourceLogicalCluster", "Fetch");
        double sourceRequests = this.metricValue(this.sourceCluster, sourceLinkGroup, "request-total", sourceFetchTags);
        double destRequests = this.metricValue(this.destCluster, destTenantGroup, "request-total", destFetchTags);
        this.assertRange("requests", sourceRequests, destRequests, 10.0);

        double sourceRequestBytes = this.metricValue(this.sourceCluster, sourceLinkGroup, "request-byte-total", sourceFetchTags);
        double destRequestBytes = this.metricValue(this.destCluster, destTenantGroup, "request-byte-total", destFetchTags);
        this.assertRange("request-bytes", sourceRequestBytes, destRequestBytes, 2000.0);

        double destResponseBytes = this.metricValue(this.destCluster, destTenantGroup, "response-byte-total", destFetchTags);
        double sourceResponseBytes = this.metricValue(this.sourceCluster, sourceLinkGroup, "response-byte-total", sourceFetchTags);
        this.assertRange("response-bytes", destResponseBytes, sourceResponseBytes, 2000.0);

        double sourceResponseTimeNs = this.metricValue(this.sourceCluster, sourceLinkGroup, "response-time-ns-max", sourceFetchTags);
        boolean sourceResponseTimeOk = sourceResponseTimeNs > 0.0 && sourceResponseTimeNs < responseTimeCeilingNs;
        Assertions.assertTrue(sourceResponseTimeOk, "Invalid source link response time metric: " + sourceResponseTimeNs);
    }

    /**
     * Waits until the "active-authenticated-connection-count" tenant metric of the source link user
     * lies within the expected range, and - for source-initiated links - does the same for the
     * destination link user.
     *
     * @param isSourceInitiated whether the destination cluster's link-user connections are checked too
     * @param linkActive        when {@code true} the expected range is [2, 10]; otherwise exactly 0
     * @throws Exception if a metric does not reach the expected range
     */
    private void verifyTenantConnectionMetrics(boolean isSourceInitiated, boolean linkActive) throws Exception {
        final int minLinkConnections;
        final int maxLinkConnections;
        if (linkActive) {
            minLinkConnections = 2;
            maxLinkConnections = 10;
        } else {
            minLinkConnections = 0;
            maxLinkConnections = 0;
        }

        Map<String, String> sourceTags = new HashMap<>();
        sourceTags.put("tenant", "sourceLogicalCluster");
        sourceTags.put("user", String.valueOf(this.sourceCluster.linkUser.userMetadata.userId()));
        Supplier<Double> sourceConnections =
            () -> this.metricValue(this.sourceCluster, "tenant-metrics", "active-authenticated-connection-count", sourceTags);
        TestUtils.waitForCondition(
            () -> sourceConnections.get() >= (double) minLinkConnections && sourceConnections.get() <= (double) maxLinkConnections,
            () -> "Source connection metric not updated for link, value = " + sourceConnections.get());

        if (!isSourceInitiated) {
            return;
        }

        // Source-initiated links also open connections towards the destination.
        Map<String, String> destTags = new HashMap<>();
        destTags.put("tenant", "destLogicalCluster");
        destTags.put("user", String.valueOf(this.destCluster.linkUser.userMetadata.userId()));
        Supplier<Double> destConnections =
            () -> this.metricValue(this.destCluster, "tenant-metrics", "active-authenticated-connection-count", destTags);
        TestUtils.waitForCondition(
            () -> destConnections.get() >= (double) minLinkConnections && destConnections.get() <= (double) maxLinkConnections,
            () -> "Destination connection metric not updated for reverse connections, value = " + destConnections.get());
    }

    /**
     * Verifies that cluster link request and link-count metrics are reported under the expected
     * metric groups on the source and destination clusters.
     *
     * <p>Checks, in order: the tenant-level {@code request-total} metrics for both link roles,
     * the per-link {@code cluster-link-source-metrics} request totals, and the
     * {@code active-link-count} metric, whose sum over brokers must equal the broker count of
     * the respective cluster.
     *
     * @param linkId id of the link whose per-link metrics are checked
     */
    private void verifyMetricsGroups(Uuid linkId) {
        Map<String, String> destTags = this.linkMetricTags("destLogicalCluster", "Metadata");
        Map<String, String> sourceTags = this.linkMetricTags("sourceLogicalCluster", "Metadata");

        // Each cluster must report nothing under the tenant metric group of the opposite link role.
        Assertions.assertEquals(0.0, this.metricValue(this.sourceCluster, "cluster-link-dest-tenant-metrics", "request-total", sourceTags, false), 0.001);
        Assertions.assertEquals(0.0, this.metricValue(this.destCluster, "cluster-link-source-tenant-metrics", "request-total", destTags, false), 0.001);

        double destValue = this.metricValue(this.destCluster, "cluster-link-dest-tenant-metrics", "request-total", destTags);
        Assertions.assertTrue(destValue > 0.0, "Dest metric not updated: " + destValue);
        if (this.sourceCluster.useSourceInitiatedLink) {
            double sourceTenantValue = this.metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", sourceTags);
            Assertions.assertTrue(sourceTenantValue > 0.0, "Source metric not updated: " + sourceTenantValue);
        } else {
            Assertions.assertEquals(0.0, this.metricValue(this.sourceCluster, "cluster-link-source-tenant-metrics", "request-total", sourceTags, false), 0.001);
        }

        // Per-link metrics are tagged with the link id rather than only the tenant.
        destTags = this.linkIdMetricTags(linkId, "destLogicalCluster", "Metadata");
        sourceTags = this.linkIdMetricTags(linkId, "sourceLogicalCluster", "Metadata");
        double sourceLinkValue = this.metricValue(this.sourceCluster, "cluster-link-source-metrics", "request-total", sourceTags);
        Assertions.assertTrue(sourceLinkValue > 0.0, "Source metric not updated: " + sourceLinkValue);
        double destSourceMetricValue = this.metricValue(this.destCluster, "cluster-link-source-metrics", "request-total", destTags, false);
        if (this.destCluster.useSourceInitiatedLink) {
            // Report the value that is actually asserted on (previously printed destValue).
            Assertions.assertTrue(destSourceMetricValue > 0.0, "Destination incoming request metric not updated: " + destSourceMetricValue);
        } else {
            Assertions.assertEquals(0.0, destSourceMetricValue, 0.001);
        }

        // The same tag map is reused for both clusters; only tenant and mode are overwritten.
        Map<String, String> tags = new HashMap<>();
        tags.put("link-id", Utils.toJavaUuid(linkId).toString());
        tags.put("tenant", "sourceLogicalCluster");
        tags.put("mode", "source");
        int numSourceBrokers = this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().size();
        Assertions.assertEquals(numSourceBrokers, (int) this.totalMetricValue(this.sourceCluster, "cluster-link-metrics", "active-link-count", tags));
        tags.put("tenant", "destLogicalCluster");
        tags.put("mode", "destination");
        int numDestBrokers = this.destCluster.physicalCluster.kafkaCluster().kafkaBrokers().size();
        Assertions.assertEquals(numDestBrokers, (int) this.totalMetricValue(this.destCluster, "cluster-link-metrics", "active-link-count", tags));
    }

    /**
     * Verifies how the destination cluster handles updates of the replica socket receive buffer
     * size on link {@code tenantLink}: the values 2097152 and 1024 must be rejected with a
     * {@code PolicyViolationException}, while 131072 must be accepted and become visible in the
     * link configuration.
     *
     * @throws Exception if the valid update fails or is not observed while waiting
     */
    private void verifySocketBufferSizeUpdate() throws Exception {
        String bufferProp = KafkaConfig.ReplicaSocketReceiveBufferBytesProp();
        ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, "tenantLink");
        // Builds the single-op alter request that sets the buffer size on the link.
        Function<String, Map<ConfigResource, Collection<AlterConfigOp>>> setBufferSize = size ->
            Collections.singletonMap(linkResource, Collections.singleton(new AlterConfigOp(new ConfigEntry(bufferProp, size), AlterConfigOp.OpType.SET)));

        long invalidSize = 0x200000L;
        ExecutionException tooLarge = Assertions.assertThrows(ExecutionException.class, () ->
            this.destCluster.admin.incrementalAlterConfigs(setBufferSize.apply(String.valueOf(invalidSize))).all().get(15L, TimeUnit.SECONDS));
        Assertions.assertEquals(PolicyViolationException.class, tooLarge.getCause().getClass());

        ExecutionException tooSmall = Assertions.assertThrows(ExecutionException.class, () ->
            this.destCluster.admin.incrementalAlterConfigs(setBufferSize.apply(String.valueOf(1024))).all().get(15L, TimeUnit.SECONDS));
        Assertions.assertEquals(PolicyViolationException.class, tooSmall.getCause().getClass());

        long validSize = 131072L;
        String validValue = String.valueOf(validSize);
        this.destCluster.admin.incrementalAlterConfigs(setBufferSize.apply(validValue)).all().get(15L, TimeUnit.SECONDS);
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.linkConfig("tenantLink", bufferProp), validValue, "Link config not updated");
    }

    /**
     * Asserts that the sender value exceeds the receiver value by at most {@code maxDiff} and
     * that the receiver value exceeds the sender value by less than 0.001.
     *
     * @param name metric name used in the failure message
     * @param receiverValue value observed on the receiving side
     * @param senderValue value observed on the sending side
     * @param maxDiff largest accepted {@code senderValue - receiverValue}
     */
    private void assertRange(String name, double receiverValue, double senderValue, double maxDiff) {
        boolean senderWithinLimit = senderValue - receiverValue <= maxDiff;
        boolean receiverNotAhead = receiverValue - senderValue < 0.001;
        Assertions.assertTrue(
            senderWithinLimit && receiverNotAhead,
            String.format("Metric values for '%s' (%f, %f) not within expected range %f", name, receiverValue, senderValue, maxDiff));
    }

    /**
     * Reports whether any broker of {@code cluster} exposes at least one metric that belongs to
     * {@code group} and whose tags contain every entry of {@code tags}.
     */
    private boolean metricsFound(MultiTenantCluster cluster, String group, Map<String, String> tags) {
        return cluster.physicalCluster.kafkaCluster().kafkaBrokers().stream()
            .flatMap(broker -> broker.metrics().metrics().keySet().stream())
            .anyMatch(metricName -> metricName.group().equals(group) && this.tagsMatched(metricName, tags));
    }

    /**
     * Sums the matching metric over all brokers of {@code cluster}, failing the test when no
     * matching metric exists.
     */
    private double metricValue(MultiTenantCluster cluster, String group, String name, Map<String, String> tags) {
        final boolean failIfNotFound = true;
        return this.metricValue(cluster, group, name, tags, failIfNotFound);
    }

    /**
     * Sums the values of every metric matching {@code name}, {@code group} and {@code tags}
     * across all brokers of {@code cluster}.
     *
     * @param failIfNotFound when set, the test fails if no broker exposes a matching metric
     * @return the summed value, or 0.0 when nothing matched
     */
    private double metricValue(MultiTenantCluster cluster, String group, String name, Map<String, String> tags, boolean failIfNotFound) {
        double total = 0.0;
        int matches = 0;
        for (KafkaBroker broker : cluster.physicalCluster.kafkaCluster().kafkaBrokers()) {
            for (Map.Entry<MetricName, KafkaMetric> metric : broker.metrics().metrics().entrySet()) {
                if (this.isMatchingMetric(metric.getKey(), name, group, tags)) {
                    matches++;
                    total += (Double) metric.getValue().metricValue();
                }
            }
        }
        if (failIfNotFound) {
            Assertions.assertTrue(matches > 0, "Metric not found " + name);
        }
        return total;
    }

    /**
     * Sums the matching metric over all brokers of {@code cluster}; a metric that is missing
     * everywhere yields 0.0 instead of failing the test.
     */
    private double totalMetricValue(MultiTenantCluster cluster, String group, String name, Map<String, String> tags) {
        // Same summation as metricValue, with the "must exist" assertion switched off.
        final boolean failIfNotFound = false;
        return this.metricValue(cluster, group, name, tags, failIfNotFound);
    }

    /**
     * Returns whether {@code metricName} has the given name and group and carries every tag in
     * {@code tags}.
     */
    private boolean isMatchingMetric(MetricName metricName, String name, String group, Map<String, String> tags) {
        return metricName.name().equals(name)
            && metricName.group().equals(group)
            && this.tagsMatched(metricName, tags);
    }

    /**
     * Returns whether every entry of {@code tags} is present with an equal value among the tags
     * of {@code metricName}; additional tags on the metric are ignored.
     */
    private boolean tagsMatched(MetricName metricName, Map<String, String> tags) {
        Map<String, String> actualTags = metricName.tags();
        return tags.entrySet().stream()
            .allMatch(expected -> expected.getValue().equals(actualTags.get(expected.getKey())));
    }

    /**
     * Increases the partition count of {@code linkedTopic} on the source cluster, waits until
     * the prefixed topic on the destination cluster reports the same count, and records the new
     * count in {@code numPartitions}.
     *
     * @param newPartitionCount partition count to grow the source topic to
     * @param clusterLinkPrefix prefix prepended to the topic name on the destination cluster
     */
    private void addSourcePartitionsAndVerifyMirror(int newPartitionCount, String clusterLinkPrefix) throws Exception {
        Map<String, NewPartitions> request = Collections.singletonMap("linkedTopic", NewPartitions.increaseTo(newPartitionCount));
        this.sourceCluster.admin.createPartitions(request).values().get("linkedTopic").get(15L, TimeUnit.SECONDS);

        String mirrorTopic = clusterLinkPrefix + "linkedTopic";
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.partitionsForTopic(mirrorTopic), newPartitionCount, "Topic partitions not updated");
        this.numPartitions = newPartitionCount;
    }

    /**
     * Sets {@code max.message.bytes} to 123456 on the source {@code linkedTopic} and waits until
     * the prefixed topic on the destination cluster reports the same value.
     *
     * @param clusterLinkPrefix prefix prepended to the topic name on the destination cluster
     */
    private void changeSourceTopicConfigAndVerifyMirror(String clusterLinkPrefix) throws Exception {
        ConfigResource sourceTopic = new ConfigResource(ConfigResource.Type.TOPIC, "linkedTopic");
        AlterConfigOp setMaxMessageBytes = new AlterConfigOp(new ConfigEntry("max.message.bytes", "123456"), AlterConfigOp.OpType.SET);
        this.sourceCluster.admin
            .incrementalAlterConfigs(Collections.singletonMap(sourceTopic, Collections.singleton(setMaxMessageBytes)))
            .all()
            .get(15L, TimeUnit.SECONDS);

        String mirrorTopic = clusterLinkPrefix + "linkedTopic";
        MultiTenantClusterLinkTest.waitFor(() -> this.destCluster.topicConfig(mirrorTopic, "max.message.bytes"), "123456", "Topic configs not migrated");
    }

    /**
     * Attempts to create a cluster link and asserts that creation fails with a
     * {@code PolicyViolationException} reporting that the maximum number of links was reached.
     *
     * @param linkMode link mode configured on the new link and named in the expected message
     * @param linkName name of the link to create
     * @param linkUserId id of the link user passed to the creation helper
     * @param exceededMode selects which cluster the link is created on (destination or source)
     *     and is named in the expected message
     * @param maxLinks limit expected to appear in the error message
     * @throws IllegalArgumentException if {@code exceededMode} is neither DESTINATION nor SOURCE
     */
    private void verifyClusterLinkCreateFailureDueToMaxLink(ClusterLinkDescription.LinkMode linkMode, String linkName, int linkUserId, ClusterLinkDescription.LinkMode exceededMode, int maxLinks) throws Throwable {
        Map<String, String> modeOverride = Collections.singletonMap(ClusterLinkConfig.LinkModeProp(), linkMode.name());
        final CreateClusterLinksResult attempt;
        switch (exceededMode) {
            case DESTINATION:
                attempt = this.destCluster.createDestClusterLinkResult(this.destCluster.admin, linkName, this.sourceCluster, linkUserId, "EXTERNAL", modeOverride, true);
                break;
            case SOURCE:
                attempt = this.sourceCluster.createSourceClusterLinkResult(this.sourceCluster.admin, linkName, this.destCluster, linkUserId, modeOverride);
                break;
            default:
                throw new IllegalArgumentException("Unexpected policy violation mode " + exceededMode);
        }

        String policyMessage = String.format("Cluster link with link mode %s could not be created because this cluster already has the maximum number of%s cluster links (%d). You can request a higher limit through Confluent Support.", linkMode.name(), exceededMode.name(), maxLinks);
        // Outside KRaft mode the policy message is expected with a validation prefix.
        String expectedMessage = TestInfoUtils.isKRaft(this.testInfo)
            ? policyMessage
            : "Unable to validate cluster link due to error: " + policyMessage;
        TestUtils.assertFutureThrows(attempt.all(), PolicyViolationException.class, expectedMessage);
    }

    /**
     * Updates the per-tenant link limits through the internal admin clients, using a BROKER
     * config resource with an empty name.
     *
     * @param maxDestClusterLinks new destination-link limit set on the destination cluster;
     *     a negative value leaves it unchanged
     * @param maxSourceClusterLinks new source-link limit set on the source cluster; a negative
     *     value leaves it unchanged
     */
    private void changeMaxClusterLinks(int maxDestClusterLinks, int maxSourceClusterLinks) throws Exception {
        if (maxDestClusterLinks >= 0) {
            ConfigResource destBrokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp setDestLimit = new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.destination.links.per.tenant", String.valueOf(maxDestClusterLinks)), AlterConfigOp.OpType.SET);
            this.destCluster.internalAdminClient()
                .incrementalAlterConfigs(Collections.singletonMap(destBrokers, Collections.singletonList(setDestLimit)))
                .all()
                .get();
        }
        if (maxSourceClusterLinks >= 0) {
            ConfigResource sourceBrokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp setSourceLimit = new AlterConfigOp(new ConfigEntry("confluent.plugins.cluster.link.policy.max.source.links.per.tenant", String.valueOf(maxSourceClusterLinks)), AlterConfigOp.OpType.SET);
            this.sourceCluster.internalAdminClient()
                .incrementalAlterConfigs(Collections.singletonMap(sourceBrokers, Collections.singletonList(setSourceLimit)))
                .all()
                .get();
        }
    }

    /**
     * Creates the ACLs used by the link tests for {@code user} and returns them.
     *
     * <p>The set always contains ALL on topics prefixed {@code linked} and {@code src_linked},
     * DESCRIBE on the {@code kafka-cluster} resource and READ on group {@code *}; one further
     * ALL binding is added for each topic prefix in {@code prefixes}.
     *
     * @param adminClient client used to create the ACLs
     * @param user user whose unprefixed principal the ACLs are granted to
     * @param prefixes additional topic prefixes to grant ALL on
     * @return the bindings that were submitted
     */
    private Set<AclBinding> addAcls(Admin adminClient, LogicalClusterUser user, String ... prefixes) throws Exception {
        String principal = user.unprefixedKafkaPrincipal().toString();
        Set<AclBinding> bindings = Utils.mkSet(
            new AclBinding(new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW)),
            new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW)),
            new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL), new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)),
            new AclBinding(new ResourcePattern(ResourceType.TOPIC, "src_linked", PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW)));
        Arrays.stream(prefixes)
            .map(prefix -> new AclBinding(new ResourcePattern(ResourceType.TOPIC, prefix, PatternType.PREFIXED), new AccessControlEntry(principal, "*", AclOperation.ALL, AclPermissionType.ALLOW)))
            .forEach(bindings::add);
        adminClient.createAcls(bindings).all().get(15L, TimeUnit.SECONDS);
        return bindings;
    }

    /**
     * Grants the source cluster's user access to topics prefixed with {@code topic}, by
     * delegating to {@code addAcl} with the source cluster.
     */
    private void addSourceAcl(String topic) throws Exception {
        MultiTenantCluster source = this.sourceCluster;
        this.addAcl(source, topic);
    }

    /**
     * Creates an ALLOW ALL binding for the cluster's user on topics prefixed with {@code topic},
     * using that cluster's admin client.
     */
    private void addAcl(MultiTenantCluster cluster, String topic) throws Exception {
        ResourcePattern topicPrefix = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.PREFIXED);
        AccessControlEntry allowAll = new AccessControlEntry(cluster.user.unprefixedKafkaPrincipal().toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW);
        cluster.admin
            .createAcls(Collections.singleton(new AclBinding(topicPrefix, allowAll)))
            .all()
            .get(15L, TimeUnit.SECONDS);
    }

    /**
     * Waits until {@code aclCount(cluster, linkId)} equals {@code count}. The failure message
     * labels the cluster "Source" when it is the source cluster and "Destination" otherwise.
     */
    private void waitForAcls(MultiTenantCluster cluster, int count, Uuid linkId) throws Exception {
        final String clusterType;
        if (cluster == this.sourceCluster) {
            clusterType = "Source";
        } else {
            clusterType = "Destination";
        }
        TestUtils.waitForCondition(
            () -> count == this.aclCount(cluster, linkId),
            () -> String.format("%s acls not created, expected %d got %d", clusterType, count, this.aclCount(cluster, linkId)));
    }

    /**
     * Counts the ACLs returned by the cluster's admin client.
     *
     * @param cluster cluster whose ACLs are described
     * @param linkId when non-null, the filter is narrowed with
     *     {@code SecurityUtils.aclFilterWithClusterLinkIds}; when null, all ACLs are counted
     * @return number of ACL bindings described
     * @throws RuntimeException wrapping any exception raised while describing the ACLs
     */
    private int aclCount(MultiTenantCluster cluster, Uuid linkId) {
        try {
            AclBindingFilter filter = linkId == null
                ? AclBindingFilter.ANY
                : SecurityUtils.aclFilterWithClusterLinkIds(AclBindingFilter.ANY, Collections.singleton(linkId));
            Collection<?> described = cluster.admin.describeAcls(filter).values().get(15L, TimeUnit.SECONDS);
            return described.size();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the {@code acls-add-failed-total} metric of group {@code cluster-link-metrics},
     * summed over all destination brokers without any tag restriction.
     */
    private double aclAddFailedMetric() {
        Map<String, String> anyTags = Collections.emptyMap();
        return this.totalMetricValue(this.destCluster, "cluster-link-metrics", "acls-add-failed-total", anyTags);
    }

    /**
     * Runs an ACL command on the destination physical cluster that passes consume ACL arguments
     * for the broker principal, using the tenant- and link-prefixed {@code linked} name as both
     * name arguments with a PREFIXED pattern.
     *
     * @param clusterLinkPrefix link prefix inserted between the tenant prefix and {@code linked}
     */
    private void addBrokerAclsForOffsetMigration(String clusterLinkPrefix) {
        String mirrorPrefix = this.destCluster.user.tenantPrefix() + clusterLinkPrefix + "linked";
        this.destCluster.physicalCluster
            .newAclCommand()
            .consumeAclArgs(PhysicalCluster.BROKER_PRINCIPAL, mirrorPrefix, mirrorPrefix, PatternType.PREFIXED)
            .execute();
    }

    /**
     * Commits, for every partition of {@code linkedTopic}, the local log end offset read from
     * the first source broker's log manager as the offset of consumer group {@code group}.
     *
     * @param adminClient client used to alter the consumer group offsets
     * @param group consumer group whose offsets are set
     * @return the committed offsets keyed by the unprefixed topic partition
     */
    private Map<TopicPartition, OffsetAndMetadata> commitOffsets(Admin adminClient, String group) throws Exception {
        LogManager sourceLogs = this.sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).logManager();
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        int partition = 0;
        while (partition < this.numPartitions) {
            // The broker stores the log under the tenant-prefixed topic name.
            TopicPartition brokerPartition = new TopicPartition(this.sourceCluster.user.tenantPrefix() + "linkedTopic", partition);
            long endOffset = ((AbstractLog)sourceLogs.getLog(brokerPartition, false).get()).localLogEndOffset();
            committed.put(new TopicPartition("linkedTopic", partition), new OffsetAndMetadata(endOffset));
            partition++;
        }
        adminClient.alterConsumerGroupOffsets(group, committed).all().get();
        return committed;
    }

    /**
     * Waits until {@code actual} supplies a value equal to {@code expected}.
     *
     * <p>The comparison is null-safe: a {@code null} expected value matches a {@code null}
     * actual value instead of raising a {@code NullPointerException}.
     *
     * @param actual supplier polled for the current value
     * @param expected value the supplier must eventually return; may be {@code null}
     * @param error prefix of the message reported when the condition is not met
     * @throws Exception propagated from {@code TestUtils.waitForCondition}
     */
    private static <T> void waitFor(Supplier<T> actual, T expected, String error) throws Exception {
        TestUtils.waitForCondition(
            () -> Objects.equals(expected, actual.get()),
            () -> error + " : expected=" + expected + ", actual=" + actual.get());
    }

    /**
     * Builds the JSON text of an ACL filter list holding one entry per principal; each entry
     * matches any resource type, pattern type, operation and permission type for that principal.
     *
     * @param principals principals to emit one filter for, in order
     * @return JSON of the form {@code { "aclFilters": [ ... ]}}
     */
    private String aclFilters(KafkaPrincipal ... principals) {
        String resourceFilter = "{ \"resourceFilter\": { \"resourceType\": \"any\", \"patternType\": \"any\" }, ";
        String accessFilter = "\"accessFilter\": { \"operation\": \"any\", \"permissionType\": \"any\", ";
        String filters = Arrays.stream(principals)
            .map(principal -> resourceFilter + accessFilter + String.format("\"principal\": \"%s\"", principal) + "}}")
            .collect(Collectors.joining(","));
        return "{ \"aclFilters\": [" + filters + "]}";
    }

    /**
     * Maps each binding through {@code SecurityUtils.aclWithClusterLinkIds} with
     * {@code linkIds} and collects the results into a set.
     */
    private Set<AclBinding> aclBindings(Collection<Uuid> linkIds, AclBinding ... acls) {
        Set<AclBinding> linked = new HashSet<>();
        for (AclBinding acl : acls) {
            linked.add(SecurityUtils.aclWithClusterLinkIds(acl, linkIds));
        }
        return linked;
    }

    /**
     * Waits until the full set of ACLs described on the destination cluster equals
     * {@code expectedAcls} (compared as sets).
     */
    private void waitForDestAcls(Collection<AclBinding> expectedAcls) throws Exception {
        Set<AclBinding> expected = new HashSet<>(expectedAcls);
        MultiTenantClusterLinkTest.waitFor(() -> this.currentAcls(this.destCluster.admin), expected, "ACLs sync result failed");
    }

    /**
     * Describes all ACLs visible to {@code admin} and returns them as a set.
     *
     * @throws RuntimeException wrapping any exception raised while describing the ACLs
     */
    private Set<AclBinding> currentAcls(ConfluentAdmin admin) {
        try {
            Collection<AclBinding> described = admin.describeAcls(AclBindingFilter.ANY).values().get();
            return new HashSet<>(described);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the given ACLs through {@code admin}. Each binding is first passed through
     * {@code aclBindings} with an empty link id set and then converted to its filter.
     */
    private void deleteAcls(ConfluentAdmin admin, AclBinding ... acls) throws Exception {
        Set<AclBinding> withoutLinkIds = this.aclBindings(Collections.emptySet(), acls);
        admin.deleteAcls(withoutLinkIds.stream().map(AclBinding::toFilter).collect(Collectors.toSet()))
            .all()
            .get(15L, TimeUnit.SECONDS);
    }

    /**
     * Asserts that creating link {@code tenantLink} with a freshly generated random link id
     * fails with an {@code InvalidConfigurationException} whose message mentions the link id
     * mismatch.
     *
     * @param linkConfigs config overrides passed to the link creation
     */
    private void verifyMismatchedLinkIdFailure(Map<String, String> linkConfigs) throws Exception {
        Uuid unknownLinkId = Uuid.randomUuid();
        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () ->
            this.sourceCluster.createDestClusterLinkResult(this.sourceCluster.admin, "tenantLink", this.destCluster, 1002, "EXTERNAL", false, linkConfigs, true, "", unknownLinkId).all().get(15L, TimeUnit.SECONDS));
        Assertions.assertEquals(InvalidConfigurationException.class, failure.getCause().getClass());
        String message = failure.getMessage();
        Assertions.assertTrue(message.contains("does not match requested link id"), message);
    }

    /**
     * {@code OAuthBearerLoginModule} subclass whose login lifecycle methods do no work and
     * always report success.
     */
    public static class DummyOAuthLoginModule
    extends OAuthBearerLoginModule {
        /** Always succeeds without performing a login. */
        public boolean login() throws LoginException {
            return true;
        }

        /** Always succeeds; there is nothing to commit. */
        public boolean commit() {
            return true;
        }

        /** Always succeeds; there is nothing to roll back. */
        public boolean abort() {
            return true;
        }

        /** Always succeeds; there is nothing to log out of. */
        public boolean logout() {
            return true;
        }
    }

    private static class MultiTenantCluster
    extends IntegrationTestHarness {
        private final Map<String, String> sslConfigs = new HashMap<String, String>();
        private PhysicalCluster physicalCluster;
        private LogicalCluster logicalCluster;
        private LogicalClusterUser user;
        private LogicalClusterUser linkUser;
        private ConfluentAdmin admin;
        private boolean useSourceInitiatedLink;
        KafkaProducer<String, String> producer;
        KafkaConsumer<String, String> consumer;

        /**
         * Creates the harness for one cluster of the link test, passing {@code testInfo} on to
         * the {@code IntegrationTestHarness} constructor.
         */
        public MultiTenantCluster(TestInfo testInfo) {
            super(testInfo);
        }

        /**
         * Starts the cluster using {@code brokerOverrideProps} for both brokers and controllers
         * and without an explicit admin client listener.
         */
        void startCluster(Properties brokerOverrideProps, String logicalClusterId, int userId, boolean useSslForInternalListener) throws Exception {
            Properties controllerOverrideProps = brokerOverrideProps;
            Optional<String> adminClientListener = Optional.empty();
            this.startCluster(brokerOverrideProps, controllerOverrideProps, logicalClusterId, userId, useSslForInternalListener, adminClientListener);
        }

        /**
         * Starts the physical cluster, creates the logical cluster with its first user and
         * builds the admin client stored in {@code admin}.
         *
         * @param brokerOverrideProps broker config overrides
         * @param controllerOverrideProps controller config overrides
         * @param logicalClusterId id of the logical cluster to create
         * @param userId id of the user created with the logical cluster
         * @param useSslForInternalListener whether the INTERNAL listener is configured for SSL
         *     and the admin client connects through it
         * @param adminClientListener listener for the admin client; only consulted when SSL is
         *     not used for the internal listener
         */
        void startCluster(Properties brokerOverrideProps, Properties controllerOverrideProps, String logicalClusterId, int userId, boolean useSslForInternalListener, Optional<String> adminClientListener) throws Exception {
            if (useSslForInternalListener) {
                Properties allBrokerOverrideProps = new Properties();
                // Populates sslConfigs, which are merged into the broker properties below.
                this.createSslStores();
                allBrokerOverrideProps.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:SSL,EXTERNAL:SASL_PLAINTEXT");
                allBrokerOverrideProps.putAll(this.sslConfigs);
                // Caller overrides are applied last, so they replace the generated SSL settings on conflict.
                allBrokerOverrideProps.putAll((Map<?, ?>)brokerOverrideProps);
                this.physicalCluster = this.start(allBrokerOverrideProps, controllerOverrideProps, true, Collections.singleton(String.format("User:O=A server,CN=%s", MultiTenantClusterLinkTest.SSL_KAFKA_CN)), Optional.of(Time.SYSTEM), cluster -> {});
            } else {
                this.physicalCluster = this.start(brokerOverrideProps, controllerOverrideProps, true, Optional.of(Time.SYSTEM), cluster -> {});
            }
            this.logicalCluster = this.physicalCluster.createLogicalCluster(logicalClusterId, 100, userId);
            this.user = this.logicalCluster.user(userId);
            if (useSslForInternalListener) {
                // Admin client connects to the INTERNAL listener over SSL with the generated client configs.
                Properties clientProps = new Properties();
                for (Map.Entry<String, String> kv : this.clientConfigs("INTERNAL").entrySet()) {
                    clientProps.put(kv.getKey(), kv.getValue());
                }
                this.admin = (ConfluentAdmin)super.createAdminClient(this.physicalCluster.kafkaCluster().bootstrapServers("INTERNAL"), SecurityProtocol.SSL, null, null, clientProps);
            } else {
                // Admin client is created for the logical cluster's admin user, optionally on a specific listener.
                this.admin = adminClientListener.isPresent() ? (ConfluentAdmin)super.createAdminClient(this.logicalCluster.adminUser(), adminClientListener.get()) : (ConfluentAdmin)super.createAdminClient(this.logicalCluster.adminUser());
            }
        }

        /** Returns the physical cluster's super admin client. */
        Admin internalAdminClient() {
            PhysicalCluster cluster = this.physicalCluster;
            return cluster.superAdminClient();
        }

        /**
         * Gets or creates the user with id {@code newUserId}, adds it to the logical cluster
         * and grants it the link ACLs; reverse-connection ACLs are added as well when this
         * cluster uses source-initiated links.
         *
         * @param newUserId id of the user to register as link user
         * @return the user as added to the logical cluster
         */
        LogicalClusterUser createLinkUser(int newUserId) throws Exception {
            UserMetadata metadata = this.physicalCluster.getOrCreateUser(newUserId, false);
            LogicalClusterUser created = this.logicalCluster.addUser(metadata);
            this.addLinkAcls(created);
            if (!this.useSourceInitiatedLink) {
                return created;
            }
            this.addReverseConnectionAcls(created);
            return created;
        }

        /**
         * Builds certificate stores for {@code SSL_KAFKA_CN} with host name {@code localhost}
         * and fills {@code sslConfigs} with the resulting key store and trust store settings,
         * plus {@code ssl.client.auth=required}.
         */
        private void createSslStores() throws Exception {
            CertStores certStores = new CertStores.Builder(true).cn(MultiTenantClusterLinkTest.SSL_KAFKA_CN).addHostName("localhost").build();
            Properties props = new Properties();
            // Flattens each store property into a string: Password -> its value,
            // List -> comma-joined, any other non-null value is cast to String; nulls are skipped.
            BiConsumer<String, Object> copyEntry = (k, v) -> {
                if (v instanceof Password) {
                    props.setProperty((String)k, ((Password)v).value());
                } else if (v instanceof List) {
                    List listOfString = (List)v;
                    props.setProperty((String)k, String.join((CharSequence)",", listOfString));
                } else if (v != null) {
                    props.setProperty((String)k, (String)v);
                }
            };
            certStores.keyStoreProps().forEach(copyEntry);
            certStores.trustStoreProps().forEach(copyEntry);
            // NOTE(review): presumably rewrites the store settings in props to inline PEM values - confirm against TestSslUtils.
            TestSslUtils.convertToPemWithoutFiles((Properties)props);
            props.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> this.sslConfigs.put((String)k, (String)v)));
            this.sslConfigs.put("ssl.client.auth", "required");
        }

        /** Returns whether any SSL configs have been stored in {@code sslConfigs}. */
        private boolean usesSslForInternalListener() {
            return this.sslConfigs.size() > 0;
        }

        /**
         * Removes {@code user} from the logical cluster, optionally deletes its ACLs, and
         * closes and clears this cluster's producer and consumer if they are set.
         *
         * @param user user to remove
         * @param deleteAcls whether the user's ACLs are deleted as well
         */
        void deleteUser(LogicalClusterUser user, boolean deleteAcls) {
            this.logicalCluster.removeUser(user.userMetadata.userId());
            if (deleteAcls) {
                this.deleteAcls(user);
            }

            KafkaProducer<String, String> openProducer = this.producer;
            if (openProducer != null) {
                openProducer.close();
                this.producer = null;
            }

            KafkaConsumer<String, String> openConsumer = this.consumer;
            if (openConsumer != null) {
                openConsumer.close();
                this.consumer = null;
            }
        }

        /** Returns the client configs for {@code listenerName}, including the stored SSL configs. */
        private Map<String, String> clientConfigs(String listenerName) {
            final boolean includeMtlsConfigs = true;
            return this.clientConfigs(listenerName, includeMtlsConfigs);
        }

        /**
         * Builds client security configs for connecting to {@code listenerName}.
         *
         * <p>The {@code EXTERNAL} listener uses SASL_PLAINTEXT with SCRAM-SHA-256 and the link
         * user's JAAS config. Any other listener uses SSL when SSL configs have been generated
         * and PLAINTEXT otherwise.
         *
         * @param listenerName listener whose bootstrap servers are used
         * @param includeMtlsConfigs whether the stored {@code sslConfigs} are merged into the result
         * @return a new map of client configs
         */
        private Map<String, String> clientConfigs(String listenerName, boolean includeMtlsConfigs) {
            final Properties securityProps;
            if (listenerName.equals("EXTERNAL")) {
                securityProps = KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(listenerName), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());
            } else if (this.usesSslForInternalListener()) {
                securityProps = KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(listenerName), SecurityProtocol.SSL, null, null);
            } else {
                securityProps = KafkaTestUtils.securityProps(this.physicalCluster.kafkaCluster().bootstrapServers(listenerName), SecurityProtocol.PLAINTEXT, null, "");
            }

            Map<String, String> configs = new HashMap<>();
            for (String key : securityProps.stringPropertyNames()) {
                configs.put(key, securityProps.getProperty(key));
            }
            if (includeMtlsConfigs) {
                configs.putAll(this.sslConfigs);
            }
            return configs;
        }

        /** Starts creation of a destination link with ACL sync enabled. */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, String sourceListener, Map<String, String> overrides, boolean validateLink) throws Throwable {
            final boolean enableAclSync = true;
            return this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, sourceListener, overrides, validateLink, enableAclSync);
        }

        /**
         * Starts creation of a destination link, using {@code SYNC_ALL_ACL_FILTER} as ACL
         * filter when {@code enableAclSync} is set and an empty filter otherwise.
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, String sourceListener, Map<String, String> overrides, boolean validateLink, boolean enableAclSync) throws Throwable {
            String aclFilter = enableAclSync ? MultiTenantClusterLinkTest.SYNC_ALL_ACL_FILTER : "";
            return this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, sourceListener, overrides, validateLink, aclFilter);
        }

        /**
         * Starts creation of a destination link with explicit SSL link properties and no
         * caller-chosen link id.
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, String sourceListener, Map<String, String> overrides, boolean validateLink, String aclFilter) throws Throwable {
            final boolean explicitSslLinkProps = true;
            return this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, sourceListener, explicitSslLinkProps, overrides, validateLink, aclFilter, null);
        }

        /**
         * Assembles the link configuration and submits the create-cluster-link request for a
         * link created on this (destination) cluster.
         *
         * @param admin client the creation request is sent through
         * @param linkName name of the new link
         * @param sourceCluster cluster the link reads from; its {@code linkUser} is replaced
         *     when this cluster does not use source-initiated links
         * @param linkUserId id of the link user created on {@code sourceCluster}
         * @param sourceListener source listener to connect to; {@code EXTERNAL} selects the
         *     logical cluster id as source cluster id, anything else the first broker's cluster id
         * @param explicitSslLinkProps whether the stored SSL configs are included in the link configs
         * @param overrides configs applied last, replacing generated values on conflict
         * @param validateLink passed to the creation options
         * @param aclFilter ACL filter JSON; an empty string leaves ACL sync unconfigured
         * @param linkId link id passed to {@code NewClusterLink}; may be null
         * @return the result handle of the creation request
         */
        CreateClusterLinksResult createDestClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, String sourceListener, boolean explicitSslLinkProps, Map<String, String> overrides, boolean validateLink, String aclFilter, Uuid linkId) throws Throwable {
            HashMap<String, String> linkConfigs = new HashMap<String, String>();
            if (!this.useSourceInitiatedLink) {
                // Destination-initiated: create the link user on the source and connect with its client configs.
                sourceCluster.linkUser = sourceCluster.createLinkUser(linkUserId);
                if (explicitSslLinkProps) {
                    linkConfigs.putAll(sourceCluster.clientConfigs(sourceListener));
                } else {
                    linkConfigs.putAll(sourceCluster.clientConfigs(sourceListener, false));
                }
            } else {
                // Source-initiated: no client configs are set here, only the link and connection modes.
                linkConfigs.put(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
                linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "INBOUND");
            }
            linkConfigs.put("request.timeout.ms", "10000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            if (!aclFilter.isEmpty()) {
                linkConfigs.put(ClusterLinkConfig.AclFiltersProp(), aclFilter);
                linkConfigs.put(ClusterLinkConfig.AclSyncEnableProp(), "true");
                linkConfigs.put(ClusterLinkConfig.AclSyncMsProp(), "2000");
            }
            // Consumer offset sync is always enabled, for all groups.
            String allGroupsFilter = "{ \"groupFilters\": [{ \"name\": \"*\", \"patternType\": \"literal\", \"filterType\": \"include\" }]}";
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), allGroupsFilter);
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            linkConfigs.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "2000");
            String sourceClusterId = sourceListener.equals("EXTERNAL") ? sourceCluster.logicalCluster.logicalClusterId() : sourceCluster.physicalCluster.kafkaCluster().kafkaBrokers().get(0).clusterId();
            // Applied last so that caller overrides replace any value set above.
            linkConfigs.putAll(overrides);
            NewClusterLink newClusterLink = new NewClusterLink(linkName, sourceClusterId, linkConfigs, linkId);
            CreateClusterLinksOptions options = new CreateClusterLinksOptions().validateOnly(false).validateLink(validateLink);
            return admin.createClusterLinks(Collections.singleton(newClusterLink), options);
        }

        /** Creates a destination link with ACL sync enabled. */
        void createDestClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, Map<String, String> overrides) throws Throwable {
            final boolean enableSyncAcl = true;
            this.createDestClusterLink(admin, linkName, sourceCluster, linkUserId, overrides, enableSyncAcl);
        }

        /**
         * Creates a validated destination link over the {@code EXTERNAL} source listener,
         * waits for the creation to complete and then sets the link's internal
         * {@code metadata.max.age.ms} config to 1000.
         *
         * @param enableSyncAcl whether ACL sync is configured on the link
         */
        void createDestClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster sourceCluster, int linkUserId, Map<String, String> overrides, Boolean enableSyncAcl) throws Throwable {
            CreateClusterLinksResult creation = this.createDestClusterLinkResult(admin, linkName, sourceCluster, linkUserId, "EXTERNAL", overrides, true, enableSyncAcl);
            creation.all().get();

            Map<String, String> internalConfigs = Collections.singletonMap("metadata.max.age.ms", "1000");
            this.setInternalClusterLinkConfigs(linkName, internalConfigs);
        }

        /**
         * Starts creation of a source link without naming a bootstrap listener for the local
         * cluster.
         */
        CreateClusterLinksResult createSourceClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster destCluster, int linkUserId, Map<String, String> overrides) throws Throwable {
            final String bootstrapListener = null;
            return this.createSourceClusterLinkResult(admin, linkName, destCluster, linkUserId, overrides, bootstrapListener);
        }

        /**
         * Assembles the configuration of a source-mode, outbound link on this cluster and
         * submits the creation request.
         *
         * <p>Requires {@code useSourceInitiatedLink}. Link users are created on both clusters
         * (id {@code linkUserId + 1000} on the destination, {@code linkUserId} locally). The
         * destination client properties are stored unprefixed, the local ones under
         * {@code ClusterLinkConfig.LocalPrefix()}; {@code overrides} are applied last.
         *
         * @param admin client the creation request is sent through
         * @param linkName name of the new link
         * @param destCluster cluster the link connects to; its {@code linkUser} is replaced
         * @param linkUserId id of the local link user
         * @param overrides configs that replace generated values on conflict
         * @param bootstrapListener local listener to bootstrap from, or null for the default
         * @return the result handle of the creation request
         */
        CreateClusterLinksResult createSourceClusterLinkResult(ConfluentAdmin admin, String linkName, MultiTenantCluster destCluster, int linkUserId, Map<String, String> overrides, String bootstrapListener) throws Throwable {
            Assertions.assertTrue(this.useSourceInitiatedLink);
            destCluster.linkUser = destCluster.createLinkUser(linkUserId + 1000);
            this.linkUser = this.createLinkUser(linkUserId);

            Properties remoteProps = KafkaTestUtils.securityProps(destCluster.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), destCluster.linkUser.saslJaasConfig());
            final String localBootstrap;
            if (bootstrapListener == null) {
                localBootstrap = this.physicalCluster.bootstrapServers();
            } else {
                localBootstrap = this.physicalCluster.bootstrapServers(bootstrapListener);
            }
            Properties localProps = KafkaTestUtils.securityProps(localBootstrap, SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), this.linkUser.saslJaasConfig());

            Map<String, String> linkConfigs = new HashMap<>();
            linkConfigs.put(ClusterLinkConfig.LinkModeProp(), "SOURCE");
            linkConfigs.put(ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
            for (String key : remoteProps.stringPropertyNames()) {
                linkConfigs.put(key, remoteProps.getProperty(key));
            }
            for (String key : localProps.stringPropertyNames()) {
                linkConfigs.put(ClusterLinkConfig.LocalPrefix() + key, localProps.getProperty(key));
            }
            linkConfigs.put("request.timeout.ms", "10000");
            linkConfigs.put(ClusterLinkConfig.TopicConfigSyncMsProp(), "1000");
            linkConfigs.putAll(overrides);

            NewClusterLink link = new NewClusterLink(linkName, destCluster.logicalCluster.logicalClusterId(), linkConfigs);
            CreateClusterLinksOptions options = new CreateClusterLinksOptions().validateOnly(false).validateLink(true);
            return admin.createClusterLinks(Collections.singleton(link), options);
        }

        /**
         * Creates a source-initiated cluster link with no config overrides and blocks until
         * the create request completes.
         *
         * @throws Throwable if the request fails or link setup fails
         */
        void createSourceClusterLink(ConfluentAdmin admin, String linkName, MultiTenantCluster destCluster, int linkUserId) throws Throwable {
            Map<String, String> noOverrides = Collections.emptyMap();
            CreateClusterLinksResult result = this.createSourceClusterLinkResult(admin, linkName, destCluster, linkUserId, noOverrides);
            result.all().get();
        }

        /**
         * Deletes the named cluster link using default delete options and blocks until the
         * request completes.
         *
         * @param admin admin client used to issue the delete
         * @param linkName name of the link to delete
         * @throws Throwable if the delete request fails
         */
        void deleteClusterLink(ConfluentAdmin admin, String linkName) throws Throwable {
            Set<String> linkNames = Collections.singleton(linkName);
            admin.deleteClusterLinks(linkNames, new DeleteClusterLinksOptions())
                .all()
                .get();
        }

        /**
         * Applies {@code configs} to the tenant-prefixed cluster link through the physical
         * cluster's super admin client, then waits until the change is visible.
         *
         * <p>Sequence: wait for a client manager per broker, record the admin clients in use,
         * submit the incremental alter (one {@code SET} op per entry), wait until each client
         * manager reports the new value for each key, and finally wait until the client
         * managers expose admin clients different from the recorded ones.
         *
         * @param linkName link name without the tenant prefix
         * @param configs config names mapped to the values to set
         * @throws Exception if the alter request fails or any wait times out
         */
        void setInternalClusterLinkConfigs(String linkName, Map<String, String> configs) throws Exception {
            String tenantLinkName = this.user.tenantPrefix() + linkName;
            List<ClusterLinkFactory.ClientManager> managers = this.waitForClientManagers(tenantLinkName);
            // Snapshot of the admins before the change, compared against at the end.
            List<Admin> adminsBefore = this.waitForAdmins(managers, Collections.emptyList());

            ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, tenantLinkName);
            Collection<AlterConfigOp> setOps = configs.entrySet().stream()
                .map(cfg -> new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))
                .collect(Collectors.toList());
            this.physicalCluster.superAdminClient()
                .incrementalAlterConfigs(Collections.singletonMap(linkResource, setOps))
                .all()
                .get();

            for (ClusterLinkFactory.ClientManager manager : managers) {
                for (Map.Entry<String, String> cfg : configs.entrySet()) {
                    MultiTenantClusterLinkTest.waitFor(() -> this.linkConfig(manager, tenantLinkName, cfg.getKey()), cfg.getValue(), "Link config not propagated: " + cfg.getKey());
                }
            }
            this.waitForAdmins(managers, adminsBefore);
        }

        /**
         * Sets a single cluster-link config and waits for the new value to be visible;
         * equivalent to the five-argument overload with {@code validate = true}.
         *
         * @throws Exception propagated from the five-argument overload
         */
        void alterClusterLink(ConfluentAdmin admin, String linkName, String configName, String configValue) throws Exception {
            final boolean validate = true;
            this.alterClusterLink(admin, linkName, configName, configValue, validate);
        }

        /**
         * Sets one config on the named cluster link through {@code admin}, waiting up to
         * 15 seconds for the alter request to complete.
         *
         * @param admin admin client used to issue the alter request
         * @param linkName link name as passed to the {@code CLUSTER_LINK} config resource
         * @param configName config key to set
         * @param configValue value to set
         * @param validate when true, additionally waits until {@code linkConfig(linkName, configName)}
         *        returns {@code configValue}
         * @throws Exception if the request fails, times out, or the validation wait fails
         */
        void alterClusterLink(ConfluentAdmin admin, String linkName, String configName, String configValue, boolean validate) throws Exception {
            ConfigResource linkResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkName);
            AlterConfigOp setOp = new AlterConfigOp(new ConfigEntry(configName, configValue), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> request = Collections.singletonMap(linkResource, Collections.singleton(setOp));
            admin.incrementalAlterConfigs(request).all().get(15L, TimeUnit.SECONDS);
            if (!validate) {
                return;
            }
            MultiTenantClusterLinkTest.waitFor(() -> this.linkConfig(linkName, configName), configValue, "Link config not updated");
        }

        /**
         * Waits until every broker of the physical cluster has a client manager for the
         * given link, then returns those client managers in broker order.
         *
         * @param linkName link name exactly as listed by the brokers' link managers
         * @return one client manager per broker
         * @throws Exception if the wait condition is not met
         */
        private List<ClusterLinkFactory.ClientManager> waitForClientManagers(String linkName) throws Exception {
            TestUtils.waitForCondition(() -> {
                for (KafkaBroker broker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                    if (!this.clusterLinkClientManager(broker, linkName).isPresent()) {
                        return false;
                    }
                }
                return true;
            }, "Cluster link client managers not created");

            List<ClusterLinkFactory.ClientManager> managers = new ArrayList<>();
            for (KafkaBroker broker : this.physicalCluster.kafkaCluster().kafkaBrokers()) {
                managers.add(this.clusterLinkClientManager(broker, linkName).get());
            }
            return managers;
        }

        /**
         * Looks up the client manager for the link with the given name on {@code server}.
         *
         * @param server server whose link manager is queried
         * @param linkName link name compared with {@code equals} against the listed links
         * @return the client manager, or empty if the link is not listed or the link
         *         manager has no client manager for it
         */
        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaServer server, String linkName) {
            ClusterLinkFactory.LinkManager manager = server.clusterLinkManager();
            Option match = manager.listClusterLinks().find(candidate -> candidate.linkName().equals(linkName));
            if (match.isDefined()) {
                Uuid id = ((ClusterLinkData)match.get()).linkId();
                // Bridge the Scala Option to a Java Optional: an absent manager becomes Optional.empty().
                return Optional.ofNullable(manager.clientManager(id).getOrElse(() -> null));
            }
            return Optional.empty();
        }

        /**
         * Looks up the client manager for the link with the given name on {@code kafkaBroker}.
         *
         * @param kafkaBroker broker whose link manager is queried
         * @param linkName link name compared with {@code equals} against the listed links
         * @return the client manager, or empty if the link is not listed or the link
         *         manager has no client manager for it
         */
        private Optional<ClusterLinkFactory.ClientManager> clusterLinkClientManager(KafkaBroker kafkaBroker, String linkName) {
            ClusterLinkFactory.LinkManager manager = kafkaBroker.clusterLinkManager();
            Option match = manager.listClusterLinks().find(candidate -> candidate.linkName().equals(linkName));
            if (match.isDefined()) {
                Uuid id = ((ClusterLinkData)match.get()).linkId();
                // Bridge the Scala Option to a Java Optional: an absent manager becomes Optional.empty().
                return Optional.ofNullable(manager.clientManager(id).getOrElse(() -> null));
            }
            return Optional.empty();
        }

        /**
         * Reads one config value from the client manager's current config.
         *
         * @param clientManager client manager whose {@code currentConfig()} is read
         * @param linkName not used by this method
         * @param configName key looked up in the config's original string values
         * @return the value stored under {@code configName}, or {@code null} if the map has no such key
         */
        private String linkConfig(ClusterLinkFactory.ClientManager clientManager, String linkName, String configName) {
            Object value = clientManager.currentConfig().originalsStrings().get(configName);
            return (String)value;
        }

        /**
         * Waits until every client manager exposes a non-null admin client that is not
         * (by reference identity) one of {@code oldAdmins}, and returns those admin clients,
         * one per client manager, in the order of {@code clientManagers}.
         *
         * <p>Each client manager is cast to {@code ClusterLinkDestClientManager}.
         *
         * @param clientManagers client managers to inspect
         * @param oldAdmins admin instances that must no longer be in use; pass an empty list
         *        to accept any non-null admin
         * @return the admin clients currently exposed by the client managers
         * @throws Exception if the wait condition is not met
         */
        private List<Admin> waitForAdmins(List<ClusterLinkFactory.ClientManager> clientManagers, List<Admin> oldAdmins) throws Exception {
            List<Admin> admins = new ArrayList<>();
            TestUtils.waitForCondition(() -> {
                // Reset once per attempt, before the loop. Clearing inside the loop would drop
                // every admin except the last manager's from the returned list.
                admins.clear();
                for (ClusterLinkFactory.ClientManager clientManager : clientManagers) {
                    ConfluentAdmin admin = ((ClusterLinkDestClientManager)clientManager).getAdmin();
                    if (admin == null) {
                        return false;
                    }
                    // Identity comparison on purpose: the question is whether the instance was replaced.
                    if (oldAdmins.stream().anyMatch(oldAdmin -> oldAdmin == admin)) {
                        return false;
                    }
                    admins.add(admin);
                }
                return true;
            }, "Admin clients not created");
            return admins;
        }

        /**
         * Returns the cached producer, creating it without a listener if none exists yet.
         *
         * @return the shared producer instance
         */
        KafkaProducer<String, String> getOrCreateProducer() {
            Optional<String> noListener = Optional.empty();
            return this.getOrCreateProducer(noListener);
        }

        /**
         * Returns the cached producer, creating it on first use for {@code this.user} over
         * {@code SASL_PLAINTEXT}.
         *
         * @param listener forwarded to {@code createProducer}; only consulted when no
         *        producer has been created yet, later calls return the cached instance
         * @return the shared producer instance
         */
        KafkaProducer<String, String> getOrCreateProducer(Optional<String> listener) {
            if (this.producer != null) {
                return this.producer;
            }
            this.producer = this.createProducer(this.user, SecurityProtocol.SASL_PLAINTEXT, listener);
            return this.producer;
        }

        /**
         * Returns the cached consumer, creating it without a listener if none exists yet.
         *
         * @param groupId consumer group id used if a consumer has to be created
         * @return the shared consumer instance
         */
        KafkaConsumer<String, String> getOrCreateConsumer(String groupId) {
            Optional<String> noListener = Optional.empty();
            return this.getOrCreateConsumer(groupId, noListener);
        }

        /**
         * Returns the cached consumer, creating it on first use for {@code this.user} over
         * {@code SASL_PLAINTEXT}.
         *
         * @param groupId forwarded to {@code createConsumer}; ignored once a consumer exists
         * @param listener forwarded to {@code createConsumer}; ignored once a consumer exists
         * @return the shared consumer instance
         */
        KafkaConsumer<String, String> getOrCreateConsumer(String groupId, Optional<String> listener) {
            if (this.consumer != null) {
                return this.consumer;
            }
            this.consumer = this.createConsumer(this.user, groupId, SecurityProtocol.SASL_PLAINTEXT, listener);
            return this.consumer;
        }

        /**
         * Fetches the ACL bindings matching {@code aclBindingFilter(user)}, waiting up to
         * 15 seconds for the result.
         *
         * @param user user whose principal is used to build the filter
         * @return a new set containing the matching bindings
         * @throws RuntimeException wrapping any exception raised while building the filter
         *         or waiting for the result
         */
        Set<AclBinding> describeAcls(LogicalClusterUser user) {
            try {
                AclBindingFilter filter = this.aclBindingFilter(user);
                Collection<AclBinding> bindings = this.admin.describeAcls(filter).values().get(15L, TimeUnit.SECONDS);
                return new HashSet<>(bindings);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Deletes the ACL bindings matching {@code aclBindingFilter(user)}, waiting up to
         * 15 seconds for completion.
         *
         * @param user user whose principal is used to build the filter
         * @throws RuntimeException wrapping any exception raised while building the filter
         *         or waiting for the result
         */
        void deleteAcls(LogicalClusterUser user) {
            try {
                Set<AclBindingFilter> filters = Collections.singleton(this.aclBindingFilter(user));
                this.admin.deleteAcls(filters).all().get(15L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Builds a filter for the user's unprefixed principal with a {@code null} host,
         * {@code ANY} operation and {@code ANY} permission type, on {@code ResourcePatternFilter.ANY}.
         *
         * @param user user whose unprefixed principal is matched
         * @return the ACL binding filter
         */
        private AclBindingFilter aclBindingFilter(LogicalClusterUser user) {
            String principal = user.unprefixedKafkaPrincipal().toString();
            return new AclBindingFilter(
                ResourcePatternFilter.ANY,
                new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY));
        }

        /**
         * Runs a new ACL command from the physical cluster, configured with
         * {@code clusterAclArgs(principal, op)}.
         *
         * @param principal principal passed to {@code clusterAclArgs}
         * @param op operation name passed to {@code clusterAclArgs}
         */
        private void addClusterAcls(KafkaPrincipal principal, String op) {
            this.physicalCluster
                .newAclCommand()
                .clusterAclArgs(principal, op)
                .execute();
        }

        /**
         * Creates the bindings returned by {@code linkAcls(user)}, waiting up to 15 seconds.
         *
         * @param user user the link ACLs are created for
         * @throws Exception if the request fails or times out
         */
        private void addLinkAcls(LogicalClusterUser user) throws Exception {
            Set<AclBinding> bindings = this.linkAcls(user);
            this.admin.createAcls(bindings).all().get(15L, TimeUnit.SECONDS);
        }

        /**
         * Deletes the bindings returned by {@code linkAcls(user)} by converting each to its
         * filter, waiting up to 15 seconds.
         *
         * @param user user whose link ACLs are removed
         * @throws Exception if the request fails or times out
         */
        private void deleteLinkAcls(LogicalClusterUser user) throws Exception {
            List<AclBindingFilter> filters = new ArrayList<>();
            for (AclBinding binding : this.linkAcls(user)) {
                filters.add(binding.toFilter());
            }
            this.admin.deleteAcls(filters).all().get(15L, TimeUnit.SECONDS);
        }

        /**
         * Returns the link ACL bindings for {@code this.user}.
         *
         * @return the bindings built by {@code linkAcls(LogicalClusterUser)}
         */
        private Set<AclBinding> linkAcls() {
            LogicalClusterUser self = this.user;
            return this.linkAcls(self);
        }

        /**
         * Builds the five ALLOW bindings (host {@code "*"}) for the user's unprefixed principal:
         * READ and DESCRIBE_CONFIGS on topics prefixed {@code "linked"}, DESCRIBE and
         * DESCRIBE_CONFIGS on the {@code "kafka-cluster"} cluster resource, and READ on the
         * literal group {@code "*"}.
         *
         * @param user user whose unprefixed principal the bindings apply to
         * @return the set of bindings
         */
        private Set<AclBinding> linkAcls(LogicalClusterUser user) {
            String principal = user.unprefixedKafkaPrincipal().toString();

            ResourcePattern linkedTopics = new ResourcePattern(ResourceType.TOPIC, "linked", PatternType.PREFIXED);
            ResourcePattern cluster = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
            ResourcePattern allGroups = new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL);

            AclBinding topicRead = new AclBinding(linkedTopics, new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));
            AclBinding topicDescribeConfigs = new AclBinding(linkedTopics, new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW));
            AclBinding clusterDescribe = new AclBinding(cluster, new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
            AclBinding clusterDescribeConfigs = new AclBinding(cluster, new AccessControlEntry(principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW));
            AclBinding groupRead = new AclBinding(allGroups, new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW));

            return Utils.mkSet(topicRead, topicDescribeConfigs, groupRead, clusterDescribe, clusterDescribeConfigs);
        }

        /**
         * Creates a single ALLOW binding (host {@code "*"}) granting the user's unprefixed
         * principal ALTER on the {@code "kafka-cluster"} cluster resource, waiting up to
         * 15 seconds.
         *
         * @param user user whose unprefixed principal receives the ACL
         * @throws Exception if the request fails or times out
         */
        private void addReverseConnectionAcls(LogicalClusterUser user) throws Exception {
            String principal = user.unprefixedKafkaPrincipal().toString();
            ResourcePattern cluster = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
            AccessControlEntry allowAlter = new AccessControlEntry(principal, "*", AclOperation.ALTER, AclPermissionType.ALLOW);
            Set<AclBinding> bindings = Utils.mkSet(new AclBinding(cluster, allowAlter));
            this.admin.createAcls(bindings).all().get(15L, TimeUnit.SECONDS);
        }

        /**
         * Describes {@code topic} through {@code this.admin} (waiting up to 15 seconds) and
         * returns its partition count.
         *
         * @param topic topic name to describe
         * @return number of partitions in the topic description
         * @throws RuntimeException wrapping any exception raised by the describe call
         */
        int partitionsForTopic(String topic) {
            try {
                KafkaFuture<TopicDescription> pending = this.admin.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic);
                TopicDescription description = pending.get(15L, TimeUnit.SECONDS);
                return description.partitions().size();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Describes the configs of {@code topic} through {@code this.admin} (waiting up to
         * 15 seconds) and returns the value of the first entry named {@code name}.
         *
         * @param topic topic whose configs are described
         * @param name config entry name to look up
         * @return the entry's value
         * @throws RuntimeException wrapping any failure, including the
         *         {@code NoSuchElementException} raised when no entry matches or the
         *         matching entry's value is {@code null}
         */
        String topicConfig(String topic, String name) {
            try {
                ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
                KafkaFuture<Config> pending = this.admin.describeConfigs(Collections.singleton(topicResource)).values().get(topicResource);
                Config described = pending.get(15L, TimeUnit.SECONDS);
                Optional<String> value = described.entries().stream()
                    .filter(entry -> entry.name().equals(name))
                    .findFirst()
                    .map(ConfigEntry::value);
                return value.get();
            }
            catch (Exception failure) {
                throw new RuntimeException(failure);
            }
        }

        /**
         * Reads one config value from the connection manager of the named link on the
         * broker returned by {@code linkManager()}.
         *
         * @param linkName link name without tenant prefix (resolved through {@code linkId})
         * @param configName key looked up in the config's original string values
         * @return the value stored under {@code configName}, or {@code null} if the map has no such key
         */
        String linkConfig(String linkName, String configName) {
            ClusterLinkFactory.LinkManager manager = this.linkManager();
            Uuid id = this.linkId(linkName);
            ClusterLinkFactory.ConnectionManager connections = (ClusterLinkFactory.ConnectionManager)manager.connectionManager(id).get();
            Object value = connections.currentConfig().originalsStrings().get(configName);
            return (String)value;
        }

        /**
         * Returns the cluster link manager of the first broker (index 0) in the physical cluster.
         *
         * @return that broker's link manager
         */
        ClusterLinkFactory.LinkManager linkManager() {
            KafkaBroker firstBroker = this.physicalCluster.kafkaCluster().kafkaBrokers().get(0);
            return firstBroker.clusterLinkManager();
        }

        /**
         * Resolves the id of the link whose listed name equals this user's tenant prefix
         * followed by {@code linkName}. Fails (via the Scala {@code Option.get()}) if no
         * such link is listed.
         *
         * @param linkName link name without tenant prefix
         * @return the link id
         */
        Uuid linkId(String linkName) {
            Option match = this.linkManager()
                .listClusterLinks()
                .find(data -> data.linkName().equals(this.user.tenantPrefix() + linkName));
            return ((ClusterLinkData)match.get()).linkId();
        }

        /**
         * Reports whether the link manager lists a link whose name equals this user's
         * tenant prefix followed by {@code linkName}.
         *
         * @param linkName link name without tenant prefix
         * @return true if such a link is listed
         */
        boolean linkIdExists(String linkName) {
            Option match = this.linkManager()
                .listClusterLinks()
                .find(data -> data.linkName().equals(this.user.tenantPrefix() + linkName));
            return match.isDefined();
        }

        /**
         * Lists cluster links through {@code this.admin} (without topics) and returns the
         * state of the listing whose name equals {@code linkName}.
         *
         * <p>The wait for the listing is bounded to 15 seconds, in line with the other admin
         * helpers in this class, so a stalled request fails instead of blocking indefinitely.
         *
         * @param linkName link name as reported by the listing
         * @return the listed state, or {@code LinkState.UNKNOWN} if no listing matches
         * @throws RuntimeException wrapping any failure of the list request, including a timeout
         */
        ClusterLinkDescription.LinkState linkState(String linkName) {
            try {
                ListClusterLinksOptions options = new ListClusterLinksOptions().includeTopics(false);
                @SuppressWarnings("unchecked")
                Collection<ClusterLinkListing> listings = (Collection<ClusterLinkListing>)this.admin.listClusterLinks(options).result().get(15L, TimeUnit.SECONDS);
                for (ClusterLinkListing listing : listings) {
                    if (listing.linkName().equals(linkName)) {
                        return listing.linkState();
                    }
                }
                return ClusterLinkDescription.LinkState.UNKNOWN;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Fetches the committed offsets of a consumer group through {@code this.admin}.
         *
         * <p>The wait is bounded to 15 seconds, in line with the other admin helpers in this
         * class, so a stalled request fails instead of blocking indefinitely.
         *
         * @param group consumer group id
         * @return committed offset and metadata per topic partition
         * @throws RuntimeException wrapping any failure of the request, including a timeout
         */
        Map<TopicPartition, OffsetAndMetadata> committedOffsets(String group) {
            try {
                return this.admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get(15L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Describes the mirror for {@code topic} through {@code this.admin}, waiting up to
         * 15 seconds.
         *
         * @param topic topic whose mirror description is requested
         * @return the mirror description, or {@code null} if the lookup throws any
         *         {@code Exception} (including a timeout)
         */
        MirrorTopicDescription mirrorDescription(String topic) {
            try {
                KafkaFuture pending = (KafkaFuture)this.admin.describeMirrors(Collections.singleton(topic), new DescribeMirrorsOptions()).result().get(topic);
                return (MirrorTopicDescription)pending.get(15L, TimeUnit.SECONDS);
            }
            catch (Exception ignored) {
                // Any failure is reported to the caller as "no description".
                return null;
            }
        }

        /**
         * Decompiled synthetic lambda body: reports whether both arguments are the same
         * {@code Admin} instance (reference identity, not {@code equals}).
         */
        private static /* synthetic */ boolean lambda$null$15(Admin admin, Admin oldAdmin) {
            return admin == oldAdmin;
        }
    }
}

