package kafka.server;

import io.confluent.conflux.concurrent.SemiCompletionStage;
import io.confluent.k2.kafka.K2ControllerHandler;
import io.confluent.k2.kafka.K2TopicMetadata;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.telemetry.api.events.NoOpEventEmitter;
import io.opentelemetry.context.Context;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import kafka.network.RequestChannel;
import kafka.server.QuotaFactory;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AlterConfigsRequestData;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestLogFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.audit.NoOpAuditLogProvider;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.immutable.ImmutableSet;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Option;

/* loaded from: input_file:kafka/server/K2ControllerApisTest.class */
public class K2ControllerApisTest {
    // Clock passed to the APIs under test; a fresh MockTime is installed by setUp().
    private Time time;
    // Quota manager bundle assembled in setUp() from the mocks declared below.
    private QuotaFactory.QuotaManagers quotas;
    // Broker configuration; rebuilt in setUp() and replaced by tests that need other settings.
    private KafkaConfig kafkaConfig;

    // Mockito mocks shared across tests; setUp() resets them before every test.
    private final RequestChannel requestChannel = Mockito.mock(RequestChannel.class);
    private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
    private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
    private final ControllerMutationQuotaManager clientControllerQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
    private final ProducerIdQuotaManager clientProducerIdQuotaManager = Mockito.mock(ProducerIdQuotaManager.class);
    private final ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
    private final ClusterLinkReplicationQuotaManager clusterLinkReplicationQuotaManager = Mockito.mock(ClusterLinkReplicationQuotaManager.class);
    private final ClusterLinkRequestQuotaManager clusterLinkRequestQuotaManager = Mockito.mock(ClusterLinkRequestQuotaManager.class);
    private final ClientQuotaCallback clientQuotaCallback = Mockito.mock(ClientQuotaCallback.class);

    // Single-thread executor created in setUp() and shut down in tearDown().
    private ExecutorService executor;
    /**
     * Serde that encodes a principal as {@code Utils.utf8(principal.toString())} and decodes it by
     * parsing the string form with {@link SecurityUtils#parseKafkaPrincipal(String)}.
     */
    private final KafkaPrincipalSerde kafkaPrincipalSerde = new KafkaPrincipalSerde() {
        @Override
        public byte[] serialize(KafkaPrincipal principal) throws SerializationException {
            String text = principal.toString();
            return Utils.utf8(text);
        }

        @Override
        public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
            String text = Utils.utf8(bytes);
            return SecurityUtils.parseKafkaPrincipal(text);
        }
    };

    /**
     * Resets every shared mock and rebuilds the per-test fixtures: the mock clock, the quota
     * manager bundle, the broker configuration and the single-thread executor.
     */
    @BeforeEach
    public void setUp() {
        Mockito.reset(
            this.requestChannel,
            this.clientQuotaManager,
            this.clientRequestQuotaManager,
            this.clientControllerQuotaManager,
            this.clientProducerIdQuotaManager,
            this.replicaQuotaManager,
            this.clusterLinkReplicationQuotaManager,
            this.clusterLinkRequestQuotaManager,
            this.clientQuotaCallback);

        this.time = new MockTime();
        // The same client-quota mock fills the first two slots and the same replication-quota
        // mock fills three consecutive slots of the bundle.
        this.quotas = new QuotaFactory.QuotaManagers(
            this.clientQuotaManager,
            this.clientQuotaManager,
            this.clientRequestQuotaManager,
            this.clientControllerQuotaManager,
            Option.apply(this.clientProducerIdQuotaManager),
            this.replicaQuotaManager,
            this.replicaQuotaManager,
            this.replicaQuotaManager,
            this.clusterLinkReplicationQuotaManager,
            this.clusterLinkRequestQuotaManager,
            Option.apply(this.clientQuotaCallback));
        // Uses the first (index 0) of the broker configs produced by TestUtils.
        this.kafkaConfig = new KafkaConfig((Map) TestUtils.createBrokerConfigsForJava(1, "").apply(0));
        this.executor = Executors.newSingleThreadExecutor();
    }

    /** Stops the per-test executor, if one was created. */
    @AfterEach
    public void tearDown() {
        if (this.executor == null) {
            return;
        }
        this.executor.shutdownNow();
    }

    /**
     * Sends a create-topics request for topic "foo" with one partition and checks that the
     * response carries exactly the result obtained through the captured handler callback.
     *
     * @param z when {@code true} the APIs are built with {@code allowAllAuthorizer()}, otherwise
     *          with {@code noAuthorizer()}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleCreateTopicsRequest(boolean z) {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", 1);
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> callbackCaptor =
            getCreateTopicsCaptor(handler, createRequest.data().topics().duplicate());
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, z ? allowAllAuthorizer() : noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        CreateTopicsResponseData.CreatableTopicResult expected = getCreatableTopicResult(callbackCaptor, "foo", 1);
        verifyExpectedThrottling(0);
        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * Same flow as the basic create-topics test, but the APIs are built with
     * {@code denyDescribeConfigsAuthorizer()} and the handler captor is set up with its
     * boolean flag set to {@code false}.
     */
    @Test
    public void testHandleCreateTopicsRequestUnauthorizedForDescribeConfigs() {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", 1);
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> callbackCaptor =
            getCreateTopicsCaptor(handler, createRequest.data().topics().duplicate(), false);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, denyDescribeConfigsAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        CreateTopicsResponseData.CreatableTopicResult expected = getCreatableTopicResult(callbackCaptor, "foo", 1);
        verifyExpectedThrottling(0);
        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * Requests topic "foo" with a partition count of -1 and expects the handler to receive the
     * topic with the partition count taken from {@code kafkaConfig.numPartitions()}.
     */
    @Test
    public void testHandleCreateTopicsRequestUsingDefaultPartitionCount() {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", -1);
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        Assertions.assertTrue(this.kafkaConfig.numPartitions().intValue() > 0);

        // Topics the handler is expected to see: the request's topics with the configured count.
        CreateTopicsRequestData.CreatableTopicCollection expectedTopics = createRequest.data().topics().duplicate();
        CreateTopicsRequestData.CreatableTopic firstTopic = expectedTopics.iterator().next();
        firstTopic.setNumPartitions(this.kafkaConfig.numPartitions().intValue());
        ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> callbackCaptor =
            getCreateTopicsCaptor(handler, expectedTopics);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        CreateTopicsResponseData.CreatableTopicResult expected =
            getCreatableTopicResult(callbackCaptor, "foo", this.kafkaConfig.numPartitions().intValue());
        verifyExpectedThrottling(0);
        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * Sets {@code validateOnly} on the request and checks that the handler's
     * {@code createTopics} is invoked exactly once with topic name set {"foo"} and a
     * {@code true} final argument.
     */
    @Test
    public void testHandleCreateTopicsRequestWithValidateOnly() {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", -1);
        createRequest.data().setValidateOnly(true);
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        Assertions.assertTrue(this.kafkaConfig.numPartitions().intValue() > 0);

        CreateTopicsRequestData.CreatableTopicCollection expectedTopics = createRequest.data().topics().duplicate();
        CreateTopicsRequestData.CreatableTopic firstTopic = expectedTopics.iterator().next();
        firstTopic.setNumPartitions(this.kafkaConfig.numPartitions().intValue());
        // Only the stubbing side effect is needed here; the returned captor is not used.
        getCreateTopicsCaptor(handler, expectedTopics);

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        // verify(mock) defaults to exactly one invocation.
        Mockito.verify(handler).createTopics(
            (CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.any(),
            (Set) ArgumentMatchers.eq(Collections.singleton("foo")),
            ArgumentMatchers.eq(true));
    }

    /**
     * With {@code denyAllAuthorizer()} the handler must never be asked to create topics, and the
     * single result for "foo" must carry TOPIC_AUTHORIZATION_FAILED.
     */
    @Test
    public void testHandleCreateTopicsRequestUnauthorized() {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", 1);
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, denyAllAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        Mockito.verify(handler, Mockito.never()).createTopics(
            (CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.any(),
            (Set) ArgumentMatchers.any(),
            ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        CreateTopicsResponseData.CreatableTopicResult only = results.iterator().next();
        Assertions.assertEquals("foo", only.name());
        Assertions.assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), only.errorCode());
        Assertions.assertEquals("Authorization failed.", only.errorMessage());
    }

    /**
     * A request naming "foo" twice must not reach the handler; both response entries must be
     * INVALID_REQUEST with the duplicate-entry message.
     */
    @Test
    public void testHandleCreateTopicsRequestWithDupes() {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", 1, "foo", 1);
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        Mockito.verify(handler, Mockito.never()).createTopics(
            (CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.any(),
            (Set) ArgumentMatchers.any(),
            ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(2, results.size());
        for (CreateTopicsResponseData.CreatableTopicResult result : results) {
            Assertions.assertEquals("foo", result.name());
            Assertions.assertEquals(Errors.INVALID_REQUEST.code(), result.errorCode());
            Assertions.assertEquals("Found multiple entries for this topic.", result.errorMessage());
        }
    }

    /**
     * A request naming "__cluster_metadata" (twice) must not reach the handler; both response
     * entries must be INVALID_REQUEST with the "Cannot create a topic with name" message.
     */
    @Test
    public void testHandleCreateTopicsRequestWithKRaftMetadataLogTopicName() {
        CreateTopicsRequest createRequest =
            getCreateTopicsRequest("__cluster_metadata", 1, "__cluster_metadata", 1);
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        Mockito.verify(handler, Mockito.never()).createTopics(
            (CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.any(),
            (Set) ArgumentMatchers.any(),
            ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(2, results.size());
        for (CreateTopicsResponseData.CreatableTopicResult result : results) {
            Assertions.assertEquals("__cluster_metadata", result.name());
            Assertions.assertEquals(Errors.INVALID_REQUEST.code(), result.errorCode());
            Assertions.assertEquals("Cannot create a topic with name __cluster_metadata.", result.errorMessage());
        }
    }

    /**
     * A topic carrying a non-empty link name must not reach the handler and must be answered
     * with INVALID_REQUEST and the "Mirror topics are not supported." message.
     */
    @Test
    public void testHandleCreateTopicsRequestWithMirrorTopic() {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", 1);
        CreateTopicsRequestData.CreatableTopic requestedTopic = createRequest.data().topics().iterator().next();
        requestedTopic.setLinkName("nonEmptyLinkName");
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        Mockito.verify(handler, Mockito.never()).createTopics(
            (CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.any(),
            (Set) ArgumentMatchers.any(),
            ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        CreateTopicsResponseData.CreatableTopicResult only = results.iterator().next();
        Assertions.assertEquals("foo", only.name());
        Assertions.assertEquals(Errors.INVALID_REQUEST.code(), only.errorCode());
        Assertions.assertEquals("Mirror topics are not supported.", only.errorMessage());
    }

    /**
     * A topic with manual replica assignments, requested by a principal that is not a
     * healthcheck tenant, must not reach the handler and must be answered with INVALID_REQUEST
     * and the "Manual partition assignments are not supported." message.
     *
     * @param z when {@code true} the request is issued by a mocked {@link MultiTenantPrincipal}
     *          whose tenant metadata has {@code healthcheckTenant(false)}; otherwise by a plain
     *          {@code KafkaPrincipal("User", "Alice")}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleCreateTopicsRequestWithManualAssignmentsNonHealthcheckTenant(boolean z) {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", 1);
        CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments =
            new CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
        assignments.add(new CreateTopicsRequestData.CreatableReplicaAssignment());
        CreateTopicsRequestData.CreatableTopic requestedTopic = createRequest.data().topics().iterator().next();
        requestedTopic.setAssignments(assignments);

        // Declared as KafkaPrincipal so that both the mocked tenant principal and a plain
        // principal can be assigned; a MultiTenantPrincipal-typed local cannot hold the latter.
        KafkaPrincipal principal;
        if (z) {
            MultiTenantPrincipal tenantPrincipal = Mockito.mock(MultiTenantPrincipal.class);
            Mockito.when(tenantPrincipal.tenantMetadata())
                .thenReturn(new TenantMetadata.Builder("clusterId", "userResourceId").healthcheckTenant(false).build());
            Mockito.when(tenantPrincipal.getName()).thenReturn("principalName");
            principal = tenantPrincipal;
        } else {
            principal = new KafkaPrincipal("User", "Alice");
        }

        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest, principal);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, allowAllAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        Mockito.verify(handler, Mockito.times(0)).createTopics(
            (CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.any(),
            (Set) ArgumentMatchers.any(),
            ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        CreateTopicsResponseData.CreatableTopicResult only = results.iterator().next();
        Assertions.assertEquals("foo", only.name());
        Assertions.assertEquals(Errors.INVALID_REQUEST.code(), only.errorCode());
        Assertions.assertEquals("Manual partition assignments are not supported.", only.errorMessage());
    }

    /**
     * A topic with manual replica assignments requested by a mocked {@link MultiTenantPrincipal}
     * whose tenant metadata has {@code healthcheckTenant(true)} is passed on to the handler, and
     * the response contains the result obtained through the captured callback.
     */
    @Test
    public void testHandleCreateTopicsRequestWithManualAssignmentsHealthcheckTenant() {
        CreateTopicsRequest createRequest = getCreateTopicsRequest("foo", -1);
        CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments =
            new CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
        assignments.add(new CreateTopicsRequestData.CreatableReplicaAssignment());
        CreateTopicsRequestData.CreatableTopic requestedTopic = createRequest.data().topics().iterator().next();
        requestedTopic.setAssignments(assignments);

        MultiTenantPrincipal tenantPrincipal = Mockito.mock(MultiTenantPrincipal.class);
        Mockito.when(tenantPrincipal.tenantMetadata())
            .thenReturn(new TenantMetadata.Builder("clusterId", "userResourceId").healthcheckTenant(true).build());
        Mockito.when(tenantPrincipal.getName()).thenReturn("principalName");

        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest, tenantPrincipal);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        Assertions.assertTrue(this.kafkaConfig.numPartitions().intValue() > 0);

        // Topics the handler is expected to see: the request's topics with the configured count.
        CreateTopicsRequestData.CreatableTopicCollection expectedTopics = createRequest.data().topics().duplicate();
        CreateTopicsRequestData.CreatableTopic firstExpected = expectedTopics.iterator().next();
        firstExpected.setNumPartitions(this.kafkaConfig.numPartitions().intValue());
        ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> callbackCaptor =
            getCreateTopicsCaptor(handler, expectedTopics);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        CreateTopicsResponseData.CreatableTopicResult expected =
            getCreatableTopicResult(callbackCaptor, "foo", this.kafkaConfig.numPartitions().intValue());
        verifyExpectedThrottling(0);
        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * A create-topics request with an explicit replication factor of -2, 0 or 2 must not reach
     * the handler and must be answered with INVALID_REQUEST and the replication-factor message.
     *
     * @param s the replication factor placed in the request
     */
    @ParameterizedTest
    @ValueSource(shorts = {-2, 0, 2})
    public void testHandleCreateTopicsRequestWithBadReplicationFactor(short s) {
        // An explicit type witness is required: a bare emptyMap() in a cast context infers
        // Map<Object, Object>, which cannot be cast to Map<String, String>.
        CreateTopicsRequest createRequest =
            getCreateTopicsRequest("foo", 1, s, Collections.<String, String>emptyMap());
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, allowAllAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        Mockito.verify(handler, Mockito.times(0)).createTopics(
            (CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.any(),
            (Set) ArgumentMatchers.any(),
            ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        CreateTopicsResponseData.CreatableTopicResult only = results.iterator().next();
        Assertions.assertEquals("foo", only.name());
        Assertions.assertEquals(Errors.INVALID_REQUEST.code(), only.errorCode());
        Assertions.assertEquals("Explicit replication factors that do not match the default value of 1 are not supported.", only.errorMessage());
    }

    /**
     * A create-topics request with a replication factor of -1 or 1 is passed on to the handler,
     * and the response contains the result obtained through the captured callback.
     *
     * @param s the replication factor placed in the request
     */
    @ParameterizedTest
    @ValueSource(shorts = {-1, 1})
    public void testHandleCreateTopicsRequestWithGoodReplicationFactor(short s) {
        // An explicit type witness is required: a bare emptyMap() in a cast context infers
        // Map<Object, Object>, which cannot be cast to Map<String, String>.
        CreateTopicsRequest createRequest =
            getCreateTopicsRequest("foo", -1, s, Collections.<String, String>emptyMap());
        RequestChannel.Request channelRequest = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        Assertions.assertTrue(this.kafkaConfig.numPartitions().intValue() > 0);

        // Topics the handler is expected to see: the request's topics with the configured count.
        CreateTopicsRequestData.CreatableTopicCollection expectedTopics = createRequest.data().topics().duplicate();
        CreateTopicsRequestData.CreatableTopic firstExpected = expectedTopics.iterator().next();
        firstExpected.setNumPartitions(this.kafkaConfig.numPartitions().intValue());
        ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> callbackCaptor =
            getCreateTopicsCaptor(handler, expectedTopics);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleCreateTopicsRequest(channelRequest);

        CreateTopicsResponseData.CreatableTopicResult expected =
            getCreatableTopicResult(callbackCaptor, "foo", this.kafkaConfig.numPartitions().intValue());
        verifyExpectedThrottling(0);
        CreateTopicsResponseData.CreatableTopicResultCollection results =
            getCreateTopicsResponse(responseCaptor).data().topics();
        Assertions.assertEquals(1, results.size());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * With {@code delete.topic.enable=false} in the broker configuration, handling a
     * delete-topics request must throw {@link TopicDeletionDisabledException}.
     */
    @Test
    public void testHandleDeleteTopicsRequestWithTopicDeletionDisabled() {
        RequestChannel.Request channelRequest = getRequest("clientId", 0, getDeleteTopicsRequest("foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);

        // Replace the default configuration from setUp() with one that disables topic deletion.
        Properties brokerProps = (Properties) TestUtils.createBrokerConfigsForJava(1, "").apply(0);
        brokerProps.setProperty("delete.topic.enable", "false");
        this.kafkaConfig = new KafkaConfig(brokerProps);

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        Assertions.assertThrows(
            TopicDeletionDisabledException.class,
            () -> apis.handleDeleteTopicsRequest(channelRequest));
    }

    /**
     * A delete-topics request naming "foo" twice must not reach the handler; the response must
     * hold a single INVALID_REQUEST entry for "foo" with the zero topic id and the
     * "Duplicate topic name." message.
     */
    @Test
    public void testHandleDeleteTopicsRequestWithDuplicateNames() {
        RequestChannel.Request channelRequest = getRequest("clientId", 0, getDeleteTopicsRequest("foo", "foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(
            this.requestChannel, 0, this.kafkaConfig, noAuthorizer(),
            this.quotas, this.time, handler, this.executor);
        apis.handleDeleteTopicsRequest(channelRequest);

        Mockito.verify(handler, Mockito.never()).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection responses =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(1, responses.size());
        for (DeleteTopicsResponseData.DeletableTopicResult result : responses) {
            Assertions.assertEquals("foo", result.name());
            Assertions.assertEquals(Uuid.ZERO_UUID, result.topicId());
            Assertions.assertEquals(Errors.INVALID_REQUEST.code(), result.errorCode());
            Assertions.assertEquals("Duplicate topic name.", result.errorMessage());
        }
    }

    /**
     * A delete request that lists the same topic id twice is answered without ever calling
     * {@code deleteTopics} on the handler: the single result has a null name, the supplied id,
     * {@code INVALID_REQUEST} and the message "Duplicate topic id.".
     */
    @Test
    public void testHandleDeleteTopicsRequestWithDuplicateTopicIds() {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest(topicId, topicId));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, noAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        // Validation fails before the handler is consulted.
        Mockito.verify(handler, Mockito.never()).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(1, results.size());
        for (DeleteTopicsResponseData.DeletableTopicResult result : results) {
            Assertions.assertNull(result.name());
            Assertions.assertEquals(topicId, result.topicId());
            Assertions.assertEquals(Errors.INVALID_REQUEST.code(), result.errorCode());
            Assertions.assertEquals("Duplicate topic id.", result.errorMessage());
        }
    }

    /**
     * Sends four malformed entries in one delete request (neither name nor id, id only, name only,
     * and both name and id, with the id and the name each appearing twice) and expects four
     * {@code INVALID_REQUEST} results, one per distinct validation message, with no call to
     * {@code deleteTopics}.
     */
    @Test
    public void testHandleDeleteTopicsRequestWithDuplicateTopicIdsAndNames() {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0,
            getDeleteTopicsRequest(Uuid.ZERO_UUID, null, topicId, null, Uuid.ZERO_UUID, "foo", topicId, "foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, noAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        Mockito.verify(handler, Mockito.never()).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(4, results.size());

        short invalidRequest = Errors.INVALID_REQUEST.code();
        List<DeleteTopicsResponseData.DeletableTopicResult> expectedResults = Arrays.asList(
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName((String) null)
                .setTopicId(Uuid.ZERO_UUID)
                .setErrorCode(invalidRequest)
                .setErrorMessage("Neither topic name nor id were specified."),
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(topicId)
                .setErrorCode(invalidRequest)
                .setErrorMessage("You may not specify both topic name and topic id."),
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(Uuid.ZERO_UUID)
                .setErrorCode(invalidRequest)
                .setErrorMessage("Duplicate topic name."),
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName((String) null)
                .setTopicId(topicId)
                .setErrorCode(invalidRequest)
                .setErrorMessage("Duplicate topic id."));
        for (DeleteTopicsResponseData.DeletableTopicResult expected : expectedResults) {
            Assertions.assertTrue(results.contains(expected));
        }
    }

    /**
     * Requests deletion of one topic by id and one by name while the metadata lookup completes
     * with an empty set. The name is reported as {@code UNKNOWN_TOPIC_OR_PARTITION}, the id as
     * {@code UNKNOWN_TOPIC_ID}, and {@code deleteTopics} is never called. The APIs instance is
     * built with the deny-all authorizer.
     */
    @Test
    public void testHandleDeleteTopicsRequestUnrecognizedNameAndId() {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest(topicId, "foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, Collections.singleton("foo"), Collections.singleton(topicId));
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, denyAllAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        // Complete the metadata lookup with no topics found.
        metadataCaptor.getValue().accept(Collections.emptySet(), null);

        Mockito.verify(handler, Mockito.never()).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(2, results.size());

        DeleteTopicsResponseData.DeletableTopicResult expectedForName =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(Uuid.ZERO_UUID)
                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
        Assertions.assertTrue(results.contains(expectedForName));

        DeleteTopicsResponseData.DeletableTopicResult expectedForId =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName((String) null)
                .setTopicId(topicId)
                .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code());
        Assertions.assertTrue(results.contains(expectedForId));
    }

    /**
     * Requests deletion by id and by name of a topic that the metadata lookup reports as ONLINE,
     * using the deny-all authorizer. Both entries come back as {@code TOPIC_AUTHORIZATION_FAILED}
     * (the id entry with a null name, the name entry with the zero id) and {@code deleteTopics}
     * is never called.
     */
    @Test
    public void testHandleDeleteTopicsRequestUnauthorizedNameAndId() {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest(topicId, "foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, Collections.singleton("foo"), Collections.singleton(topicId));
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, denyAllAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        // Complete the metadata lookup with the existing topic.
        metadataCaptor.getValue().accept(
            Collections.singleton(
                new K2TopicMetadata("foo", topicId, 1, K2TopicMetadata.K2TopicLifecycleState.ONLINE)),
            null);

        Mockito.verify(handler, Mockito.never()).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(2, results.size());

        DeleteTopicsResponseData.DeletableTopicResult expectedForName =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(Uuid.ZERO_UUID)
                .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
        Assertions.assertTrue(results.contains(expectedForName));

        DeleteTopicsResponseData.DeletableTopicResult expectedForId =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName((String) null)
                .setTopicId(topicId)
                .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
        Assertions.assertTrue(results.contains(expectedForId));
    }

    /**
     * Requests deletion of the same topic twice, once by id and once by name, with the allow-all
     * authorizer. Once the metadata lookup shows that "foo" owns the supplied id, the request is
     * answered with a single {@code INVALID_REQUEST} result and {@code deleteTopics} is never
     * called.
     */
    @Test
    public void testHandleDeleteTopicsRequestAuthorizedNameAndIdMapToDuplicate() {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest(topicId, "foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, Collections.singleton("foo"), Collections.singleton(topicId));
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, allowAllAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        // The name and the id resolve to one and the same topic.
        metadataCaptor.getValue().accept(
            Collections.singleton(
                new K2TopicMetadata("foo", topicId, 1, K2TopicMetadata.K2TopicLifecycleState.ONLINE)),
            null);

        Mockito.verify(handler, Mockito.never()).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(1, results.size());

        DeleteTopicsResponseData.DeletableTopicResult expected =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(topicId)
                .setErrorCode(Errors.INVALID_REQUEST.code())
                .setErrorMessage("The provided topic name maps to an ID that was already supplied.");
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * Deletes topic "foo" by name when its metadata is built with {@code i} as the fifth
     * constructor argument (presumably the expanded partition count; confirm against
     * {@code K2TopicMetadata}). The handler is expected to receive the topic id mapped to
     * {@code max(1, i)}, and the response must report success for "foo".
     *
     * @param i value passed as the fifth {@code K2TopicMetadata} constructor argument
     */
    @ValueSource(ints = {2, 0})
    @ParameterizedTest
    public void testHandleDeleteTopicsExpandedPartitions(int i) {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest("foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, Collections.singleton("foo"), Collections.emptySet());
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, allowAllAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        Set<K2TopicMetadata> metadata = Collections.singleton(
            new K2TopicMetadata("foo", topicId, 1, K2TopicMetadata.K2TopicLifecycleState.ONLINE, i));
        ArgumentCaptor<BiConsumer<Map<Uuid, ApiError>, Throwable>> deleteCaptor =
            getDeleteTopicsCaptor(handler, Collections.singletonMap(topicId, Math.max(1, i)));

        // Drive the two asynchronous stages: metadata lookup first, then the delete itself.
        metadataCaptor.getValue().accept(metadata, null);
        deleteCaptor.getValue().accept(ImmutableMap.singleton(topicId, ApiError.NONE), null);

        Mockito.verify(handler, Mockito.times(1)).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(1, results.size());

        DeleteTopicsResponseData.DeletableTopicResult expected =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(topicId)
                .setErrorCode(Errors.NONE.code());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * Deletes topic "foo" by name with the allow-all authorizer. After the metadata lookup and
     * the delete both complete successfully, {@code deleteTopics} has been called once and the
     * response reports {@code NONE} for "foo" together with its resolved id.
     */
    @Test
    public void testHandleDeleteTopicsRequestAuthorizedName() {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest("foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, Collections.singleton("foo"), Collections.emptySet());
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, allowAllAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        Set<K2TopicMetadata> metadata = Collections.singleton(
            new K2TopicMetadata("foo", topicId, 1, K2TopicMetadata.K2TopicLifecycleState.ONLINE));
        ArgumentCaptor<BiConsumer<Map<Uuid, ApiError>, Throwable>> deleteCaptor =
            getDeleteTopicsCaptor(handler, Collections.singletonMap(topicId, 1));

        // Drive the two asynchronous stages: metadata lookup first, then the delete itself.
        metadataCaptor.getValue().accept(metadata, null);
        deleteCaptor.getValue().accept(ImmutableMap.singleton(topicId, ApiError.NONE), null);

        Mockito.verify(handler, Mockito.times(1)).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(1, results.size());

        DeleteTopicsResponseData.DeletableTopicResult expected =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(topicId)
                .setErrorCode(Errors.NONE.code());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * Deletes a topic by id with the allow-all authorizer. After the metadata lookup resolves the
     * id to "foo" and the delete completes successfully, {@code deleteTopics} has been called
     * once and the response reports {@code NONE} with both the name and the id filled in.
     */
    @Test
    public void testHandleDeleteTopicsRequestAuthorizedId() {
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest(topicId));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, Collections.emptySet(), Collections.singleton(topicId));
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, allowAllAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        Set<K2TopicMetadata> metadata = Collections.singleton(
            new K2TopicMetadata("foo", topicId, 1, K2TopicMetadata.K2TopicLifecycleState.ONLINE));
        ArgumentCaptor<BiConsumer<Map<Uuid, ApiError>, Throwable>> deleteCaptor =
            getDeleteTopicsCaptor(handler, Collections.singletonMap(topicId, 1));

        // Drive the two asynchronous stages: metadata lookup first, then the delete itself.
        metadataCaptor.getValue().accept(metadata, null);
        deleteCaptor.getValue().accept(ImmutableMap.singleton(topicId, ApiError.NONE), null);

        Mockito.verify(handler, Mockito.times(1)).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(1, results.size());

        DeleteTopicsResponseData.DeletableTopicResult expected =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(topicId)
                .setErrorCode(Errors.NONE.code());
        Assertions.assertTrue(results.contains(expected));
    }

    /**
     * Requests deletion by id and by name when neither target can be deleted. For
     * {@code ONLINE} the metadata lookup returns nothing; for every other lifecycle state it
     * returns a "foo" topic with an unrelated id plus another topic owning the requested id,
     * both in that state. In every case the name is reported as
     * {@code UNKNOWN_TOPIC_OR_PARTITION}, the id as {@code UNKNOWN_TOPIC_ID}, and
     * {@code deleteTopics} is never called.
     *
     * @param k2TopicLifecycleState lifecycle state under test (each enum constant in turn)
     */
    @EnumSource
    @ParameterizedTest
    public void testHandleDeleteTopicsRequestNotExistingNameAndId(K2TopicMetadata.K2TopicLifecycleState k2TopicLifecycleState) {
        boolean online = k2TopicLifecycleState == K2TopicMetadata.K2TopicLifecycleState.ONLINE;
        Uuid topicId = Uuid.randomUuid();
        RequestChannel.Request deleteRequest = getRequest("clientId", 0, getDeleteTopicsRequest(topicId, "foo"));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, Collections.singleton("foo"), Collections.singleton(topicId));
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, allowAllAuthorizer(), quotas, time, handler, executor);
        apis.handleDeleteTopicsRequest(deleteRequest);

        Set<K2TopicMetadata> lookupResult;
        if (online) {
            lookupResult = Collections.emptySet();
        } else {
            lookupResult = ImmutableSet
                .singleton(new K2TopicMetadata("foo", Uuid.randomUuid(), 1, k2TopicLifecycleState))
                .added(new K2TopicMetadata("someOtherTopicWithTheGivenTopicId", topicId, 1, k2TopicLifecycleState));
        }
        metadataCaptor.getValue().accept(lookupResult, null);

        Mockito.verify(handler, Mockito.never()).deleteTopics((Map) ArgumentMatchers.any());
        verifyExpectedThrottling(0);

        DeleteTopicsResponseData.DeletableTopicResultCollection results =
            getDeleteTopicsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(2, results.size());

        DeleteTopicsResponseData.DeletableTopicResult expectedForName =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName("foo")
                .setTopicId(Uuid.ZERO_UUID)
                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
        Assertions.assertTrue(results.contains(expectedForName));

        DeleteTopicsResponseData.DeletableTopicResult expectedForId =
            new DeleteTopicsResponseData.DeletableTopicResult()
                .setName((String) null)
                .setTopicId(topicId)
                .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code());
        Assertions.assertTrue(results.contains(expectedForId));
    }

    /**
     * Runs a two-topic create-partitions request against the authorizer selected by
     * {@code getAuthorizerOption(str)}. For "deny" neither the metadata lookup nor
     * {@code createAdditionalPartitions} may be invoked and every result must be
     * {@code TOPIC_AUTHORIZATION_FAILED}; otherwise both asynchronous stages are completed
     * successfully, {@code createAdditionalPartitions} is called once and every result is
     * {@code NONE}.
     *
     * @param str authorizer option: "no", "allow" or "deny"
     */
    @ValueSource(strings = {"no", "allow", "deny"})
    @ParameterizedTest
    public void testHandleCreatePartitionsRequestWithAuthorization(String str) {
        CreatePartitionsRequest createRequest = getCreatePartitionsRequest(2, 2, false, false);
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics =
            new ArrayList<>(createRequest.data().topics().valuesSet());
        RequestChannel.Request request = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);

        Set<String> topicNames = topics.stream()
            .map(CreatePartitionsRequestData.CreatePartitionsTopic::name)
            .collect(Collectors.toSet());
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, topicNames, Collections.emptySet());
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        // Every topic name is mapped to the pair [1, 2].
        ArgumentCaptor<BiConsumer<Map<String, ApiError>, Throwable>> createCaptor =
            getCreatePartitionsCaptor(handler, topics.stream()
                .map(CreatePartitionsRequestData.CreatePartitionsTopic::name)
                .collect(Collectors.toMap(Function.identity(), name -> Arrays.asList(1, 2))));

        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, getAuthorizerOption(str), quotas, time, handler, executor);
        apis.handleCreatePartitionsRequest(request);

        Set<K2TopicMetadata> existingTopics = new HashSet<>();
        existingTopics.add(new K2TopicMetadata(
            topics.get(0).name(), Uuid.randomUuid(), 1, K2TopicMetadata.K2TopicLifecycleState.ONLINE));
        existingTopics.add(new K2TopicMetadata(
            topics.get(1).name(), Uuid.randomUuid(), 1, K2TopicMetadata.K2TopicLifecycleState.CREATING));

        boolean denied = str.equals("deny");
        if (!denied) {
            metadataCaptor.getValue().accept(existingTopics, null);
            Map<String, ApiError> allSucceeded = topics.stream()
                .map(CreatePartitionsRequestData.CreatePartitionsTopic::name)
                .collect(Collectors.toMap(Function.identity(), name -> ApiError.NONE));
            createCaptor.getValue().accept(allSucceeded, null);
            Mockito.verify(handler, Mockito.times(1)).createAdditionalPartitions(
                (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        } else {
            // Authorization fails before the handler is touched at all.
            Mockito.verify(handler, Mockito.never()).getTopicMetadata(
                (Set) ArgumentMatchers.any(), (Set) ArgumentMatchers.any());
            Mockito.verify(handler, Mockito.never()).createAdditionalPartitions(
                (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        }
        verifyExpectedThrottling(0);

        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> results =
            getCreatePartitionsResponse(responseCaptor).data().results();
        Assertions.assertEquals(2, results.size());

        short expectedCode;
        if (str.equals("deny")) {
            expectedCode = Errors.TOPIC_AUTHORIZATION_FAILED.code();
        } else {
            expectedCode = Errors.NONE.code();
        }
        for (CreatePartitionsResponseData.CreatePartitionsTopicResult result : results) {
            Assertions.assertEquals(expectedCode, result.errorCode());
        }
    }

    /**
     * Sends a create-partitions request that is invalid in the way named by {@code str} and
     * expects every result to be {@code INVALID_REQUEST} without any call to
     * {@code createAdditionalPartitions}. One result is expected for "duplicateTopics" and two
     * for "hasAssignment".
     *
     * @param str which defect to build into the request: "duplicateTopics" or "hasAssignment"
     */
    @ValueSource(strings = {"duplicateTopics", "hasAssignment"})
    @ParameterizedTest
    public void testHandleCreatePartitionsRequestInvalid(String str) {
        boolean duplicateTopics = str.equals("duplicateTopics");
        boolean hasAssignment = str.equals("hasAssignment");
        int expectedResultCount = duplicateTopics ? 1 : 2;

        RequestChannel.Request request =
            getRequest("clientId", 0, getCreatePartitionsRequest(2, 2, duplicateTopics, hasAssignment));
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, allowAllAuthorizer(), quotas, time, handler, executor);
        apis.handleCreatePartitionsRequest(request);

        Mockito.verify(handler, Mockito.never()).createAdditionalPartitions(
            (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());

        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> results =
            getCreatePartitionsResponse(responseCaptor).data().results();
        Assertions.assertEquals(expectedResultCount, results.size());
        for (CreatePartitionsResponseData.CreatePartitionsTopicResult result : results) {
            Assertions.assertEquals(Errors.INVALID_REQUEST.code(), result.errorCode());
        }
    }

    /**
     * Sends a two-topic create-partitions request built with {@code i} (presumably the requested
     * partition count; confirm against {@code getCreatePartitionsRequest}) while the metadata
     * lookup reports the topics with 2 and 3 partitions. Both results must be
     * {@code INVALID_PARTITIONS} and {@code createAdditionalPartitions} must never be called.
     *
     * @param i second argument handed to {@code getCreatePartitionsRequest}
     */
    @ValueSource(ints = {2, 1})
    @ParameterizedTest
    public void testHandleCreatePartitionsRequestWithTargetPartitionCountError(int i) {
        int existingPartitions = 2;
        CreatePartitionsRequest createRequest = getCreatePartitionsRequest(2, i, false, false);
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics =
            new ArrayList<>(createRequest.data().topics().valuesSet());
        RequestChannel.Request request = getRequest("clientId", 0, createRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);

        Set<String> topicNames = topics.stream()
            .map(CreatePartitionsRequestData.CreatePartitionsTopic::name)
            .collect(Collectors.toSet());
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> metadataCaptor =
            getTopicMetadataCaptor(handler, topicNames, Collections.emptySet());
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, allowAllAuthorizer(), quotas, time, handler, executor);
        apis.handleCreatePartitionsRequest(request);

        Set<K2TopicMetadata> existingTopics = new HashSet<>();
        existingTopics.add(new K2TopicMetadata(
            topics.get(0).name(), Uuid.randomUuid(), existingPartitions,
            K2TopicMetadata.K2TopicLifecycleState.ONLINE));
        existingTopics.add(new K2TopicMetadata(
            topics.get(1).name(), Uuid.randomUuid(), existingPartitions + 1,
            K2TopicMetadata.K2TopicLifecycleState.CREATING));
        metadataCaptor.getValue().accept(existingTopics, null);

        Mockito.verify(handler, Mockito.never()).createAdditionalPartitions(
            (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> results =
            getCreatePartitionsResponse(responseCaptor).data().results();
        Assertions.assertEquals(2, results.size());
        for (CreatePartitionsResponseData.CreatePartitionsTopicResult result : results) {
            Assertions.assertEquals(Errors.INVALID_PARTITIONS.code(), result.errorCode());
        }
    }

    /**
     * Runs a two-topic incremental alter-configs request against the authorizer selected by
     * {@code getAuthorizerOption(str)}. For "deny" the handler's {@code alterTopicConfigs} must
     * not be called and both responses must be {@code TOPIC_AUTHORIZATION_FAILED}; otherwise the
     * handler is called exactly once with the expected configs, {@code true} and {@code z}, and
     * both responses are {@code NONE}.
     *
     * @param str authorizer option: "no", "allow" or "deny"
     * @param z   flag passed to the request builder and expected as the handler's last argument
     */
    @ParameterizedTest
    @CsvSource({"no,true", "allow,true", "deny,true", "no,false", "allow,false", "deny,false"})
    public void testHandleIncrementalAlterTopicConfigsRequestWithAuthorization(String str, boolean z) {
        IncrementalAlterConfigsRequest alterRequest = getIncrementalAlterTopicConfigsRequest(2, false, z);
        RequestChannel.Request request = getRequest("clientId", 0, alterRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        Map<String, Collection<AlterConfigOp>> expectedConfigs = getConfigsToAlterByTopic(alterRequest);
        ArgumentCaptor<BiConsumer<Map<String, ApiError>, Throwable>> alterCaptor =
            getAlterTopicConfigsCaptor(handler, expectedConfigs, true);
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, getAuthorizerOption(str), quotas, time, handler, executor);
        apis.handleIncrementalAlterConfigsRequest(request);

        boolean denied = str.equals("deny");
        if (!denied) {
            // Complete the handler call with success for every topic.
            BiConsumer<Map<String, ApiError>, Throwable> completion = alterCaptor.getValue();
            Map<String, ApiError> allSucceeded = expectedConfigs.keySet().stream()
                .collect(Collectors.toMap(Function.identity(), topic -> ApiError.NONE));
            completion.accept(allSucceeded, null);
            Mockito.verify(handler, Mockito.times(1)).alterTopicConfigs(
                (Map) ArgumentMatchers.eq(expectedConfigs), ArgumentMatchers.eq(true), ArgumentMatchers.eq(z));
        } else {
            Mockito.verify(handler, Mockito.never()).alterTopicConfigs(
                (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        }
        verifyExpectedThrottling(0);

        List<IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse> responses =
            getIncrementalAlterConfigsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(2, responses.size());

        short expectedCode;
        if (str.equals("deny")) {
            expectedCode = Errors.TOPIC_AUTHORIZATION_FAILED.code();
        } else {
            expectedCode = Errors.NONE.code();
        }
        for (IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse response : responses) {
            Assertions.assertEquals(expectedCode, response.errorCode());
        }
    }

    /**
     * An incremental alter-configs request built with the duplicate-topics flag set must be
     * rejected with the message "Each resource must appear at most once.".
     */
    @Test
    public void testHandleIncrementalAlterTopicConfigsRequestWithDuplicateTopics() {
        IncrementalAlterConfigsRequest requestWithDuplicates = getIncrementalAlterTopicConfigsRequest(2, true, false);
        invokeAndConfirmInvalidRequestErrorReceived(requestWithDuplicates, "Each resource must appear at most once.");
    }

    /**
     * Blanks the name of the first config entry of the first resource and expects the request
     * to be rejected with "Empty or null value not supported for config key.".
     */
    @Test
    public void testHandleIncrementalAlterTopicConfigsRequestWithEmptyKey() {
        IncrementalAlterConfigsRequest alterRequest = getIncrementalAlterTopicConfigsRequest(1, false, false);
        IncrementalAlterConfigsRequestData.AlterConfigsResource firstResource =
            alterRequest.data().resources().iterator().next();
        IncrementalAlterConfigsRequestData.AlterableConfig firstConfig =
            firstResource.configs().iterator().next();
        firstConfig.setName("");
        invokeAndConfirmInvalidRequestErrorReceived(alterRequest, "Empty or null value not supported for config key.");
    }

    /**
     * Nulls out the value of the first config entry of the first resource and expects the
     * request to be rejected with "Null value not supported for config update.".
     */
    @Test
    public void testHandleIncrementalAlterTopicConfigsRequestWithNullValue() {
        IncrementalAlterConfigsRequest alterRequest = getIncrementalAlterTopicConfigsRequest(1, false, false);
        IncrementalAlterConfigsRequestData.AlterConfigsResource firstResource =
            alterRequest.data().resources().iterator().next();
        IncrementalAlterConfigsRequestData.AlterableConfig firstConfig =
            firstResource.configs().iterator().next();
        firstConfig.setValue((String) null);
        invokeAndConfirmInvalidRequestErrorReceived(alterRequest, "Null value not supported for config update.");
    }

    /**
     * Appends a copy of the first config entry (same name, value and operation) to the first
     * resource and expects the request to be rejected with "Error due to duplicate config keys.".
     */
    @Test
    public void testHandleIncrementalAlterTopicConfigsRequestWithDuplicateConfigKey() {
        IncrementalAlterConfigsRequest alterRequest = getIncrementalAlterTopicConfigsRequest(1, false, false);
        IncrementalAlterConfigsRequestData.AlterableConfigCollection configEntries =
            alterRequest.data().resources().iterator().next().configs();
        IncrementalAlterConfigsRequestData.AlterableConfig original = configEntries.iterator().next();
        IncrementalAlterConfigsRequestData.AlterableConfig duplicate =
            new IncrementalAlterConfigsRequestData.AlterableConfig()
                .setName(original.name())
                .setValue(original.value())
                .setConfigOperation(original.configOperation());
        configEntries.add(duplicate);
        invokeAndConfirmInvalidRequestErrorReceived(alterRequest, "Error due to duplicate config keys.");
    }

    /**
     * Runs a two-topic legacy alter-configs request against the authorizer selected by
     * {@code getAuthorizerOption(str)}. For "deny" the handler's {@code alterTopicConfigs} must
     * not be called and both responses must be {@code TOPIC_AUTHORIZATION_FAILED}; otherwise the
     * handler is called exactly once with the expected configs, {@code false} and {@code z}, and
     * both responses are {@code NONE}.
     *
     * @param str authorizer option: "no", "allow" or "deny"
     * @param z   flag passed to the request builder and expected as the handler's last argument
     */
    @ParameterizedTest
    @CsvSource({"no,true", "allow,true", "deny,true", "no,false", "allow,false", "deny,false"})
    public void testHandleLegacyAlterTopicConfigsRequestWithAuthorization(String str, boolean z) {
        AlterConfigsRequest alterRequest = getLegacyAlterTopicConfigsRequest(2, false, z);
        RequestChannel.Request request = getRequest("clientId", 0, alterRequest);
        K2ControllerHandler handler = Mockito.mock(K2ControllerHandler.class);
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();
        Map<String, Collection<AlterConfigOp>> expectedConfigs = getConfigsToAlterByTopic(alterRequest);
        ArgumentCaptor<BiConsumer<Map<String, ApiError>, Throwable>> alterCaptor =
            getAlterTopicConfigsCaptor(handler, expectedConfigs, false);
        K2ControllerApis apis = new K2ControllerApis(
            requestChannel, 0, kafkaConfig, getAuthorizerOption(str), quotas, time, handler, executor);
        apis.handleLegacyAlterConfigsRequest(request);

        boolean denied = str.equals("deny");
        if (!denied) {
            // Complete the handler call with success for every topic.
            BiConsumer<Map<String, ApiError>, Throwable> completion = alterCaptor.getValue();
            Map<String, ApiError> allSucceeded = expectedConfigs.keySet().stream()
                .collect(Collectors.toMap(Function.identity(), topic -> ApiError.NONE));
            completion.accept(allSucceeded, null);
            Mockito.verify(handler, Mockito.times(1)).alterTopicConfigs(
                (Map) ArgumentMatchers.eq(expectedConfigs), ArgumentMatchers.eq(false), ArgumentMatchers.eq(z));
        } else {
            Mockito.verify(handler, Mockito.never()).alterTopicConfigs(
                (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        }
        verifyExpectedThrottling(0);

        List<AlterConfigsResponseData.AlterConfigsResourceResponse> responses =
            getLegacyAlterConfigsResponse(responseCaptor).data().responses();
        Assertions.assertEquals(2, responses.size());

        short expectedCode;
        if (str.equals("deny")) {
            expectedCode = Errors.TOPIC_AUTHORIZATION_FAILED.code();
        } else {
            expectedCode = Errors.NONE.code();
        }
        for (AlterConfigsResponseData.AlterConfigsResourceResponse response : responses) {
            Assertions.assertEquals(expectedCode, response.errorCode());
        }
    }

    /**
     * A legacy alter-configs request built with the duplicate-topics flag set must be rejected
     * with the message "Each resource must appear at most once.".
     */
    @Test
    public void testHandleLegacyAlterTopicConfigsRequestWithDuplicateTopics() {
        AlterConfigsRequest requestWithDuplicates = getLegacyAlterTopicConfigsRequest(2, true, false);
        invokeAndConfirmInvalidRequestErrorReceived(requestWithDuplicates, "Each resource must appear at most once.");
    }

    /**
     * Blanks the name of the first config entry of the first resource in a legacy request and
     * expects it to be rejected with "Empty or null value not supported for config key.".
     */
    @Test
    public void testHandleLegacyAlterTopicConfigsRequestWithEmptyKey() {
        AlterConfigsRequest alterRequest = getLegacyAlterTopicConfigsRequest(1, false, false);
        AlterConfigsRequestData.AlterConfigsResource firstResource =
            alterRequest.data().resources().iterator().next();
        AlterConfigsRequestData.AlterableConfig firstConfig = firstResource.configs().iterator().next();
        firstConfig.setName("");
        invokeAndConfirmInvalidRequestErrorReceived(alterRequest, "Empty or null value not supported for config key.");
    }

    /**
     * Nulls out the value of the first config entry of the first resource in a legacy request
     * and expects it to be rejected with "Null value not supported for config update.".
     */
    @Test
    public void testHandleLegacyAlterTopicConfigsRequestWithNullValue() {
        AlterConfigsRequest alterRequest = getLegacyAlterTopicConfigsRequest(1, false, false);
        AlterConfigsRequestData.AlterConfigsResource firstResource =
            alterRequest.data().resources().iterator().next();
        AlterConfigsRequestData.AlterableConfig firstConfig = firstResource.configs().iterator().next();
        firstConfig.setValue((String) null);
        invokeAndConfirmInvalidRequestErrorReceived(alterRequest, "Null value not supported for config update.");
    }

    /**
     * Adds a copy (same name and value) of the first config entry to a single-topic legacy request
     * and expects the invalid-request path with the duplicate-keys message.
     */
    @Test
    public void testHandleLegacyAlterTopicConfigsRequestWithDuplicateConfigKey() {
        AlterConfigsRequest request = getLegacyAlterTopicConfigsRequest(1, false, false);
        AlterConfigsRequestData.AlterableConfigCollection configEntries = request.data().resources().iterator().next().configs();
        AlterConfigsRequestData.AlterableConfig existing = configEntries.iterator().next();

        // Populate the copy fully before inserting it into the collection.
        AlterConfigsRequestData.AlterableConfig duplicate = new AlterConfigsRequestData.AlterableConfig();
        duplicate.setName(existing.name());
        duplicate.setValue(existing.value());
        configEntries.add(duplicate);

        invokeAndConfirmInvalidRequestErrorReceived(request, "Error due to duplicate config keys.");
    }

    /**
     * Runs an IncrementalAlterConfigs request through a fresh {@code K2ControllerApis} (no authorizer)
     * and checks that it is rejected before reaching the controller handler.
     *
     * <p>Asserts that {@code alterTopicConfigs} is never called, that throttling is verified with 0,
     * and that every resource in the request gets an {@code INVALID_REQUEST} response with the given message.
     *
     * @param incrementalAlterConfigsRequest the request expected to be rejected
     * @param str the error message expected on every resource response
     */
    private void invokeAndConfirmInvalidRequestErrorReceived(IncrementalAlterConfigsRequest incrementalAlterConfigsRequest, String str) {
        RequestChannel.Request channelRequest = getRequest("clientId", 0, incrementalAlterConfigsRequest);
        K2ControllerHandler handlerMock = Mockito.mock(K2ControllerHandler.class);
        // The captor must be installed on the request channel before the handler runs.
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(this.requestChannel, 0, this.kafkaConfig, noAuthorizer(), this.quotas, this.time, handlerMock, this.executor);
        apis.handleIncrementalAlterConfigsRequest(channelRequest);

        Mockito.verify(handlerMock, Mockito.never()).alterTopicConfigs((Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        List<IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse> resourceResponses = getIncrementalAlterConfigsResponse(responseCaptor).data().responses();
        int expectedCount = incrementalAlterConfigsRequest.data().resources().size();
        Assertions.assertEquals(expectedCount, resourceResponses.size());
        resourceResponses.forEach(resourceResponse -> {
            Assertions.assertEquals(Errors.INVALID_REQUEST.code(), resourceResponse.errorCode());
            Assertions.assertEquals(str, resourceResponse.errorMessage());
        });
    }

    /**
     * Runs a legacy AlterConfigs request through a fresh {@code K2ControllerApis} (no authorizer)
     * and checks that it is rejected before reaching the controller handler.
     *
     * <p>Asserts that {@code alterTopicConfigs} is never called, that throttling is verified with 0,
     * and that every resource in the request gets an {@code INVALID_REQUEST} response with the given message.
     *
     * @param alterConfigsRequest the request expected to be rejected
     * @param str the error message expected on every resource response
     */
    private void invokeAndConfirmInvalidRequestErrorReceived(AlterConfigsRequest alterConfigsRequest, String str) {
        RequestChannel.Request channelRequest = getRequest("clientId", 0, alterConfigsRequest);
        K2ControllerHandler handlerMock = Mockito.mock(K2ControllerHandler.class);
        // The captor must be installed on the request channel before the handler runs.
        ArgumentCaptor<AbstractResponse> responseCaptor = getAbstractResponseArgumentCaptor();

        K2ControllerApis apis = new K2ControllerApis(this.requestChannel, 0, this.kafkaConfig, noAuthorizer(), this.quotas, this.time, handlerMock, this.executor);
        apis.handleLegacyAlterConfigsRequest(channelRequest);

        Mockito.verify(handlerMock, Mockito.never()).alterTopicConfigs((Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean());
        verifyExpectedThrottling(0);

        List<AlterConfigsResponseData.AlterConfigsResourceResponse> resourceResponses = getLegacyAlterConfigsResponse(responseCaptor).data().responses();
        int expectedCount = alterConfigsRequest.data().resources().size();
        Assertions.assertEquals(expectedCount, resourceResponses.size());
        resourceResponses.forEach(resourceResponse -> {
            Assertions.assertEquals(Errors.INVALID_REQUEST.code(), resourceResponse.errorCode());
            Assertions.assertEquals(str, resourceResponse.errorMessage());
        });
    }

    /**
     * Maps an authorizer mode name to the matching test authorizer.
     *
     * <p>Only the requested authorizer is created, so no mock with an unused stubbing is built
     * and thrown away for the "no" and "deny" modes.
     *
     * @param str {@code "no"} for no authorizer, {@code "deny"} for a deny-all authorizer;
     *            any other value yields the allow-all authorizer
     * @return the selected authorizer option
     */
    private static Option<Authorizer> getAuthorizerOption(String str) {
        if (str.equals("no")) {
            return noAuthorizer();
        }
        if (str.equals("deny")) {
            return denyAllAuthorizer();
        }
        return allowAllAuthorizer();
    }

    /**
     * Builds the per-topic config operations for an IncrementalAlterConfigs request.
     *
     * <p>Resources whose name occurs more than once in the request are left out entirely.
     * Each remaining resource maps to the set of its configs converted to {@link AlterConfigOp},
     * using the operation id carried by each config.
     *
     * @param incrementalAlterConfigsRequest the request to read resources from
     * @return resource name to its set of alter operations, for uniquely named resources only
     */
    private static Map<String, Collection<AlterConfigOp>> getConfigsToAlterByTopic(IncrementalAlterConfigsRequest incrementalAlterConfigsRequest) {
        Map<String, Long> occurrencesByName = incrementalAlterConfigsRequest.data().resources().stream()
            .map(resource -> resource.resourceName())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        return incrementalAlterConfigsRequest.data().resources().stream()
            .filter(resource -> occurrencesByName.get(resource.resourceName()) == 1L)
            .collect(Collectors.toMap(
                resource -> resource.resourceName(),
                resource -> {
                    Collection<AlterConfigOp> ops = resource.configs().stream()
                        .map(config -> new AlterConfigOp(new ConfigEntry(config.name(), config.value()), AlterConfigOp.OpType.forId(config.configOperation())))
                        .collect(Collectors.toSet());
                    return ops;
                }));
    }

    /**
     * Builds the per-topic config operations for a legacy AlterConfigs request.
     *
     * <p>Resources whose name occurs more than once in the request are left out entirely.
     * Each remaining resource maps to the set of its configs converted to {@link AlterConfigOp}
     * with operation type {@code SET}.
     *
     * @param alterConfigsRequest the request to read resources from
     * @return resource name to its set of SET operations, for uniquely named resources only
     */
    private static Map<String, Collection<AlterConfigOp>> getConfigsToAlterByTopic(AlterConfigsRequest alterConfigsRequest) {
        Map<String, Long> occurrencesByName = alterConfigsRequest.data().resources().stream()
            .map(resource -> resource.resourceName())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        return alterConfigsRequest.data().resources().stream()
            .filter(resource -> occurrencesByName.get(resource.resourceName()) == 1L)
            .collect(Collectors.toMap(
                resource -> resource.resourceName(),
                resource -> {
                    Collection<AlterConfigOp> ops = resource.configs().stream()
                        .map(config -> new AlterConfigOp(new ConfigEntry(config.name(), config.value()), AlterConfigOp.OpType.SET))
                        .collect(Collectors.toSet());
                    return ops;
                }));
    }

    /**
     * Stubs {@code requestChannel.sendResponse} to do nothing while capturing the response argument.
     *
     * @return the captor that will hold the response passed to {@code sendResponse}
     */
    private ArgumentCaptor<AbstractResponse> getAbstractResponseArgumentCaptor() {
        ArgumentCaptor<AbstractResponse> responseCaptor = ArgumentCaptor.forClass(AbstractResponse.class);
        Mockito.doNothing()
            .when(this.requestChannel)
            .sendResponse((RequestChannel.Request) ArgumentMatchers.any(), (AbstractResponse) responseCaptor.capture(), (Option) ArgumentMatchers.any());
        return responseCaptor;
    }

    /**
     * Verifies that a response was sent on the request channel and returns the captured one
     * as a {@code CreatePartitionsResponse}.
     *
     * <p>The captured value is held as {@code AbstractResponse} and only downcast after the
     * instance assertion, so a wrong response type fails as an assertion error.
     *
     * @param argumentCaptor the captor installed on {@code requestChannel.sendResponse}
     * @return the captured response
     */
    private CreatePartitionsResponse getCreatePartitionsResponse(ArgumentCaptor<AbstractResponse> argumentCaptor) {
        Mockito.verify(this.requestChannel).sendResponse((RequestChannel.Request) ArgumentMatchers.any(), (AbstractResponse) ArgumentMatchers.any(), (Option) ArgumentMatchers.any());
        AbstractResponse captured = argumentCaptor.getValue();
        Assertions.assertInstanceOf(CreatePartitionsResponse.class, captured);
        return (CreatePartitionsResponse) captured;
    }

    /**
     * Verifies that a response was sent on the request channel and returns the captured one
     * as an {@code IncrementalAlterConfigsResponse}.
     *
     * <p>The captured value is held as {@code AbstractResponse} and only downcast after the
     * instance assertion, so a wrong response type fails as an assertion error.
     *
     * @param argumentCaptor the captor installed on {@code requestChannel.sendResponse}
     * @return the captured response
     */
    private IncrementalAlterConfigsResponse getIncrementalAlterConfigsResponse(ArgumentCaptor<AbstractResponse> argumentCaptor) {
        Mockito.verify(this.requestChannel).sendResponse((RequestChannel.Request) ArgumentMatchers.any(), (AbstractResponse) ArgumentMatchers.any(), (Option) ArgumentMatchers.any());
        AbstractResponse captured = argumentCaptor.getValue();
        Assertions.assertInstanceOf(IncrementalAlterConfigsResponse.class, captured);
        return (IncrementalAlterConfigsResponse) captured;
    }

    /**
     * Verifies that a response was sent on the request channel and returns the captured one
     * as an {@code AlterConfigsResponse}.
     *
     * <p>The captured value is held as {@code AbstractResponse} and only downcast after the
     * instance assertion, so a wrong response type fails as an assertion error.
     *
     * @param argumentCaptor the captor installed on {@code requestChannel.sendResponse}
     * @return the captured response
     */
    private AlterConfigsResponse getLegacyAlterConfigsResponse(ArgumentCaptor<AbstractResponse> argumentCaptor) {
        Mockito.verify(this.requestChannel).sendResponse((RequestChannel.Request) ArgumentMatchers.any(), (AbstractResponse) ArgumentMatchers.any(), (Option) ArgumentMatchers.any());
        AbstractResponse captured = argumentCaptor.getValue();
        Assertions.assertInstanceOf(AlterConfigsResponse.class, captured);
        return (AlterConfigsResponse) captured;
    }

    /**
     * Verifies that a response was sent on the request channel and returns the captured one
     * as a {@code CreateTopicsResponse}.
     *
     * <p>The captured value is held as {@code AbstractResponse} and only downcast after the
     * instance assertion, so a wrong response type fails as an assertion error.
     *
     * @param argumentCaptor the captor installed on {@code requestChannel.sendResponse}
     * @return the captured response
     */
    private CreateTopicsResponse getCreateTopicsResponse(ArgumentCaptor<AbstractResponse> argumentCaptor) {
        Mockito.verify(this.requestChannel).sendResponse((RequestChannel.Request) ArgumentMatchers.any(), (AbstractResponse) ArgumentMatchers.any(), (Option) ArgumentMatchers.any());
        AbstractResponse captured = argumentCaptor.getValue();
        Assertions.assertInstanceOf(CreateTopicsResponse.class, captured);
        return (CreateTopicsResponse) captured;
    }

    /**
     * Verifies that a response was sent on the request channel and returns the captured one
     * as a {@code DeleteTopicsResponse}.
     *
     * <p>The captured value is held as {@code AbstractResponse} and only downcast after the
     * instance assertion, so a wrong response type fails as an assertion error.
     *
     * @param argumentCaptor the captor installed on {@code requestChannel.sendResponse}
     * @return the captured response
     */
    private DeleteTopicsResponse getDeleteTopicsResponse(ArgumentCaptor<AbstractResponse> argumentCaptor) {
        Mockito.verify(this.requestChannel).sendResponse((RequestChannel.Request) ArgumentMatchers.any(), (AbstractResponse) ArgumentMatchers.any(), (Option) ArgumentMatchers.any());
        AbstractResponse captured = argumentCaptor.getValue();
        Assertions.assertInstanceOf(DeleteTopicsResponse.class, captured);
        return (DeleteTopicsResponse) captured;
    }

    /**
     * Verifies the two quota-manager interactions: throttle time recorded with the current value of
     * {@code time.milliseconds()}, and {@code throttle} invoked with the expected throttle value.
     *
     * @param i the throttle value expected as the last argument of {@code throttle}
     */
    private void verifyExpectedThrottling(int i) {
        long currentTimeMs = this.time.milliseconds();
        Mockito.verify(this.clientRequestQuotaManager)
            .maybeRecordAndGetThrottleTimeMs((RequestChannel.Request) ArgumentMatchers.any(), ArgumentMatchers.eq(currentTimeMs));
        Mockito.verify(this.clientRequestQuotaManager)
            .throttle((RequestChannel.Request) ArgumentMatchers.any(), (ThrottleCallback) ArgumentMatchers.any(), ArgumentMatchers.eq(i));
    }

    /**
     * Builds a CreatePartitions request at the latest API version.
     *
     * @param i number of topics to include
     * @param i2 partition count set on every topic; also the number of assignments added per topic when {@code z2} is set
     * @param z when true and {@code i > 1}, every topic is named {@code test_topic_duplicated};
     *          otherwise topics are named {@code test_topic_<index>}
     * @param z2 when true, each topic gets {@code i2} assignments, each with broker ids {@code [0, 1]}
     * @return the built request
     */
    private static CreatePartitionsRequest getCreatePartitionsRequest(int i, int i2, boolean z, boolean z2) {
        boolean shareName = z && i > 1;
        CreatePartitionsRequestData.CreatePartitionsTopicCollection topics = new CreatePartitionsRequestData.CreatePartitionsTopicCollection();
        for (int topicIndex = 0; topicIndex < i; topicIndex++) {
            String topicName = shareName ? "test_topic_duplicated" : "test_topic_" + topicIndex;
            CreatePartitionsRequestData.CreatePartitionsTopic topic = new CreatePartitionsRequestData.CreatePartitionsTopic();
            topic.setName(topicName);
            topic.setCount(i2);
            int assignmentCount = z2 ? i2 : 0;
            for (int n = 0; n < assignmentCount; n++) {
                CreatePartitionsRequestData.CreatePartitionsAssignment assignment = new CreatePartitionsRequestData.CreatePartitionsAssignment();
                assignment.setBrokerIds(Arrays.asList(0, 1));
                topic.assignments().add(assignment);
            }
            topics.add(topic);
        }
        CreatePartitionsRequestData data = new CreatePartitionsRequestData().setTopics(topics);
        return new CreatePartitionsRequest.Builder(data).build(ApiKeys.CREATE_PARTITIONS.latestVersion());
    }

    /**
     * Builds an IncrementalAlterConfigs request at the latest API version in which every topic
     * resource carries a single {@code SET max.message.bytes=1000002} operation.
     *
     * @param i number of topic resources to include
     * @param z when true and {@code i > 1}, every resource is named {@code test_topic_duplicated};
     *          otherwise resources are named {@code test_topic_<index>}
     * @param z2 passed to the request builder as its third argument
     * @return the built request
     */
    private static IncrementalAlterConfigsRequest getIncrementalAlterTopicConfigsRequest(int i, boolean z, boolean z2) {
        boolean shareName = z && i > 1;
        List<ConfigResource> resources = new ArrayList<>();
        Map<ConfigResource, Collection<AlterConfigOp>> opsByResource = new HashMap<>();
        for (int topicIndex = 0; topicIndex < i; topicIndex++) {
            String topicName = shareName ? "test_topic_duplicated" : "test_topic_" + topicIndex;
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            AlterConfigOp setMaxMessageBytes = new AlterConfigOp(new ConfigEntry("max.message.bytes", String.valueOf(1000002)), AlterConfigOp.OpType.SET);
            resources.add(resource);
            opsByResource.put(resource, Collections.singletonList(setMaxMessageBytes));
        }
        short version = ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion();
        return new IncrementalAlterConfigsRequest.Builder(resources, opsByResource, z2).build(version);
    }

    /**
     * Builds a legacy AlterConfigs request at the latest API version in which every topic
     * resource carries a single {@code max.message.bytes=1000002} config.
     *
     * @param i number of topic resources to include
     * @param z when true and {@code i > 1}, every resource is named {@code test_topic_duplicated};
     *          otherwise resources are named {@code test_topic_<index>}
     * @param z2 value set as the request's validate-only flag
     * @return the built request
     */
    private static AlterConfigsRequest getLegacyAlterTopicConfigsRequest(int i, boolean z, boolean z2) {
        boolean shareName = z && i > 1;
        AlterConfigsRequestData.AlterConfigsResourceCollection resources = new AlterConfigsRequestData.AlterConfigsResourceCollection();
        for (int topicIndex = 0; topicIndex < i; topicIndex++) {
            String topicName = shareName ? "test_topic_duplicated" : "test_topic_" + topicIndex;

            AlterConfigsRequestData.AlterableConfig maxMessageBytes = new AlterConfigsRequestData.AlterableConfig()
                .setName("max.message.bytes")
                .setValue(String.valueOf(1000002));
            AlterConfigsRequestData.AlterableConfigCollection configs = new AlterConfigsRequestData.AlterableConfigCollection();
            configs.add(maxMessageBytes);

            AlterConfigsRequestData.AlterConfigsResource resource = new AlterConfigsRequestData.AlterConfigsResource()
                .setResourceType(ConfigResource.Type.TOPIC.id())
                .setResourceName(topicName)
                .setConfigs(configs);
            resources.add(resource);
        }
        AlterConfigsRequestData data = new AlterConfigsRequestData().setResources(resources).setValidateOnly(z2);
        return new AlterConfigsRequest(data, ApiKeys.ALTER_CONFIGS.latestVersion());
    }

    /**
     * Builds a single-topic CreateTopics request with no topic configs.
     *
     * @param str topic name
     * @param i number of partitions
     * @return the built request
     */
    private static CreateTopicsRequest getCreateTopicsRequest(String str, int i) {
        Map<String, String> noConfigs = Collections.emptyMap();
        return getCreateTopicsRequest(str, i, noConfigs);
    }

    /**
     * Builds a single-topic CreateTopics request with the given configs and a replication factor of -1.
     *
     * @param str topic name
     * @param i number of partitions
     * @param map topic config names to values
     * @return the built request
     */
    private static CreateTopicsRequest getCreateTopicsRequest(String str, int i, Map<String, String> map) {
        short replicationFactor = (short) -1;
        return getCreateTopicsRequest(str, i, replicationFactor, map);
    }

    /**
     * Builds a single-topic CreateTopics request at the latest API version.
     *
     * @param str topic name
     * @param i number of partitions
     * @param s replication factor
     * @param map topic config names to values; each entry becomes one config on the topic
     * @return the built request
     */
    private static CreateTopicsRequest getCreateTopicsRequest(String str, int i, short s, Map<String, String> map) {
        CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            CreateTopicsRequestData.CreatableTopicConfig config = new CreateTopicsRequestData.CreatableTopicConfig()
                .setName(entry.getKey())
                .setValue(entry.getValue());
            topicConfigs.add(config);
        }

        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic()
            .setName(str)
            .setNumPartitions(i)
            .setReplicationFactor(s);
        topic.setConfigs(topicConfigs);

        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add(topic);
        CreateTopicsRequestData data = new CreateTopicsRequestData().setTopics(topics);
        return new CreateTopicsRequest.Builder(data).build(ApiKeys.CREATE_TOPICS.latestVersion());
    }

    /**
     * Builds a two-topic CreateTopics request at the latest API version; both topics use a
     * replication factor of -1 and carry no configs.
     *
     * @param str first topic name
     * @param i first topic partition count
     * @param str2 second topic name
     * @param i2 second topic partition count
     * @return the built request
     */
    private static CreateTopicsRequest getCreateTopicsRequest(String str, int i, String str2, int i2) {
        CreateTopicsRequestData.CreatableTopic firstTopic = new CreateTopicsRequestData.CreatableTopic()
            .setName(str)
            .setNumPartitions(i)
            .setReplicationFactor((short) -1);
        CreateTopicsRequestData.CreatableTopic secondTopic = new CreateTopicsRequestData.CreatableTopic()
            .setName(str2)
            .setNumPartitions(i2)
            .setReplicationFactor((short) -1);

        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add(firstTopic);
        topics.add(secondTopic);
        CreateTopicsRequestData data = new CreateTopicsRequestData().setTopics(topics);
        return new CreateTopicsRequest.Builder(data).build(ApiKeys.CREATE_TOPICS.latestVersion());
    }

    /**
     * Builds a DeleteTopics request at the latest API version that names a single topic
     * through the request's topic-names list.
     *
     * @param str topic name
     * @return the built request
     */
    private static DeleteTopicsRequest getDeleteTopicsRequest(String str) {
        DeleteTopicsRequestData data = new DeleteTopicsRequestData();
        data.setTopicNames(Collections.singletonList(str));
        return new DeleteTopicsRequest.Builder(data).build(ApiKeys.DELETE_TOPICS.latestVersion());
    }

    /**
     * Builds a DeleteTopics request at the latest API version that identifies a single topic by id.
     *
     * @param uuid topic id
     * @return the built request
     */
    private static DeleteTopicsRequest getDeleteTopicsRequest(Uuid uuid) {
        DeleteTopicsRequestData.DeleteTopicState topicState = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid);
        DeleteTopicsRequestData data = new DeleteTopicsRequestData();
        data.setTopics(Collections.singletonList(topicState));
        return new DeleteTopicsRequest.Builder(data).build(ApiKeys.DELETE_TOPICS.latestVersion());
    }

    /**
     * Builds a DeleteTopics request at the latest API version that names two topics
     * through the request's topic-names list.
     *
     * @param str first topic name
     * @param str2 second topic name
     * @return the built request
     */
    private static DeleteTopicsRequest getDeleteTopicsRequest(String str, String str2) {
        DeleteTopicsRequestData data = new DeleteTopicsRequestData();
        data.setTopicNames(Arrays.asList(str, str2));
        return new DeleteTopicsRequest.Builder(data).build(ApiKeys.DELETE_TOPICS.latestVersion());
    }

    /**
     * Builds a DeleteTopics request at the latest API version that identifies two topics by id.
     *
     * @param uuid first topic id
     * @param uuid2 second topic id
     * @return the built request
     */
    private static DeleteTopicsRequest getDeleteTopicsRequest(Uuid uuid, Uuid uuid2) {
        DeleteTopicsRequestData.DeleteTopicState firstState = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid);
        DeleteTopicsRequestData.DeleteTopicState secondState = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid2);
        DeleteTopicsRequestData data = new DeleteTopicsRequestData();
        data.setTopics(Arrays.asList(firstState, secondState));
        return new DeleteTopicsRequest.Builder(data).build(ApiKeys.DELETE_TOPICS.latestVersion());
    }

    /**
     * Builds a DeleteTopics request at the latest API version with two topic states: the first
     * carries only an id, the second carries {@code Uuid.ZERO_UUID} as id plus a name.
     *
     * @param uuid id of the first topic
     * @param str name of the second topic
     * @return the built request
     */
    private static DeleteTopicsRequest getDeleteTopicsRequest(Uuid uuid, String str) {
        DeleteTopicsRequestData.DeleteTopicState byId = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid);
        DeleteTopicsRequestData.DeleteTopicState byName = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(Uuid.ZERO_UUID).setName(str);
        DeleteTopicsRequestData data = new DeleteTopicsRequestData();
        data.setTopics(Arrays.asList(byId, byName));
        return new DeleteTopicsRequest.Builder(data).build(ApiKeys.DELETE_TOPICS.latestVersion());
    }

    /**
     * Builds a DeleteTopics request at the latest API version with four topic states, each
     * carrying both the given id and the given name, in argument order.
     *
     * @param uuid id of topic 1
     * @param str name of topic 1
     * @param uuid2 id of topic 2
     * @param str2 name of topic 2
     * @param uuid3 id of topic 3
     * @param str3 name of topic 3
     * @param uuid4 id of topic 4
     * @param str4 name of topic 4
     * @return the built request
     */
    private static DeleteTopicsRequest getDeleteTopicsRequest(Uuid uuid, String str, Uuid uuid2, String str2, Uuid uuid3, String str3, Uuid uuid4, String str4) {
        DeleteTopicsRequestData.DeleteTopicState first = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid).setName(str);
        DeleteTopicsRequestData.DeleteTopicState second = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid2).setName(str2);
        DeleteTopicsRequestData.DeleteTopicState third = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid3).setName(str3);
        DeleteTopicsRequestData.DeleteTopicState fourth = new DeleteTopicsRequestData.DeleteTopicState().setTopicId(uuid4).setName(str4);
        DeleteTopicsRequestData data = new DeleteTopicsRequestData();
        data.setTopics(Arrays.asList(first, second, third, fourth));
        return new DeleteTopicsRequest.Builder(data).build(ApiKeys.DELETE_TOPICS.latestVersion());
    }

    /**
     * Wraps a CreateTopics request in a channel request issued by principal {@code User:Alice}.
     *
     * @param str forwarded to the principal-aware overload
     * @param i forwarded to the principal-aware overload
     * @param createTopicsRequest the request to wrap
     * @return the channel request
     */
    private RequestChannel.Request getRequest(String str, int i, CreateTopicsRequest createTopicsRequest) {
        KafkaPrincipal alice = new KafkaPrincipal("User", "Alice");
        return getRequest(str, i, createTopicsRequest, alice);
    }

    /**
     * Wraps a CreateTopics request in a channel request for the given principal, using a
     * CREATE_TOPICS header at the latest API version.
     *
     * @param str third argument of the {@code RequestHeader} constructor
     * @param i fourth argument of the {@code RequestHeader} constructor
     * @param createTopicsRequest the request to wrap
     * @param kafkaPrincipal the principal attached to the request
     * @return the channel request
     */
    private RequestChannel.Request getRequest(String str, int i, CreateTopicsRequest createTopicsRequest, KafkaPrincipal kafkaPrincipal) {
        short version = ApiKeys.CREATE_TOPICS.latestVersion();
        RequestHeader header = new RequestHeader(ApiKeys.CREATE_TOPICS, version, str, i);
        return buildRequest(createTopicsRequest, header, kafkaPrincipal);
    }

    /**
     * Wraps a DeleteTopics request in a channel request issued by principal {@code User:Alice},
     * using a DELETE_TOPICS header at the latest API version.
     *
     * @param str third argument of the {@code RequestHeader} constructor
     * @param i fourth argument of the {@code RequestHeader} constructor
     * @param deleteTopicsRequest the request to wrap
     * @return the channel request
     */
    private RequestChannel.Request getRequest(String str, int i, DeleteTopicsRequest deleteTopicsRequest) {
        short version = ApiKeys.DELETE_TOPICS.latestVersion();
        RequestHeader header = new RequestHeader(ApiKeys.DELETE_TOPICS, version, str, i);
        KafkaPrincipal alice = new KafkaPrincipal("User", "Alice");
        return buildRequest(deleteTopicsRequest, header, alice);
    }

    /**
     * Wraps a CreatePartitions request in a channel request issued by principal {@code User:Alice},
     * using a CREATE_PARTITIONS header at the latest API version.
     *
     * @param str third argument of the {@code RequestHeader} constructor
     * @param i fourth argument of the {@code RequestHeader} constructor
     * @param createPartitionsRequest the request to wrap
     * @return the channel request
     */
    private RequestChannel.Request getRequest(String str, int i, CreatePartitionsRequest createPartitionsRequest) {
        short version = ApiKeys.CREATE_PARTITIONS.latestVersion();
        RequestHeader header = new RequestHeader(ApiKeys.CREATE_PARTITIONS, version, str, i);
        KafkaPrincipal alice = new KafkaPrincipal("User", "Alice");
        return buildRequest(createPartitionsRequest, header, alice);
    }

    /**
     * Wraps an IncrementalAlterConfigs request in a channel request issued by principal
     * {@code User:Alice}, using an INCREMENTAL_ALTER_CONFIGS header at the latest API version.
     *
     * @param str third argument of the {@code RequestHeader} constructor
     * @param i fourth argument of the {@code RequestHeader} constructor
     * @param incrementalAlterConfigsRequest the request to wrap
     * @return the channel request
     */
    private RequestChannel.Request getRequest(String str, int i, IncrementalAlterConfigsRequest incrementalAlterConfigsRequest) {
        short version = ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion();
        RequestHeader header = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version, str, i);
        KafkaPrincipal alice = new KafkaPrincipal("User", "Alice");
        return buildRequest(incrementalAlterConfigsRequest, header, alice);
    }

    /**
     * Wraps a legacy AlterConfigs request in a channel request issued by principal
     * {@code User:Alice}, using an ALTER_CONFIGS header at the latest API version.
     *
     * @param str third argument of the {@code RequestHeader} constructor
     * @param i fourth argument of the {@code RequestHeader} constructor
     * @param alterConfigsRequest the request to wrap
     * @return the channel request
     */
    private RequestChannel.Request getRequest(String str, int i, AlterConfigsRequest alterConfigsRequest) {
        short version = ApiKeys.ALTER_CONFIGS.latestVersion();
        RequestHeader header = new RequestHeader(ApiKeys.ALTER_CONFIGS, version, str, i);
        KafkaPrincipal alice = new KafkaPrincipal("User", "Alice");
        return buildRequest(alterConfigsRequest, header, alice);
    }

    /**
     * Serializes the request with its header and wraps the resulting buffer in a
     * {@code RequestChannel.Request} whose context uses the local host address and the given principal.
     *
     * @param abstractRequest the request to serialize
     * @param requestHeader the header to serialize alongside the request
     * @param kafkaPrincipal the principal placed in the request context
     * @return the channel request backed by the serialized buffer
     * @throws RuntimeException wrapping {@link UnknownHostException} if the local host cannot be resolved
     */
    private RequestChannel.Request buildRequest(AbstractRequest abstractRequest, RequestHeader requestHeader, KafkaPrincipal kafkaPrincipal) {
        ByteBuffer buffer = abstractRequest.serializeWithHeader(requestHeader);
        try {
            // The header is parsed from the buffer first; the same buffer is then handed to the Request.
            RequestHeader parsedHeader = RequestHeader.parse(buffer);
            // NOTE(review): listener is built from PLAINTEXT while the protocol argument is SSL; kept as in the original.
            RequestContext context = new RequestContext(parsedHeader, "1", InetAddress.getLocalHost(), Optional.empty(), kafkaPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.SSL, ClientInformation.EMPTY, (PathAwareSniHostName) null, false, Optional.of(this.kafkaPrincipalSerde));
            RequestChannel.Metrics metricsMock = Mockito.mock(RequestChannel.Metrics.class);
            return new RequestChannel.Request(1, context, 0L, MemoryPool.NONE, buffer, metricsMock, Option.empty(), Context.current(), NoOpEventEmitter.INSTANCE, NoOpAuditLogProvider.INSTANCE, RequestLogFilter.MATCH_NONE, 0L);
        } catch (UnknownHostException unknownHost) {
            throw new RuntimeException(unknownHost);
        }
    }

    /**
     * Represents the "no authorizer configured" case.
     *
     * @return an empty option
     */
    private static Option<Authorizer> noAuthorizer() {
        Option<Authorizer> absent = Option.empty();
        return absent;
    }

    /**
     * Creates a mock authorizer whose {@code authorize} call returns a single {@code DENIED}
     * result for any arguments.
     *
     * @return the mock authorizer wrapped in an option
     */
    private static Option<Authorizer> denyAllAuthorizer() {
        Authorizer denyingAuthorizer = Mockito.mock(Authorizer.class);
        List<AuthorizationResult> deniedOnce = Collections.singletonList(AuthorizationResult.DENIED);
        Mockito.when(denyingAuthorizer.authorize((AuthorizableRequestContext) ArgumentMatchers.any(), (List) ArgumentMatchers.any()))
            .thenReturn(deniedOnce);
        return Option.apply(denyingAuthorizer);
    }

    /**
     * Creates a mock authorizer that denies only topic-level {@code DESCRIBE_CONFIGS}.
     *
     * <p>If the action list contains at least one {@code DESCRIBE_CONFIGS} action on a
     * {@code TOPIC} resource, {@code authorize} returns a single {@code DENIED} result.
     * For any other list (including {@code null}) it returns a single {@code ALLOWED} result.
     *
     * <p>The matchers are left uncast so that the lambda parameter type is inferred from
     * {@code authorize}'s list parameter; a raw {@code List} cast would make each element an
     * {@code Object} and the {@code action.operation()} calls would not resolve.
     *
     * @return the mock authorizer wrapped in an option
     */
    private static Option<Authorizer> denyDescribeConfigsAuthorizer() {
        Authorizer authorizer = Mockito.mock(Authorizer.class);

        // The null guard matters: the matcher can be evaluated against a null list while stubbing.
        Mockito.when(authorizer.authorize((AuthorizableRequestContext) ArgumentMatchers.any(), ArgumentMatchers.argThat(actions ->
                actions != null && actions.stream().anyMatch(action ->
                    action.operation() == AclOperation.DESCRIBE_CONFIGS && action.resourcePattern().resourceType() == ResourceType.TOPIC))))
            .thenReturn(Collections.singletonList(AuthorizationResult.DENIED));

        // Complement of the matcher above: everything else is allowed.
        Mockito.when(authorizer.authorize((AuthorizableRequestContext) ArgumentMatchers.any(), ArgumentMatchers.argThat(actions ->
                actions == null || actions.stream().noneMatch(action ->
                    action.operation() == AclOperation.DESCRIBE_CONFIGS && action.resourcePattern().resourceType() == ResourceType.TOPIC))))
            .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED));

        return Option.apply(authorizer);
    }

    /**
     * Creates a mock authorizer whose {@code authorize} call returns a single {@code ALLOWED}
     * result for any arguments.
     *
     * @return the mock authorizer wrapped in an option
     */
    private static Option<Authorizer> allowAllAuthorizer() {
        Authorizer allowingAuthorizer = Mockito.mock(Authorizer.class);
        List<AuthorizationResult> allowedOnce = Collections.singletonList(AuthorizationResult.ALLOWED);
        Mockito.when(allowingAuthorizer.authorize((AuthorizableRequestContext) ArgumentMatchers.any(), (List) ArgumentMatchers.any()))
            .thenReturn(allowedOnce);
        return Option.apply(allowingAuthorizer);
    }

    /**
     * Stubs {@code createAdditionalPartitions} for the given map (any boolean flags) to return a mock
     * stage, and captures the callback later registered on that stage via {@code whenCompleteAsyncRaw}.
     *
     * @param k2ControllerHandler the handler mock to stub
     * @param map the exact first argument the stub matches on
     * @return the captor holding the completion callback
     */
    private static ArgumentCaptor<BiConsumer<Map<String, ApiError>, Throwable>> getCreatePartitionsCaptor(K2ControllerHandler k2ControllerHandler, Map<String, List<Integer>> map) {
        SemiCompletionStage stageMock = Mockito.mock(SemiCompletionStage.class);
        ArgumentCaptor<BiConsumer<Map<String, ApiError>, Throwable>> callbackCaptor = ArgumentCaptor.forClass(BiConsumer.class);

        Mockito.when(k2ControllerHandler.createAdditionalPartitions((Map) ArgumentMatchers.eq(map), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean()))
            .thenReturn(stageMock);
        Mockito.doNothing()
            .when(stageMock)
            .whenCompleteAsyncRaw((BiConsumer) callbackCaptor.capture(), (Executor) ArgumentMatchers.any());
        return callbackCaptor;
    }

    /**
     * Stubs {@code alterTopicConfigs} for the given map and second-argument flag (any third flag) to
     * return a mock stage, and captures the callback later registered via {@code whenCompleteAsyncRaw}.
     *
     * @param k2ControllerHandler the handler mock to stub
     * @param map the exact first argument the stub matches on
     * @param z the exact second argument the stub matches on
     * @return the captor holding the completion callback
     */
    private static ArgumentCaptor<BiConsumer<Map<String, ApiError>, Throwable>> getAlterTopicConfigsCaptor(K2ControllerHandler k2ControllerHandler, Map<String, Collection<AlterConfigOp>> map, boolean z) {
        SemiCompletionStage stageMock = Mockito.mock(SemiCompletionStage.class);
        ArgumentCaptor<BiConsumer<Map<String, ApiError>, Throwable>> callbackCaptor = ArgumentCaptor.forClass(BiConsumer.class);

        Mockito.when(k2ControllerHandler.alterTopicConfigs((Map) ArgumentMatchers.eq(map), ArgumentMatchers.eq(z), ArgumentMatchers.anyBoolean()))
            .thenReturn(stageMock);
        Mockito.doNothing()
            .when(stageMock)
            .whenCompleteAsyncRaw((BiConsumer) callbackCaptor.capture(), (Executor) ArgumentMatchers.any());
        return callbackCaptor;
    }

    /**
     * Convenience overload of the three-argument variant with its flag set to {@code true}.
     *
     * @param k2ControllerHandler the handler mock to stub
     * @param creatableTopicCollection the topics the stub matches on
     * @return the captor holding the completion callback
     */
    private static ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> getCreateTopicsCaptor(K2ControllerHandler k2ControllerHandler, CreateTopicsRequestData.CreatableTopicCollection creatableTopicCollection) {
        final boolean expectAllTopicNames = true;
        return getCreateTopicsCaptor(k2ControllerHandler, creatableTopicCollection, expectAllTopicNames);
    }

    /**
     * Stubs {@code createTopics} to return a mock stage and captures the callback later registered
     * on that stage via {@code whenCompleteAsyncRaw}.
     *
     * <p>The stub matches the given topic collection as first argument and any boolean as third.
     * The second argument must equal the set of all topic names when {@code z} is true,
     * or the empty set when {@code z} is false.
     *
     * @param k2ControllerHandler the handler mock to stub
     * @param creatableTopicCollection the topics the stub matches on
     * @param z whether the expected name set contains every topic name (true) or is empty (false)
     * @return the captor holding the completion callback
     */
    private static ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> getCreateTopicsCaptor(K2ControllerHandler k2ControllerHandler, CreateTopicsRequestData.CreatableTopicCollection creatableTopicCollection, boolean z) {
        SemiCompletionStage stageMock = Mockito.mock(SemiCompletionStage.class);
        Set<String> expectedNames = z
            ? creatableTopicCollection.stream().map(CreateTopicsRequestData.CreatableTopic::name).collect(Collectors.toSet())
            : Collections.emptySet();

        Mockito.when(k2ControllerHandler.createTopics((CreateTopicsRequestData.CreatableTopicCollection) ArgumentMatchers.eq(creatableTopicCollection), (Set) ArgumentMatchers.eq(expectedNames), ArgumentMatchers.anyBoolean()))
            .thenReturn(stageMock);

        ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> callbackCaptor = ArgumentCaptor.forClass(BiConsumer.class);
        Mockito.doNothing()
            .when(stageMock)
            .whenCompleteAsyncRaw((BiConsumer) callbackCaptor.capture(), (Executor) ArgumentMatchers.any());
        return callbackCaptor;
    }

    /**
     * Invokes the captured completion callback with a one-element result collection and a null throwable.
     *
     * @param argumentCaptor captor holding the callback to invoke
     * @param str topic name set on the result
     * @param i partition count set on the result
     * @return the result object that was passed to the callback
     */
    private static CreateTopicsResponseData.CreatableTopicResult getCreatableTopicResult(ArgumentCaptor<BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable>> argumentCaptor, String str, int i) {
        BiConsumer<CreateTopicsResponseData.CreatableTopicResultCollection, Throwable> completionCallback = argumentCaptor.getValue();

        CreateTopicsResponseData.CreatableTopicResult topicResult = new CreateTopicsResponseData.CreatableTopicResult();
        topicResult.setName(str);
        topicResult.setNumPartitions(i);

        CreateTopicsResponseData.CreatableTopicResultCollection results = new CreateTopicsResponseData.CreatableTopicResultCollection();
        results.add(topicResult);
        completionCallback.accept(results, null);
        return topicResult;
    }

    /**
     * Stubs {@code getTopicMetadata} for the given name and id sets to return a mock stage, and
     * captures the callback later registered on that stage via {@code whenCompleteAsyncRaw}.
     *
     * @param k2ControllerHandler the handler mock to stub
     * @param set the exact first argument the stub matches on
     * @param set2 the exact second argument the stub matches on
     * @return the captor holding the completion callback
     */
    private static ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> getTopicMetadataCaptor(K2ControllerHandler k2ControllerHandler, Set<String> set, Set<Uuid> set2) {
        SemiCompletionStage stageMock = Mockito.mock(SemiCompletionStage.class);
        ArgumentCaptor<BiConsumer<Set<K2TopicMetadata>, Throwable>> callbackCaptor = ArgumentCaptor.forClass(BiConsumer.class);

        Mockito.when(k2ControllerHandler.getTopicMetadata((Set) ArgumentMatchers.eq(set), (Set) ArgumentMatchers.eq(set2)))
            .thenReturn(stageMock);
        Mockito.doNothing()
            .when(stageMock)
            .whenCompleteAsyncRaw((BiConsumer) callbackCaptor.capture(), (Executor) ArgumentMatchers.any());
        return callbackCaptor;
    }

    /**
     * Stubs {@code deleteTopics} for the given map to return a mock stage, and captures the
     * callback later registered on that stage via {@code whenCompleteAsyncRaw}.
     *
     * @param k2ControllerHandler the handler mock to stub
     * @param map the exact argument the stub matches on
     * @return the captor holding the completion callback
     */
    private static ArgumentCaptor<BiConsumer<Map<Uuid, ApiError>, Throwable>> getDeleteTopicsCaptor(K2ControllerHandler k2ControllerHandler, Map<Uuid, Integer> map) {
        SemiCompletionStage stageMock = Mockito.mock(SemiCompletionStage.class);
        ArgumentCaptor<BiConsumer<Map<Uuid, ApiError>, Throwable>> callbackCaptor = ArgumentCaptor.forClass(BiConsumer.class);

        Mockito.doNothing()
            .when(stageMock)
            .whenCompleteAsyncRaw((BiConsumer) callbackCaptor.capture(), (Executor) ArgumentMatchers.any());
        Mockito.when(k2ControllerHandler.deleteTopics((Map) ArgumentMatchers.eq(map)))
            .thenReturn(stageMock);
        return callbackCaptor;
    }
}
