/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.message.AlterMirrorTopicsRequestData;
import org.apache.kafka.common.message.CreateClusterLinksRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.metadata.ClusterLinkRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RemoveClusterLinkRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.AclControlManager;
import org.apache.kafka.controller.ClusterLinkControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ConfigurationControlManagerTest;
import org.apache.kafka.controller.ConfigurationValidator;
import org.apache.kafka.controller.EncryptionControlManager;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.MirrorTopicControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.metadata.ClusterLink;
import org.apache.kafka.metadata.FakeKafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.interceptor.ClusterLinkInterceptor;
import org.apache.kafka.server.policy.CreateClusterLinkPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@Timeout(value=40L)
public class ClusterLinkControlManagerTest {
    static final Map<String, String> VALID_CONFIG = new HashMap<String, String>();
    static final Uuid TOPIC_ID = Uuid.randomUuid();

    @Test
    public void testCreateAndDeleteLink() {
        TestContext ctx = new TestContext();
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("test-link", Uuid.randomUuid().toString(), VALID_CONFIG), ctx.clusterLinkControl, ctx.configControl, ClusterLinkControlManagerTest.success());
        Optional linkId = ctx.clusterLinkControl.getClusterLinkId("test-link");
        Assertions.assertTrue((boolean)linkId.isPresent());
        Assertions.assertEquals(ctx.configControl.getConfigs(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, ((Uuid)linkId.get()).toString())).get("bootstrap.servers"), (Object)"localhost:9092");
        ClusterLinkControlManagerTest.deleteClusterLink("test-link", ctx.clusterLinkControl, ctx.configControl, ClusterLinkControlManagerTest.success());
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("test-link").isPresent());
    }

    @Test
    public void testCreateClusterLinkPolicyAddAndDelete() {
        TestCreateClusterLinkPolicy policy = (TestCreateClusterLinkPolicy)Mockito.mock(TestCreateClusterLinkPolicy.class);
        Map<String, String> configs = VALID_CONFIG;
        ClusterLinkConfig.LinkMode linkMode = ClusterLinkControlManager.getLinkMode(configs);
        String linkModeStr = linkMode.toString();
        Optional<String> tenant = Optional.of("lkc-123");
        String clusterId = Uuid.randomUuid().toString();
        String linkName1 = "link1";
        CreateClusterLinksRequestData.EntryData link1 = ClusterLinkControlManagerTest.clusterLink(linkName1, clusterId, configs, tenant);
        ((TestCreateClusterLinkPolicy)Mockito.doNothing().doThrow(PolicyViolationException.class).when((Object)policy)).validate(tenant, linkModeStr, configs);
        ((TestCreateClusterLinkPolicy)Mockito.doNothing().when((Object)policy)).linkAdded((Uuid)ArgumentMatchers.any(), (Optional)ArgumentMatchers.eq(tenant), (String)ArgumentMatchers.eq((Object)linkMode.toString()));
        ((TestCreateClusterLinkPolicy)Mockito.doNothing().when((Object)policy)).linkDeleted((Uuid)ArgumentMatchers.any());
        TestContext ctx = new TestContext(Optional.of(policy));
        ClusterLinkControlManagerTest.createClusterLink(link1, ctx.clusterLinkControl, ctx.configControl, ClusterLinkControlManagerTest.success());
        Assertions.assertTrue((boolean)ctx.clusterLinkControl.getClusterLinkId(linkName1).isPresent());
        String linkName2 = "link2";
        CreateClusterLinksRequestData.EntryData link2 = ClusterLinkControlManagerTest.clusterLink(linkName2, clusterId, configs, tenant);
        ClusterLinkControlManagerTest.createClusterLink(link2, ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)Errors.POLICY_VIOLATION, (Object)apiError.error()));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId(linkName2).isPresent());
        ClusterLinkControlManagerTest.deleteClusterLink(linkName1, ctx.clusterLinkControl, ctx.configControl, ClusterLinkControlManagerTest.success());
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId(linkName1).isPresent());
        ((TestCreateClusterLinkPolicy)Mockito.verify((Object)policy, (VerificationMode)Mockito.times((int)2))).validate(tenant, linkModeStr, configs);
        ((TestCreateClusterLinkPolicy)Mockito.verify((Object)policy, (VerificationMode)Mockito.times((int)1))).linkAdded((Uuid)ArgumentMatchers.any(), (Optional)ArgumentMatchers.eq(tenant), (String)ArgumentMatchers.eq((Object)linkMode.toString()));
        ((TestCreateClusterLinkPolicy)Mockito.verify((Object)policy, (VerificationMode)Mockito.times((int)1))).linkDeleted((Uuid)ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{policy});
    }

    @Test
    public void testCreateClusterLinkPolicyAddAndDeleteOnReplayOnly() {
        TestCreateClusterLinkPolicy policy = (TestCreateClusterLinkPolicy)Mockito.mock(TestCreateClusterLinkPolicy.class);
        String linkMode = ClusterLinkConfig.LinkMode.DESTINATION.name();
        Optional<String> tenant = Optional.of("lkc-123");
        String linkName = "link";
        ClusterLinkRecord clusterLinkRecord = new ClusterLinkRecord();
        Uuid linkId = Uuid.randomUuid();
        clusterLinkRecord.setClusterLinkId(linkId);
        clusterLinkRecord.setClusterLinkName(linkName);
        clusterLinkRecord.setLinkMode(linkMode);
        clusterLinkRecord.setTenantPrefix(tenant.get());
        ((TestCreateClusterLinkPolicy)Mockito.doNothing().when((Object)policy)).linkAdded((Uuid)ArgumentMatchers.any(), (Optional)ArgumentMatchers.eq(tenant), (String)ArgumentMatchers.eq((Object)linkMode));
        ((TestCreateClusterLinkPolicy)Mockito.doNothing().when((Object)policy)).linkDeleted((Uuid)ArgumentMatchers.any());
        TestContext ctx = new TestContext(Optional.of(policy));
        ctx.clusterLinkControl.replay(clusterLinkRecord);
        Assertions.assertTrue((boolean)ctx.clusterLinkControl.getClusterLinkId(linkName).isPresent());
        RemoveClusterLinkRecord removeClusterLinkRecord = new RemoveClusterLinkRecord();
        removeClusterLinkRecord.setClusterLinkId(linkId);
        ctx.clusterLinkControl.replay(removeClusterLinkRecord);
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId(linkName).isPresent());
        ((TestCreateClusterLinkPolicy)Mockito.verify((Object)policy, (VerificationMode)Mockito.times((int)1))).linkAdded((Uuid)ArgumentMatchers.any(), (Optional)ArgumentMatchers.eq(tenant), (String)ArgumentMatchers.eq((Object)linkMode));
        ((TestCreateClusterLinkPolicy)Mockito.verify((Object)policy, (VerificationMode)Mockito.times((int)1))).linkDeleted((Uuid)ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{policy});
    }

    @Test
    public void testInvalidLinkName() {
        TestContext ctx = new TestContext();
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("b@dl!nk", Uuid.randomUuid().toString(), Collections.emptyMap()), ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)apiError.error(), (Object)Errors.INVALID_CLUSTER_LINK));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("b@dl!nk").isPresent());
    }

    @Test
    public void testInvalidDestinationClusterLinkWithLinkIdInRequest() {
        TestContext ctx = new TestContext();
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("testLink", Uuid.randomUuid(), Uuid.randomUuid().toString(), VALID_CONFIG), ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)apiError.error(), (String)apiError.message()));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("testLink").isPresent());
    }

    @Test
    public void testInvalidSourceClusterLinkWithoutLinkIdInRequest() {
        TestContext ctx = new TestContext();
        HashMap<String, String> configMap = new HashMap<String, String>(VALID_CONFIG);
        configMap.put("link.mode", "SOURCE");
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("testLink", Uuid.randomUuid().toString(), configMap), ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)apiError.error(), (String)apiError.message()));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("testLink").isPresent());
    }

    @Test
    public void testCreateSourceClusterLink() {
        TestContext ctx = new TestContext();
        HashMap<String, String> configMap = new HashMap<String, String>(VALID_CONFIG);
        configMap.put("link.mode", "SOURCE");
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("testLink", Uuid.randomUuid(), Uuid.randomUuid().toString(), configMap), ctx.clusterLinkControl, ctx.configControl, ClusterLinkControlManagerTest.success());
        Assertions.assertTrue((boolean)ctx.clusterLinkControl.getClusterLinkId("testLink").isPresent());
    }

    @Test
    public void testCreateSourceClusterLinkWithIncompatibleVersion() {
        TestContext ctx = new TestContext();
        HashMap<String, String> configMap = new HashMap<String, String>(VALID_CONFIG);
        configMap.put("link.mode", "SOURCE");
        FeatureControlManager featureControl = new FeatureControlManager.Builder().setSnapshotRegistry(ctx.snapshotRegistry).setLogContext(ctx.logContext).setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION).build();
        ClusterLinkControlManager clusterLinkControl = new ClusterLinkControlManager(ctx.snapshotRegistry, ctx.logContext, ctx.configControl, ctx.mirrorTopicControl, featureControl, topicId -> {}, arg_0 -> ((AclControlManager)ctx.aclControlManager).unlinkAcls(arg_0), ctx.localClusterId, ctx.policy);
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("testLink", Uuid.randomUuid(), Uuid.randomUuid().toString(), configMap), clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)apiError.error(), (Object)Errors.UNSUPPORTED_VERSION));
    }

    @Test
    public void testCreateClusterLinkWithInvalidLinkMode() {
        TestContext ctx = new TestContext();
        HashMap<String, String> configMap = new HashMap<String, String>(VALID_CONFIG);
        configMap.put("link.mode", "RandomBadMode");
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("testLink", Uuid.randomUuid().toString(), configMap), ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)Errors.INVALID_CONFIG, (Object)apiError.error(), (String)apiError.message()));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("testLink").isPresent());
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("testLink", Uuid.randomUuid(), Uuid.randomUuid().toString(), configMap), ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)Errors.INVALID_CONFIG, (Object)apiError.error(), (String)apiError.message()));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("testLink").isPresent());
    }

    @Test
    public void testCreateClusterLinkToSelf() {
        TestContext ctx = new TestContext();
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("selfLink", ctx.localClusterId, Collections.emptyMap()), ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)apiError.error(), (Object)Errors.INVALID_REQUEST));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("selfLink").isPresent());
    }

    @Test
    public void testCreateExistingLink() {
        TestContext ctx = new TestContext();
        ConfigurationControlManager configControl = new ConfigurationControlManager.Builder().setSnapshotRegistry(ctx.snapshotRegistry).setExistenceChecker((Consumer)ConfigurationControlManagerTest.TestExistenceChecker.INSTANCE).setEncryptionControlManager(ctx.encryptionControlManager).setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).build();
        ClusterLinkControlManager clusterLinkControl = new ClusterLinkControlManager(ctx.snapshotRegistry, ctx.logContext, configControl, ctx.mirrorTopicControl, ctx.featureControl, topicId -> {}, arg_0 -> ((AclControlManager)ctx.aclControlManager).unlinkAcls(arg_0), ctx.localClusterId, ctx.policy);
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("test-link", Uuid.randomUuid().toString(), Collections.emptyMap()), clusterLinkControl, configControl, ClusterLinkControlManagerTest.success());
        Assertions.assertTrue((boolean)clusterLinkControl.getClusterLinkId("test-link").isPresent());
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("test-link", Uuid.randomUuid().toString(), Collections.emptyMap()), clusterLinkControl, configControl, apiError -> Assertions.assertEquals((Object)apiError.error(), (Object)Errors.CLUSTER_LINK_EXISTS));
    }

    @Test
    public void testDeleteMissingLink() {
        TestContext ctx = new TestContext();
        ConfigurationControlManager configControl = new ConfigurationControlManager.Builder().setSnapshotRegistry(ctx.snapshotRegistry).setExistenceChecker((Consumer)ConfigurationControlManagerTest.TestExistenceChecker.INSTANCE).setEncryptionControlManager(ctx.encryptionControlManager).setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).build();
        ClusterLinkControlManager clusterLinkControl = new ClusterLinkControlManager(ctx.snapshotRegistry, ctx.logContext, configControl, ctx.mirrorTopicControl, ctx.featureControl, topicId -> {}, arg_0 -> ((AclControlManager)ctx.aclControlManager).unlinkAcls(arg_0), ctx.localClusterId, ctx.policy);
        ClusterLinkControlManagerTest.deleteClusterLink("missing-link", clusterLinkControl, configControl, apiError -> Assertions.assertEquals((Object)apiError.error(), (Object)Errors.CLUSTER_LINK_NOT_FOUND));
        Assertions.assertFalse((boolean)clusterLinkControl.getClusterLinkId("missing-link").isPresent());
    }

    @Test
    public void testFailConfigValidation() {
        ConfigurationValidator configValidator = new ConfigurationValidator(){

            public void validate(ConfigResource resource) {
            }

            public void validate(ConfigResource resource, Map<String, String> newConfigs, Map<String, String> existingConfigs) {
                if (newConfigs.containsKey("bad")) {
                    throw new InvalidConfigurationException("Bad config");
                }
            }
        };
        TestContext ctx = new TestContext(configValidator);
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("test-link", Uuid.randomUuid().toString(), Collections.singletonMap("bad", "config")), ctx.clusterLinkControl, ctx.configControl, apiError -> Assertions.assertEquals((Object)apiError.error(), (Object)Errors.INVALID_CONFIG, (String)"Expected an error when creating a cluster link with failed config validation"));
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("test-link").isPresent(), (String)"Did not expect to see a cluster link here since it had invalid config");
    }

    @Test
    public void testDeleteLinkWithActiveMirrors() {
        TestContext ctx = new TestContext();
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("test-link", Uuid.randomUuid().toString(), VALID_CONFIG), ctx.clusterLinkControl, ctx.configControl, ClusterLinkControlManagerTest.success());
        Assertions.assertTrue((boolean)ctx.clusterLinkControl.getClusterLinkId("test-link").isPresent());
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ApiError error = ctx.mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("foo").setLinkName("test-link").setMirrorTopic("foo-origin").setSourceTopicId(Uuid.randomUuid()), TOPIC_ID, records::add);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error());
        Assertions.assertEquals((int)1, (int)records.size());
        ctx.replay(records);
        records.clear();
        error = ctx.clusterLinkControl.deleteClusterLink("test-link", records::add, false);
        Assertions.assertEquals((Object)Errors.CLUSTER_LINK_IN_USE, (Object)error.error(), (String)"Can't delete with force=false if a mirror topic is using the cluster link");
        Assertions.assertTrue((boolean)ctx.mirrorTopicControl.isMirrorTopic(TOPIC_ID));
        Assertions.assertTrue((boolean)ctx.clusterLinkControl.getClusterLinkId("test-link").isPresent());
        error = ctx.clusterLinkControl.deleteClusterLink("test-link", records::add, true);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error());
        Assertions.assertEquals((int)1, (int)records.size());
        ctx.replay(records);
        Assertions.assertFalse((boolean)ctx.mirrorTopicControl.isMirrorTopic(TOPIC_ID), (String)"Topic should no longer be mirrored after cluster link has been removed");
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("test-link").isPresent());
    }

    @Test
    public void testDeleteLinkWithStoppedMirrors() {
        TestContext ctx = new TestContext();
        Uuid linkId = Uuid.randomUuid();
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("test-link", linkId.toString(), VALID_CONFIG), ctx.clusterLinkControl, ctx.configControl, ClusterLinkControlManagerTest.success());
        Assertions.assertTrue((boolean)ctx.clusterLinkControl.getClusterLinkId("test-link").isPresent());
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ApiError error = ctx.mirrorTopicControl.maybeAddMirrorTopicRecord(new CreateTopicsRequestData.CreatableTopic().setName("foo").setLinkName("test-link").setMirrorTopic("foo-origin").setSourceTopicId(Uuid.randomUuid()), TOPIC_ID, records::add);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error());
        Assertions.assertEquals((int)1, (int)records.size());
        ctx.replay(records);
        records.clear();
        error = ctx.mirrorTopicControl.alterMirrorState(new AlterMirrorTopicsRequestData.AlterMirrorTopic().setTopic("foo").setMirrorTopicState("PendingStoppedMirror"), records::add);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error(), (String)error.message());
        Assertions.assertEquals((int)1, (int)records.size());
        ctx.replay(records);
        records.clear();
        error = ctx.mirrorTopicControl.alterMirrorState(new AlterMirrorTopicsRequestData.AlterMirrorTopic().setTopic("foo").setMirrorTopicState("StoppedMirror"), records::add);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error(), (String)error.message());
        Assertions.assertEquals((int)1, (int)records.size());
        ctx.replay(records);
        records.clear();
        error = ctx.clusterLinkControl.deleteClusterLink("test-link", records::add, false);
        Assertions.assertEquals((Object)Errors.NONE, (Object)error.error());
        Assertions.assertEquals((int)1, (int)records.size());
        ctx.replay(records);
        Assertions.assertFalse((boolean)ctx.mirrorTopicControl.isMirrorTopic(TOPIC_ID), (String)"Topic should no longer be mirrored after cluster link has been removed");
        Assertions.assertFalse((boolean)ctx.clusterLinkControl.getClusterLinkId("test-link").isPresent());
        Assertions.assertTrue((boolean)ctx.mirrorTopicControl.topicIdsForClusterLinkId(linkId, false).isEmpty());
    }

    @Test
    public void testCreateClusterLinkRejectsRequestIfTenantPrefixIsNullAndCreateClusterLinkPolicyExists() {
        TestCreateClusterLinkPolicy policy = (TestCreateClusterLinkPolicy)Mockito.mock(TestCreateClusterLinkPolicy.class);
        TestContext ctx = new TestContext(Optional.of(policy));
        ClusterLinkControlManagerTest.createClusterLink(ClusterLinkControlManagerTest.clusterLink("test-link", Uuid.randomUuid().toString(), VALID_CONFIG), ctx.clusterLinkControl, ctx.configControl, apiError -> {
            Assertions.assertEquals((Object)Errors.INVALID_REQUEST, (Object)apiError.error(), (String)"Did not get the expected error type.");
            Assertions.assertEquals((Object)"Tenant prefix cannot be null if a CreateClusterLinkPolicy is set.", (Object)apiError.message(), (String)"Did not get the expected error message.");
        });
    }

    static Consumer<ApiError> success() {
        return apiError -> Assertions.assertTrue((boolean)apiError.isSuccess(), (String)apiError.message());
    }

    static void createClusterLink(CreateClusterLinksRequestData.EntryData clusterLink, ClusterLinkControlManager clusterLinkControl, ConfigurationControlManager configControl, Consumer<ApiError> resultConsumer) {
        ResultOrError result;
        ArrayList records = new ArrayList();
        try {
            result = clusterLinkControl.createClusterLink(clusterLink, records::add, null);
        }
        catch (Throwable e) {
            result = ResultOrError.of((ApiError)ApiError.fromThrowable((Throwable)e));
        }
        if (result.isResult()) {
            resultConsumer.accept(ApiError.NONE);
        } else {
            resultConsumer.accept(result.error());
        }
        records.forEach(record -> {
            if (record.message() instanceof ConfigRecord) {
                configControl.replay((ConfigRecord)record.message());
            } else if (record.message() instanceof ClusterLinkRecord) {
                clusterLinkControl.replay((ClusterLinkRecord)record.message());
            } else {
                Assertions.fail((String)("Unexpected record type " + record.message().getClass()));
            }
        });
    }

    static void deleteClusterLink(String linkName, ClusterLinkControlManager clusterLinkControl, ConfigurationControlManager configControl, Consumer<ApiError> resultConsumer) {
        ArrayList records = new ArrayList();
        ApiError result = clusterLinkControl.deleteClusterLink(linkName, records::add, false);
        resultConsumer.accept(result);
        records.forEach(record -> {
            if (record.message() instanceof ConfigRecord) {
                configControl.replay((ConfigRecord)record.message());
            } else if (record.message() instanceof RemoveClusterLinkRecord) {
                clusterLinkControl.replay((RemoveClusterLinkRecord)record.message());
            } else {
                Assertions.fail((String)("Unexpected record type " + record.message().getClass()));
            }
        });
    }

    static CreateClusterLinksRequestData.EntryData clusterLink(String linkName, String clusterId, Map<String, String> configs) {
        return ClusterLinkControlManagerTest.clusterLink(linkName, Uuid.ZERO_UUID, clusterId, configs, Optional.empty());
    }

    static CreateClusterLinksRequestData.EntryData clusterLink(String linkName, String clusterId, Map<String, String> configs, Optional<String> tenant) {
        return ClusterLinkControlManagerTest.clusterLink(linkName, Uuid.ZERO_UUID, clusterId, configs, tenant);
    }

    static CreateClusterLinksRequestData.EntryData clusterLink(String linkName, Uuid linkId, String clusterId, Map<String, String> configs) {
        return ClusterLinkControlManagerTest.clusterLink(linkName, linkId, clusterId, configs, Optional.empty());
    }

    static CreateClusterLinksRequestData.EntryData clusterLink(String linkName, Uuid linkId, String clusterId, Map<String, String> configs, Optional<String> tenant) {
        return new CreateClusterLinksRequestData.EntryData().setLinkName(linkName).setLinkId(linkId).setClusterId(clusterId).setTenantPrefix((String)tenant.orElse(null)).setConfigs(configs.entrySet().stream().map(entry -> new CreateClusterLinksRequestData.ConfigData().setKey((String)entry.getKey()).setValue((String)entry.getValue())).collect(Collectors.toList()));
    }

    static {
        VALID_CONFIG.put("bootstrap.servers", "localhost:9092");
    }

    class TestCreateClusterLinkPolicy
    implements CreateClusterLinkPolicy,
    ClusterLinkInterceptor {
        TestCreateClusterLinkPolicy() {
        }

        public void configure(Map<String, ?> configs) {
        }

        public void linkAdded(Uuid linkId, Optional<String> tenantPrefix, String linkMode) {
        }

        public void linkDeleted(Uuid linkId) {
        }

        public void validate(Optional<String> tenantPrefix, String linkMode, Map<String, String> configs) throws PolicyViolationException {
        }

        public Set<Uuid> links() {
            return Collections.emptySet();
        }
    }

    static class TestContext {
        LogContext logContext = new LogContext();
        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
        AclControlManager aclControlManager = new AclControlManager.Builder().setLogContext(this.logContext).setSnapshotRegistry(this.snapshotRegistry).setValidLinkIdChecker(this::isValidClusterLink).build();
        ConfigurationControlManager configControl;
        MirrorTopicControlManager mirrorTopicControl;
        ClusterLinkControlManager clusterLinkControl;
        FeatureControlManager featureControl;
        String localClusterId = "clusterId";
        Optional<CreateClusterLinkPolicy> policy;
        EncryptionControlManager encryptionControlManager = new EncryptionControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).build();

        TestContext() {
            this(ConfigurationValidator.NO_OP);
        }

        TestContext(ConfigurationValidator validator) {
            this(validator, Optional.empty());
        }

        TestContext(Optional<CreateClusterLinkPolicy> policy) {
            this(ConfigurationValidator.NO_OP, policy);
        }

        TestContext(ConfigurationValidator validator, Optional<CreateClusterLinkPolicy> policy) {
            this.configControl = new ConfigurationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setExistenceChecker((Consumer)ConfigurationControlManagerTest.TestExistenceChecker.INSTANCE).setValidator(validator).setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).build();
            this.featureControl = new FeatureControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setLogContext(this.logContext).setMetadataVersion(MetadataVersion.IBP_3_3_IV0).build();
            this.mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, __ -> Optional.of(TOPIC_ID), this::findClusterLinkName, this::findClusterLink, __ -> Optional.empty(), this.featureControl);
            this.clusterLinkControl = new ClusterLinkControlManager(this.snapshotRegistry, this.logContext, this.configControl, this.mirrorTopicControl, this.featureControl, topicId -> {}, arg_0 -> ((AclControlManager)this.aclControlManager).unlinkAcls(arg_0), this.localClusterId, policy);
            this.policy = policy;
        }

        private Optional<Uuid> findClusterLinkName(String linkName) {
            return this.clusterLinkControl.getClusterLinkId(linkName);
        }

        private Optional<ClusterLink> findClusterLink(Uuid linkId) {
            return this.clusterLinkControl.getClusterLink(linkId.toString());
        }

        private Boolean isValidClusterLink(Uuid linkId) {
            return this.clusterLinkControl.isValidLinkId(linkId);
        }

        void replay(List<ApiMessageAndVersion> records) {
            RecordTestUtils.replayAll(this.configControl, records);
            RecordTestUtils.replayAll(this.mirrorTopicControl, records);
            RecordTestUtils.replayAll(this.clusterLinkControl, records);
        }
    }
}

