/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.metadata.LeaderAndIsr;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.AdminCommandFailedException;
import org.apache.kafka.tools.AdminOperationException;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.TopicCommand;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TopicCommandTest {
    private final short defaultReplicationFactor = 1;
    private final int defaultNumPartitions = 1;
    private static final int CLUSTER_WAIT_MS = 60000;
    private final String bootstrapServer = "localhost:9092";
    private final String topicName = "topicName";

    @Test
    public void testIsNotUnderReplicatedWhenAdding() {
        List<Integer> replicaIds = Arrays.asList(1, 2);
        ArrayList<Node> replicas = new ArrayList<Node>();
        for (int id : replicaIds) {
            replicas.add(new Node(id, "localhost", 9090 + id));
        }
        TopicCommand.PartitionDescription partitionDescription = new TopicCommand.PartitionDescription("test-topic", new TopicPartitionInfo(0, new Node(1, "localhost", 9091), replicas, Collections.singletonList(new Node(1, "localhost", 9091))), null, Boolean.valueOf(false), new PartitionReassignment(replicaIds, Arrays.asList(2), Collections.emptyList()));
        Assertions.assertFalse((boolean)partitionDescription.isUnderReplicated());
    }

    @Test
    public void testAlterWithUnspecifiedPartitionCount() {
        String[] options = new String[]{" --bootstrap-server", "localhost:9092", "--alter", "--topic", "topicName"};
        this.assertInitializeInvalidOptionsExitCode(1, options);
    }

    @Test
    public void testConfigOptWithBootstrapServers() {
        this.assertInitializeInvalidOptionsExitCode(1, new String[]{"--bootstrap-server", "localhost:9092", "--alter", "--topic", "topicName", "--partitions", "3", "--config", "cleanup.policy=compact"});
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--create", "--topic", "topicName", "--partitions", "3", "--replication-factor", "3", "--config", "cleanup.policy=compact"});
        Assertions.assertTrue((boolean)opts.hasCreateOption());
        Assertions.assertEquals((Object)"localhost:9092", opts.bootstrapServer().get());
        Assertions.assertEquals((Object)"cleanup.policy=compact", ((List)opts.topicConfig().get()).get(0));
    }

    @Test
    public void testCreateWithPartitionCountWithoutReplicationFactorShouldSucceed() {
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--create", "--partitions", "2", "--topic", "topicName"});
        Assertions.assertTrue((boolean)opts.hasCreateOption());
        Assertions.assertEquals((Object)"topicName", opts.topic().get());
        Assertions.assertEquals((int)2, (Integer)((Integer)opts.partitions().get()));
    }

    @Test
    public void testCreateWithReplicationFactorWithoutPartitionCountShouldSucceed() {
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--create", "--replication-factor", "3", "--topic", "topicName"});
        Assertions.assertTrue((boolean)opts.hasCreateOption());
        Assertions.assertEquals((Object)"topicName", opts.topic().get());
        Assertions.assertEquals((int)3, (Integer)((Integer)opts.replicationFactor().get()));
    }

    @Test
    public void testCreateWithAssignmentAndPartitionCount() {
        this.assertInitializeInvalidOptionsExitCode(1, new String[]{"--bootstrap-server", "localhost:9092", "--create", "--replica-assignment", "3:0,5:1", "--partitions", "2", "--topic", "topicName"});
    }

    @Test
    public void testCreateWithAssignmentAndReplicationFactor() {
        this.assertInitializeInvalidOptionsExitCode(1, new String[]{"--bootstrap-server", "localhost:9092", "--create", "--replica-assignment", "3:0,5:1", "--replication-factor", "2", "--topic", "topicName"});
    }

    @Test
    public void testCreateWithoutPartitionCountAndReplicationFactorShouldSucceed() {
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--create", "--topic", "topicName"});
        Assertions.assertTrue((boolean)opts.hasCreateOption());
        Assertions.assertEquals((Object)"topicName", opts.topic().get());
        Assertions.assertFalse((boolean)opts.partitions().isPresent());
    }

    @Test
    public void testDescribeShouldSucceed() {
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--describe", "--topic", "topicName"});
        Assertions.assertTrue((boolean)opts.hasDescribeOption());
        Assertions.assertEquals((Object)"topicName", opts.topic().get());
    }

    @Test
    public void testDescribeWithDescribeTopicsApiShouldSucceed() {
        TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--describe", "--topic", "topicName"});
        Assertions.assertTrue((boolean)opts.hasDescribeOption());
        Assertions.assertEquals((Object)"topicName", opts.topic().get());
    }

    @Test
    public void testParseAssignmentDuplicateEntries() {
        Assertions.assertThrows(AdminCommandFailedException.class, () -> TopicCommand.parseReplicaAssignment((String)"5:5"));
    }

    @Test
    public void testParseAssignmentPartitionsOfDifferentSize() {
        Assertions.assertThrows(AdminOperationException.class, () -> TopicCommand.parseReplicaAssignment((String)"5:4:3,2:1"));
    }

    @Test
    public void testParseAssignment() {
        Map actualAssignment = TopicCommand.parseReplicaAssignment((String)"5:4,3:2,1:0");
        HashMap<Integer, List<Integer>> expectedAssignment = new HashMap<Integer, List<Integer>>();
        expectedAssignment.put(0, Arrays.asList(5, 4));
        expectedAssignment.put(1, Arrays.asList(3, 2));
        expectedAssignment.put(2, Arrays.asList(1, 0));
        Assertions.assertEquals(expectedAssignment, (Object)actualAssignment);
    }

    @Test
    public void testCreateTopicDoesNotRetryThrottlingQuotaExceededException() {
        Admin adminClient = (Admin)Mockito.mock(Admin.class);
        TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
        CreateTopicsResult result = AdminClientTestUtils.createTopicsResult((String)"topicName", (Throwable)Errors.THROTTLING_QUOTA_EXCEEDED.exception());
        Mockito.when((Object)adminClient.createTopics((Collection)ArgumentMatchers.any(), (CreateTopicsOptions)ArgumentMatchers.any())).thenReturn((Object)result);
        Assertions.assertThrows(ThrottlingQuotaExceededException.class, () -> topicService.createTopic(new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--create", "--topic", "topicName"})));
        NewTopic expectedNewTopic = new NewTopic("topicName", Optional.empty(), Optional.empty()).configs(Collections.emptyMap());
        ((Admin)Mockito.verify((Object)adminClient, (VerificationMode)Mockito.times((int)1))).createTopics((Collection)ArgumentMatchers.eq(new HashSet<NewTopic>(Arrays.asList(expectedNewTopic))), (CreateTopicsOptions)ArgumentMatchers.argThat(exception -> !exception.shouldRetryOnQuotaViolation()));
    }

    @Test
    public void testDeleteTopicDoesNotRetryThrottlingQuotaExceededException() {
        Admin adminClient = (Admin)Mockito.mock(Admin.class);
        TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
        ListTopicsResult listResult = AdminClientTestUtils.listTopicsResult((String)"topicName");
        Mockito.when((Object)adminClient.listTopics((ListTopicsOptions)ArgumentMatchers.any())).thenReturn((Object)listResult);
        DeleteTopicsResult result = AdminClientTestUtils.deleteTopicsResult((String)"topicName", (Throwable)Errors.THROTTLING_QUOTA_EXCEEDED.exception());
        Mockito.when((Object)adminClient.deleteTopics(ArgumentMatchers.anyCollection(), (DeleteTopicsOptions)ArgumentMatchers.any())).thenReturn((Object)result);
        ExecutionException exception = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> topicService.deleteTopic(new TopicCommand.TopicCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", "--delete", "--topic", "topicName"})));
        Assertions.assertInstanceOf(ThrottlingQuotaExceededException.class, (Object)exception.getCause());
        ((Admin)Mockito.verify((Object)adminClient)).deleteTopics((Collection)ArgumentMatchers.argThat(topics -> topics.equals(Arrays.asList("topicName"))), (DeleteTopicsOptions)ArgumentMatchers.argThat(options -> !options.shouldRetryOnQuotaViolation()));
    }

    @Test
    public void testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException() {
        Admin adminClient = (Admin)Mockito.mock(Admin.class);
        TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
        ListTopicsResult listResult = AdminClientTestUtils.listTopicsResult((String)"topicName");
        Mockito.when((Object)adminClient.listTopics((ListTopicsOptions)ArgumentMatchers.any())).thenReturn((Object)listResult);
        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0), Collections.emptyList(), Collections.emptyList());
        DescribeTopicsResult describeResult = AdminClientTestUtils.describeTopicsResult((String)"topicName", (TopicDescription)new TopicDescription("topicName", false, Collections.singletonList(topicPartitionInfo)));
        Mockito.when((Object)adminClient.describeTopics(ArgumentMatchers.anyCollection())).thenReturn((Object)describeResult);
        CreatePartitionsResult result = AdminClientTestUtils.createPartitionsResult((String)"topicName", (Throwable)Errors.THROTTLING_QUOTA_EXCEEDED.exception());
        Mockito.when((Object)adminClient.createPartitions((Map)ArgumentMatchers.any(), (CreatePartitionsOptions)ArgumentMatchers.any())).thenReturn((Object)result);
        Exception exception = (Exception)Assertions.assertThrows(ExecutionException.class, () -> topicService.alterTopic(new TopicCommand.TopicCommandOptions(new String[]{"--alter", "--topic", "topicName", "--partitions", "3", "--bootstrap-server", "localhost:9092"})));
        Assertions.assertInstanceOf(ThrottlingQuotaExceededException.class, (Object)exception.getCause());
        ((Admin)Mockito.verify((Object)adminClient, (VerificationMode)Mockito.times((int)1))).createPartitions((Map)ArgumentMatchers.argThat(newPartitions -> ((NewPartitions)newPartitions.get("topicName")).totalCount() == 3), (CreatePartitionsOptions)ArgumentMatchers.argThat(createPartitionOption -> !createPartitionOption.shouldRetryOnQuotaViolation()));
    }

    public void assertInitializeInvalidOptionsExitCode(int expected, String[] options) {
        Exit.setExitProcedure((exitCode, message) -> {
            Assertions.assertEquals((int)expected, (int)exitCode);
            throw new RuntimeException();
        });
        try {
            Assertions.assertThrows(RuntimeException.class, () -> new TopicCommand.TopicCommandOptions(options));
        }
        finally {
            Exit.resetExitProcedure();
        }
    }

    private TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap(ClusterInstance clusterInstance, String ... opts) {
        String bootstrapServer = clusterInstance.bootstrapServers();
        String[] finalOptions = (String[])Stream.concat(Arrays.stream(opts), Stream.of("--bootstrap-server", bootstrapServer)).toArray(String[]::new);
        return new TopicCommand.TopicCommandOptions(finalOptions);
    }

    static List<ClusterConfig> generate() {
        HashMap<String, String> serverProp = new HashMap<String, String>();
        serverProp.put("replica.fetch.max.bytes", "1");
        serverProp.put("log.initial.task.delay.ms", "100");
        serverProp.put("log.segment.delete.delay.ms", "1000");
        HashMap rackInfo = new HashMap();
        HashMap<String, String> infoPerBroker1 = new HashMap<String, String>();
        infoPerBroker1.put("broker.rack", "rack1");
        HashMap<String, String> infoPerBroker2 = new HashMap<String, String>();
        infoPerBroker2.put("broker.rack", "rack2");
        HashMap<String, String> infoPerBroker3 = new HashMap<String, String>();
        infoPerBroker3.put("broker.rack", "rack2");
        HashMap<String, String> infoPerBroker4 = new HashMap<String, String>();
        infoPerBroker4.put("broker.rack", "rack1");
        HashMap<String, String> infoPerBroker5 = new HashMap<String, String>();
        infoPerBroker5.put("broker.rack", "rack3");
        HashMap<String, String> infoPerBroker6 = new HashMap<String, String>();
        infoPerBroker6.put("broker.rack", "rack3");
        rackInfo.put(0, infoPerBroker1);
        rackInfo.put(1, infoPerBroker2);
        rackInfo.put(2, infoPerBroker3);
        rackInfo.put(3, infoPerBroker4);
        rackInfo.put(4, infoPerBroker5);
        rackInfo.put(5, infoPerBroker6);
        return Collections.singletonList(ClusterConfig.defaultBuilder().setBrokers(6).setServerProperties(serverProp).setPerServerProperties(rackInfo).setTypes(Stream.of(Type.KRAFT).collect(Collectors.toSet())).build());
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testCreate(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            Assertions.assertTrue((boolean)((Set)adminClient.listTopics().names().get()).contains(testTopicName), (String)("Admin client didn't see the created topic. It saw: " + String.valueOf(adminClient.listTopics().names().get())));
            adminClient.deleteTopics(Collections.singletonList(testTopicName));
            clusterInstance.waitForTopic(testTopicName, 0);
            Assertions.assertTrue((boolean)((Set)adminClient.listTopics().names().get()).isEmpty(), (String)("Admin client see the created topic. It saw: " + String.valueOf(adminClient.listTopics().names().get())));
        }
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testCreateWithDefaults(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            Assertions.assertTrue((boolean)((Set)adminClient.listTopics().names().get()).contains(testTopicName), (String)("Admin client didn't see the created topic. It saw: " + String.valueOf(adminClient.listTopics().names().get())));
            List partitions = ((TopicDescription)((Map)adminClient.describeTopics(Collections.singletonList(testTopicName)).allTopicNames().get()).get(testTopicName)).partitions();
            Assertions.assertEquals((int)1, (int)partitions.size(), (String)("Unequal partition size: " + partitions.size()));
            Assertions.assertEquals((short)1, (short)((short)((TopicPartitionInfo)partitions.get(0)).replicas().size()), (String)("Unequal replication factor: " + ((TopicPartitionInfo)partitions.get(0)).replicas().size()));
            adminClient.deleteTopics(Collections.singletonList(testTopicName));
            clusterInstance.waitForTopic(testTopicName, 0);
            Assertions.assertTrue((boolean)((Set)adminClient.listTopics().names().get()).isEmpty(), (String)("Admin client see the created topic. It saw: " + String.valueOf(adminClient.listTopics().names().get())));
        }
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testCreateWithDefaultReplication(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 2, 1)));
            clusterInstance.waitForTopic(testTopicName, 2);
            List partitions = ((TopicDescription)((Map)adminClient.describeTopics(Collections.singletonList(testTopicName)).allTopicNames().get()).get(testTopicName)).partitions();
            Assertions.assertEquals((int)2, (int)partitions.size(), (String)("Unequal partition size: " + partitions.size()));
            Assertions.assertEquals((short)1, (short)((short)((TopicPartitionInfo)partitions.get(0)).replicas().size()), (String)("Unequal replication factor: " + ((TopicPartitionInfo)partitions.get(0)).replicas().size()));
        }
    }

    @ClusterTest(brokers=3)
    public void testCreateWithDefaultPartitions(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 2)));
            clusterInstance.waitForTopic(testTopicName, 1);
            List partitions = ((TopicDescription)((Map)adminClient.describeTopics(Collections.singletonList(testTopicName)).allTopicNames().get()).get(testTopicName)).partitions();
            Assertions.assertEquals((int)1, (int)partitions.size(), (String)("Unequal partition size: " + partitions.size()));
            Assertions.assertEquals((int)2, (int)((short)((TopicPartitionInfo)partitions.get(0)).replicas().size()), (String)("Partitions not replicated: " + ((TopicPartitionInfo)partitions.get(0)).replicas().size()));
        }
    }

    @ClusterTest(brokers=3)
    public void testCreateWithConfigs(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            topicConfig.put("delete.retention.ms", "1000");
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 2, 2).configs(topicConfig)));
            clusterInstance.waitForTopic(testTopicName, 2);
            Config configs = (Config)((Map)adminClient.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
            Assertions.assertEquals((int)1000, (Integer)Integer.valueOf(configs.get("delete.retention.ms").value()), (String)("Config not set correctly: " + configs.get("delete.retention.ms").value()));
        }
    }

    @ClusterTest(brokers=3)
    public void testCreateWhenAlreadyExists(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            TopicCommand.TopicCommandOptions createOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", Integer.toString(1), "--replication-factor", "1", "--topic", testTopicName);
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            Assertions.assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts), (String)"Expected TopicExistsException to throw");
        }
    }

    @ClusterTest(brokers=3)
    public void testCreateWhenAlreadyExistsWithIfNotExists(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            TopicCommand.TopicCommandOptions createOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--topic", testTopicName, "--if-not-exists");
            topicService.createTopic(createOpts);
        }
    }

    private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> partitions, int partitionNumber) {
        return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList());
    }

    @ClusterTemplate(value="generate")
    public void testCreateWithReplicaAssignment(ClusterInstance clusterInstance) throws Exception {
        HashMap<Integer, List<Integer>> replicaAssignmentMap = new HashMap<Integer, List<Integer>>();
        try (Admin adminClient = clusterInstance.admin();){
            String testTopicName = TestUtils.randomString((int)10);
            replicaAssignmentMap.put(0, Arrays.asList(5, 4));
            replicaAssignmentMap.put(1, Arrays.asList(3, 2));
            replicaAssignmentMap.put(2, Arrays.asList(1, 0));
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, replicaAssignmentMap)));
            clusterInstance.waitForTopic(testTopicName, 3);
            List partitions = ((TopicDescription)((Map)adminClient.describeTopics(Collections.singletonList(testTopicName)).allTopicNames().get()).get(testTopicName)).partitions();
            Assertions.assertEquals((int)3, (int)partitions.size(), (String)("Unequal partition size: " + partitions.size()));
            Assertions.assertEquals(Arrays.asList(5, 4), this.getPartitionReplicas(partitions, 0), (String)("Unexpected replica assignment: " + String.valueOf(this.getPartitionReplicas(partitions, 0))));
            Assertions.assertEquals(Arrays.asList(3, 2), this.getPartitionReplicas(partitions, 1), (String)("Unexpected replica assignment: " + String.valueOf(this.getPartitionReplicas(partitions, 1))));
            Assertions.assertEquals(Arrays.asList(1, 0), this.getPartitionReplicas(partitions, 2), (String)("Unexpected replica assignment: " + String.valueOf(this.getPartitionReplicas(partitions, 2))));
        }
    }

    @ClusterTest(brokers=3)
    public void testCreateWithInvalidReplicationFactor(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            TopicCommand.TopicCommandOptions opts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "2", "--replication-factor", Integer.toString(32768), "--topic", testTopicName);
            Assertions.assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), (String)"Expected IllegalArgumentException to throw");
        }
    }

    @ClusterTest
    public void testCreateWithNegativeReplicationFactor(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            TopicCommand.TopicCommandOptions opts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName);
            Assertions.assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), (String)"Expected IllegalArgumentException to throw");
        }
    }

    @ClusterTest
    public void testCreateWithNegativePartitionCount(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            TopicCommand.TopicCommandOptions opts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName);
            Assertions.assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), (String)"Expected IllegalArgumentException to throw");
        }
    }

    @ClusterTest
    public void testInvalidTopicLevelConfig(ClusterInstance clusterInstance) {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
            TopicCommand.TopicCommandOptions createOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, "--config", "message.timestamp.type=boom");
            Assertions.assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), (String)"Expected ConfigException to throw");
        }
    }

    @ClusterTest
    public void testListTopics(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            String output = this.captureListTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list"));
            Assertions.assertTrue((boolean)output.contains(testTopicName), (String)("Expected topic name to be present in output: " + output));
        }
    }

    @ClusterTest(brokers=3)
    public void testListTopicsWithIncludeList(ClusterInstance clusterInstance) throws InterruptedException {
        try (Admin adminClient = clusterInstance.admin();){
            String topic1 = "kafka.testTopic1";
            String topic2 = "kafka.testTopic2";
            String topic3 = "oooof.testTopic1";
            int partition = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(topic1, partition, replicationFactor)));
            adminClient.createTopics(Collections.singletonList(new NewTopic(topic2, partition, replicationFactor)));
            adminClient.createTopics(Collections.singletonList(new NewTopic(topic3, partition, replicationFactor)));
            clusterInstance.waitForTopic(topic1, partition);
            clusterInstance.waitForTopic(topic2, partition);
            clusterInstance.waitForTopic(topic3, partition);
            String output = this.captureListTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list", "--topic", "kafka.*"));
            Assertions.assertTrue((boolean)output.contains(topic1), (String)("Expected topic name " + topic1 + " to be present in output: " + output));
            Assertions.assertTrue((boolean)output.contains(topic2), (String)("Expected topic name " + topic2 + " to be present in output: " + output));
            Assertions.assertFalse((boolean)output.contains(topic3), (String)("Do not expect topic name " + topic3 + " to be present in output: " + output));
        }
    }

    @ClusterTest(brokers=3)
    public void testListTopicsWithExcludeInternal(ClusterInstance clusterInstance) throws InterruptedException {
        try (Admin adminClient = clusterInstance.admin();){
            String topic1 = "kafka.testTopic1";
            String hiddenConsumerTopic = "__consumer_offsets";
            int partition = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(topic1, partition, replicationFactor)));
            clusterInstance.waitForTopic(topic1, partition);
            String output = this.captureListTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list", "--exclude-internal"));
            Assertions.assertTrue((boolean)output.contains(topic1), (String)("Expected topic name " + topic1 + " to be present in output: " + output));
            Assertions.assertFalse((boolean)output.contains(hiddenConsumerTopic), (String)("Do not expect topic name " + hiddenConsumerTopic + " to be present in output: " + output));
        }
    }

    @ClusterTest(brokers=3)
    public void testAlterPartitionCount(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            int partition = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, partition);
            topicService.alterTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "3"));
            TestUtils.waitForCondition(() -> ((Map)adminClient.listPartitionReassignments().reassignments().get()).isEmpty(), (long)60000L, (String)(testTopicName + String.format("reassignmet not finished after %s ms", 60000)));
            TestUtils.waitForCondition(() -> clusterInstance.brokers().values().stream().allMatch(b -> b.metadataCache().numPartitions(testTopicName).orElse(0) == 3), (long)15000L, (String)"Timeout waiting for new assignment propagating to broker");
            TopicDescription topicDescription = (TopicDescription)((KafkaFuture)adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName)).get();
            Assertions.assertEquals((int)3, (int)topicDescription.partitions().size(), (String)("Expected partition count to be 3. Got: " + topicDescription.partitions().size()));
        }
    }

    @ClusterTemplate(value="generate")
    public void testAlterAssignment(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            int partition = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, partition);
            topicService.alterTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3"));
            TestUtils.waitForCondition(() -> ((Map)adminClient.listPartitionReassignments().reassignments().get()).isEmpty(), (long)60000L, (String)(testTopicName + String.format("reassignmet not finished after %s ms", 60000)));
            TestUtils.waitForCondition(() -> clusterInstance.brokers().values().stream().allMatch(b -> b.metadataCache().numPartitions(testTopicName).orElse(0) == 3), (long)15000L, (String)"Timeout waiting for new assignment propagating to broker");
            TopicDescription topicDescription = (TopicDescription)((KafkaFuture)adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName)).get();
            Assertions.assertEquals((int)3, (int)topicDescription.partitions().size(), (String)("Expected partition count to be 3. Got: " + topicDescription.partitions().size()));
            List<Integer> partitionReplicas = this.getPartitionReplicas(topicDescription.partitions(), 2);
            Assertions.assertEquals(Arrays.asList(4, 2), partitionReplicas, (String)("Expected to have replicas 4,2. Got: " + String.valueOf(partitionReplicas)));
        }
    }

    @ClusterTest(brokers=3)
    public void testAlterAssignmentWithMoreAssignmentThanPartitions(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            int partition = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, partition);
            Assertions.assertThrows(ExecutionException.class, () -> topicService.alterTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3")), (String)"Expected to fail with ExecutionException");
        }
    }

    @ClusterTemplate(value="generate")
    public void testAlterAssignmentWithMorePartitionsThanAssignment(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            int partition = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, partition);
            Assertions.assertThrows(ExecutionException.class, () -> topicService.alterTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")), (String)"Expected to fail with ExecutionException");
        }
    }

    @ClusterTest
    public void testAlterWithInvalidPartitionCount(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            Assertions.assertThrows(ExecutionException.class, () -> topicService.alterTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--partitions", "-1", "--topic", testTopicName)), (String)"Expected to fail with ExecutionException");
        }
    }

    @ClusterTest
    public void testAlterWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            TopicCommand.TopicCommandOptions alterOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "1");
            Assertions.assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts), (String)"Expected to fail with IllegalArgumentException");
        }
    }

    @ClusterTest
    public void testAlterWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        Admin adminClient = clusterInstance.admin();
        TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
        topicService.alterTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "1", "--if-exists"));
        adminClient.close();
        topicService.close();
    }

    @ClusterTemplate(value="generate")
    public void testCreateAlterTopicWithRackAware(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            HashMap<Integer, String> rackInfo = new HashMap<Integer, String>();
            rackInfo.put(0, "rack1");
            rackInfo.put(1, "rack2");
            rackInfo.put(2, "rack2");
            rackInfo.put(3, "rack1");
            rackInfo.put(4, "rack3");
            rackInfo.put(5, "rack3");
            int numPartitions = 18;
            int replicationFactor = 3;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, numPartitions, (short)replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, numPartitions);
            Map<Integer, List<Integer>> assignment = ((TopicDescription)((Map)adminClient.describeTopics(Collections.singletonList(testTopicName)).allTopicNames().get()).get(testTopicName)).partitions().stream().collect(Collectors.toMap(info -> info.partition(), info -> info.replicas().stream().map(Node::id).collect(Collectors.toList())));
            this.checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), numPartitions, replicationFactor, true, true, true);
            int alteredNumPartitions = 36;
            TopicCommand.TopicCommandOptions alterOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--partitions", Integer.toString(alteredNumPartitions), "--topic", testTopicName);
            topicService.alterTopic(alterOpts);
            TestUtils.waitForCondition(() -> ((Map)adminClient.listPartitionReassignments().reassignments().get()).isEmpty(), (long)60000L, (String)(testTopicName + String.format("reassignmet not finished after %s ms", 60000)));
            TestUtils.waitForCondition(() -> clusterInstance.brokers().values().stream().allMatch(p -> p.metadataCache().numPartitions(testTopicName).orElse(0) == alteredNumPartitions), (long)15000L, (String)"Timeout waiting for new assignment propagating to broker");
            assignment = ((TopicDescription)((Map)adminClient.describeTopics(Collections.singletonList(testTopicName)).allTopicNames().get()).get(testTopicName)).partitions().stream().collect(Collectors.toMap(info -> info.partition(), info -> info.replicas().stream().map(Node::id).collect(Collectors.toList())));
            this.checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), alteredNumPartitions, replicationFactor, true, true, true);
        }
    }

    @ClusterTest(brokers=3)
    public void testConfigPreservationAcrossPartitionAlteration(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            String cleanUpPolicy = "compact";
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            topicConfig.put("cleanup.policy", cleanUpPolicy);
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1).configs(topicConfig)));
            clusterInstance.waitForTopic(testTopicName, 1);
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
            Config props = (Config)((Map)adminClient.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
            Assertions.assertNotNull((Object)props.get("cleanup.policy"), (String)("Properties after creation don't contain " + cleanUpPolicy));
            Assertions.assertEquals((Object)cleanUpPolicy, (Object)props.get("cleanup.policy").value(), (String)"Properties after creation have incorrect value");
            int numPartitionsModified = 3;
            TopicCommand.TopicCommandOptions alterOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--partitions", Integer.toString(numPartitionsModified), "--topic", testTopicName);
            topicService.alterTopic(alterOpts);
            TestUtils.waitForCondition(() -> clusterInstance.brokers().values().stream().allMatch(p -> p.metadataCache().numPartitions(testTopicName).orElse(0) == numPartitionsModified), (long)15000L, (String)"Timeout waiting for new assignment propagating to broker");
            Config newProps = (Config)((Map)adminClient.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
            Assertions.assertNotNull((Object)newProps.get("cleanup.policy"), (String)"Updated properties do not contain cleanup.policy");
            Assertions.assertEquals((Object)cleanUpPolicy, (Object)newProps.get("cleanup.policy").value(), (String)"Updated properties have incorrect value");
        }
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testTopicDeletion(ClusterInstance clusterInstance) throws Exception {
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            String testTopicName = TestUtils.randomString((int)10);
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            TopicCommand.TopicCommandOptions deleteOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName);
            topicService.deleteTopic(deleteOpts);
            TestUtils.waitForCondition(() -> ((Collection)adminClient.listTopics().listings().get()).stream().noneMatch(topic -> topic.name().equals(testTopicName)), (long)60000L, (String)String.format("Delete topic fail in %s ms", 60000));
        }
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testTopicWithCollidingCharDeletionAndCreateAgain(ClusterInstance clusterInstance) throws Exception {
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            String topicWithCollidingChar = "test.a";
            adminClient.createTopics(Collections.singletonList(new NewTopic(topicWithCollidingChar, 1, 1)));
            clusterInstance.waitForTopic(topicWithCollidingChar, 1);
            TopicCommand.TopicCommandOptions deleteOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", topicWithCollidingChar);
            topicService.deleteTopic(deleteOpts);
            TestUtils.waitForCondition(() -> ((Collection)adminClient.listTopics().listings().get()).stream().noneMatch(topic -> topic.name().equals(topicWithCollidingChar)), (long)60000L, (String)String.format("Delete topic fail in %s ms", 60000));
            clusterInstance.waitTopicDeletion(topicWithCollidingChar);
            adminClient.createTopics(Collections.singletonList(new NewTopic(topicWithCollidingChar, 1, 1)));
            clusterInstance.waitForTopic(topicWithCollidingChar, 1);
        }
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testDeleteInternalTopic(ClusterInstance clusterInstance) throws Exception {
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            adminClient.createTopics(Collections.singletonList(new NewTopic("__consumer_offsets", 1, 1)));
            clusterInstance.waitForTopic("__consumer_offsets", 1);
            TopicCommand.TopicCommandOptions deleteOffsetTopicOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", "__consumer_offsets");
            topicService.deleteTopic(deleteOffsetTopicOpts);
            TestUtils.waitForCondition(() -> ((Collection)adminClient.listTopics().listings().get()).stream().noneMatch(topic -> topic.name().equals("__consumer_offsets")), (long)60000L, (String)String.format("Delete topic fail in %s ms", 60000));
        }
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testDeleteWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            TopicCommand.TopicCommandOptions deleteOpts = this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName);
            Assertions.assertThrows(IllegalArgumentException.class, () -> topicService.deleteTopic(deleteOpts), (String)"Expected an exception when trying to delete a topic that does not exist.");
        }
    }

    @ClusterTest(brokers=3, serverProperties={@ClusterConfigProperty(key="log.initial.task.delay.ms", value="100"), @ClusterConfigProperty(key="log.segment.delete.delay.ms", value="1000")})
    public void testDeleteWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            topicService.deleteTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName, "--if-exists"));
        }
    }

    @ClusterTemplate(value="generate")
    public void testDescribe(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            int partition = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, partition);
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName));
            String[] rows = output.split(System.lineSeparator());
            Assertions.assertEquals((int)3, (int)rows.length, (String)("Expected 3 rows in output, got " + rows.length));
            Assertions.assertTrue((boolean)rows[0].startsWith(String.format("Topic: %s", testTopicName)), (String)("Row does not start with " + testTopicName + ". Row is: " + rows[0]));
        }
    }

    @ClusterTemplate(value="generate")
    public void testDescribeWithDescribeTopicPartitionsApi(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
            topics.add(new NewTopic(testTopicName, 20, 2));
            topics.add(new NewTopic("test-2", 41, 2));
            topics.add(new NewTopic("test-3", 5, 2));
            topics.add(new NewTopic("test-4", 5, 2));
            topics.add(new NewTopic("test-5", 100, 2));
            adminClient.createTopics(topics);
            clusterInstance.waitForTopic(testTopicName, 20);
            clusterInstance.waitForTopic("test-2", 41);
            clusterInstance.waitForTopic("test-3", 5);
            clusterInstance.waitForTopic("test-4", 5);
            clusterInstance.waitForTopic("test-5", 100);
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--partition-size-limit-per-response=20", "--exclude-internal"));
            CharSequence[] rows = output.split("\n");
            Assertions.assertEquals((int)176, (int)rows.length, (String)String.join((CharSequence)"\n", rows));
            Assertions.assertTrue((boolean)((String)rows[2]).contains("\tElr"), (String)rows[2]);
            Assertions.assertTrue((boolean)((String)rows[2]).contains("LastKnownElr"), (String)rows[2]);
        }
    }

    @ClusterTest
    public void testDescribeWhenTopicDoesntExist(ClusterInstance clusterInstance) {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
            Assertions.assertThrows(IllegalArgumentException.class, () -> topicService.describeTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName)), (String)"Expected an exception when trying to describe a topic that does not exist.");
        }
    }

    @ClusterTest
    public void testDescribeWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
            topicService.describeTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName, "--if-exists"));
            topicService.close();
        }
    }

    @ClusterTest(brokers=3)
    public void testDescribeUnavailablePartitions(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            int partitions = 3;
            short replicationFactor = 1;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, partitions);
            clusterInstance.shutdownBroker(0);
            Assertions.assertEquals((int)2, (int)clusterInstance.aliveBrokers().size());
            clusterInstance.waitForTopic(testTopicName, 3);
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName, "--unavailable-partitions"));
            String[] rows = output.split(System.lineSeparator());
            Assertions.assertTrue((boolean)rows[0].startsWith(String.format("Topic: %s", testTopicName)), (String)("Unexpected Topic " + rows[0] + " received. Expect " + String.format("Topic: %s", testTopicName)));
            Assertions.assertTrue((boolean)rows[0].contains("Leader: none\tReplicas: 0\tIsr:"), (String)"Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'");
        }
    }

    @ClusterTest(brokers=3)
    public void testDescribeUnderReplicatedPartitions(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            int partitions = 1;
            short replicationFactor = 3;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor)));
            clusterInstance.waitForTopic(testTopicName, partitions);
            clusterInstance.shutdownBroker(0);
            Assertions.assertEquals((int)clusterInstance.aliveBrokers().size(), (int)2);
            TestUtils.waitForCondition(() -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> {
                Optional<LeaderAndIsr> partitionState = Optional.ofNullable((LeaderAndIsr)broker.metadataCache().getLeaderAndIsr(testTopicName, 0).orElseGet(null));
                return partitionState.map(s -> FetchRequest.isValidBrokerId((int)s.leader())).orElse(false);
            }), (long)60000L, (String)String.format("Meta data propogation fail in %s ms", 60000));
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-replicated-partitions"));
            String[] rows = output.split(System.lineSeparator());
            Assertions.assertTrue((boolean)rows[0].startsWith(String.format("Topic: %s", testTopicName)), (String)String.format("Unexpected output: %s", rows[0]));
        }
    }

    @ClusterTest(brokers=3)
    public void testDescribeUnderMinIsrPartitions(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            topicConfig.put("min.insync.replicas", "3");
            int partitions = 1;
            short replicationFactor = 3;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
            clusterInstance.waitForTopic(testTopicName, partitions);
            clusterInstance.shutdownBroker(0);
            Assertions.assertEquals((int)2, (int)clusterInstance.aliveBrokers().size());
            TestUtils.waitForCondition(() -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> ((LeaderAndIsr)broker.metadataCache().getLeaderAndIsr(testTopicName, 0).get()).isr().size() == 2), (long)60000L, (String)String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName));
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-min-isr-partitions", "--exclude-internal"));
            String[] rows = output.split(System.lineSeparator());
            Assertions.assertTrue((boolean)rows[0].startsWith(String.format("Topic: %s", testTopicName)), (String)("Unexpected topic: " + rows[0]));
        }
    }

    @ClusterTemplate(value="generate")
    public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();
             KafkaProducer<String, String> producer = this.createProducer(clusterInstance);){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            TopicPartition tp = new TopicPartition(testTopicName, 0);
            this.sendProducerRecords(testTopicName, producer, 10);
            this.sendProducerRecords(testTopicName, producer, 10);
            ArrayList<Integer> brokerIds = new ArrayList<Integer>(clusterInstance.brokerIds());
            ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp), 1);
            TopicDescription testTopicDesc = (TopicDescription)((Map)adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get()).get(testTopicName);
            TopicPartitionInfo firstPartition = (TopicPartitionInfo)testTopicDesc.partitions().get(0);
            List replicasOfFirstPartition = firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList());
            ArrayList<Integer> replicasDiff = new ArrayList<Integer>(brokerIds);
            replicasDiff.removeAll(replicasOfFirstPartition);
            Integer targetReplica = (Integer)replicasDiff.get(0);
            adminClient.alterPartitionReassignments(Collections.singletonMap(tp, Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get();
            TestUtils.waitForCondition(() -> !((PartitionReassignment)((Map)adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()).get(tp)).addingReplicas().isEmpty(), (long)60000L, (String)"Reassignment didn't add the second node");
            String simpleDescribeOutput = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName));
            String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator());
            Assertions.assertTrue((boolean)simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)), (String)("Unexpected describe output: " + simpleDescribeOutputRows[0]));
            Assertions.assertEquals((int)2, (int)simpleDescribeOutputRows.length, (String)("Unexpected describe output length: " + simpleDescribeOutputRows.length));
            String underReplicatedOutput = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-replicated-partitions"));
            Assertions.assertEquals((Object)"", (Object)underReplicatedOutput, (String)String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput));
            int maxRetries = 20;
            long pause = 100L;
            long waitTimeMs = (long)maxRetries * pause;
            AtomicReference reassignmentsRef = new AtomicReference();
            TestUtils.waitForCondition(() -> {
                PartitionReassignment tempReassignments = (PartitionReassignment)((Map)adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()).get(tp);
                reassignmentsRef.set(tempReassignments);
                return reassignmentsRef.get() != null;
            }, (long)waitTimeMs, (String)"Reassignments did not become non-null within the specified time");
            Assertions.assertFalse((boolean)((PartitionReassignment)reassignmentsRef.get()).addingReplicas().isEmpty());
            ToolsTestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp));
            TestUtils.waitForCondition(() -> ((Map)adminClient.listPartitionReassignments().reassignments().get()).isEmpty(), (long)60000L, (String)String.format("reassignmet not finished after %s ms", 60000));
        }
    }

    @ClusterTemplate(value="generate")
    public void testDescribeAtMinIsrPartitions(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            topicConfig.put("min.insync.replicas", "4");
            int partitions = 1;
            short replicationFactor = 6;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
            clusterInstance.waitForTopic(testTopicName, partitions);
            clusterInstance.shutdownBroker(0);
            clusterInstance.shutdownBroker(1);
            Assertions.assertEquals((int)4, (int)clusterInstance.aliveBrokers().size());
            TestUtils.waitForCondition(() -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> ((LeaderAndIsr)broker.metadataCache().getLeaderAndIsr(testTopicName, 0).get()).isr().size() == 4), (long)60000L, (String)String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName));
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--at-min-isr-partitions", "--exclude-internal"));
            String[] rows = output.split(System.lineSeparator());
            Assertions.assertTrue((boolean)rows[0].startsWith(String.format("Topic: %s", testTopicName)), (String)("Unexpected output: " + rows[0]));
            Assertions.assertEquals((int)1, (int)rows.length);
        }
    }

    @ClusterTemplate(value="generate")
    public void testDescribeUnderMinIsrPartitionsMixed(ClusterInstance clusterInstance) throws InterruptedException {
        try (Admin adminClient = clusterInstance.admin();){
            String underMinIsrTopic = "under-min-isr-topic";
            String notUnderMinIsrTopic = "not-under-min-isr-topic";
            String offlineTopic = "offline-topic";
            String fullyReplicatedTopic = "fully-replicated-topic";
            int partitions = 1;
            short replicationFactor = 6;
            ArrayList<NewTopic> newTopics = new ArrayList<NewTopic>();
            HashMap<Integer, List<Integer>> fullyReplicatedReplicaAssignmentMap = new HashMap<Integer, List<Integer>>();
            fullyReplicatedReplicaAssignmentMap.put(0, Arrays.asList(1, 2, 3));
            HashMap<Integer, List<Integer>> offlineReplicaAssignmentMap = new HashMap<Integer, List<Integer>>();
            offlineReplicaAssignmentMap.put(0, Arrays.asList(0));
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            topicConfig.put("min.insync.replicas", "6");
            newTopics.add(new NewTopic(underMinIsrTopic, partitions, replicationFactor).configs(topicConfig));
            newTopics.add(new NewTopic(notUnderMinIsrTopic, partitions, replicationFactor));
            newTopics.add(new NewTopic(offlineTopic, offlineReplicaAssignmentMap));
            newTopics.add(new NewTopic(fullyReplicatedTopic, fullyReplicatedReplicaAssignmentMap));
            adminClient.createTopics(newTopics);
            for (NewTopic topioc : newTopics) {
                clusterInstance.waitForTopic(topioc.name(), partitions);
            }
            clusterInstance.shutdownBroker(0);
            Assertions.assertEquals((int)5, (int)clusterInstance.aliveBrokers().size());
            TestUtils.waitForCondition(() -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> ((LeaderAndIsr)broker.metadataCache().getLeaderAndIsr(underMinIsrTopic, 0).get()).isr().size() < 6 && ((LeaderAndIsr)broker.metadataCache().getLeaderAndIsr(offlineTopic, 0).get()).leader() == -1), (long)60000L, (String)"Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic");
            TestUtils.waitForCondition(() -> ((Map)adminClient.listPartitionReassignments().reassignments().get()).isEmpty(), (long)60000L, (String)String.format("reassignmet not finished after %s ms", 60000));
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-min-isr-partitions", "--exclude-internal"));
            String[] rows = output.split(System.lineSeparator());
            Assertions.assertTrue((boolean)rows[0].startsWith(String.format("Topic: %s", underMinIsrTopic)), (String)("Unexpected output: " + rows[0]));
            Assertions.assertTrue((boolean)rows[1].startsWith(String.format("\tTopic: %s", offlineTopic)), (String)("Unexpected output: " + rows[1]));
            Assertions.assertEquals((int)2, (int)rows.length);
        }
    }

    @ClusterTest(brokers=3)
    public void testDescribeReportOverriddenConfigs(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            String config = "file.delete.delay.ms=1000";
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            topicConfig.put("file.delete.delay.ms", "1000");
            int partitions = 2;
            short replicationFactor = 2;
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
            clusterInstance.waitForTopic(testTopicName, partitions);
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe"));
            Assertions.assertTrue((boolean)output.contains(config), (String)String.format("Describe output should have contained %s", config));
        }
    }

    @ClusterTest
    public void testDescribeAndListTopicsWithoutInternalTopics(ClusterInstance clusterInstance) throws InterruptedException {
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = clusterInstance.admin();){
            adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
            clusterInstance.waitForTopic(testTopicName, 1);
            String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--describe", "--exclude-internal"));
            Assertions.assertTrue((boolean)output.contains(testTopicName), (String)String.format("Output should have contained %s", testTopicName));
            Assertions.assertFalse((boolean)output.contains("__consumer_offsets"), (String)"Output should not have contained __consumer_offsets");
            output = this.captureListTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list", "--exclude-internal"));
            Assertions.assertTrue((boolean)output.contains(testTopicName), (String)String.format("Output should have contained %s", testTopicName));
            Assertions.assertFalse((boolean)output.contains("__consumer_offsets"), (String)"Output should not have contained __consumer_offsets");
        }
    }

    @ClusterTest
    public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(ClusterInstance clusterInstance) throws Exception {
        String testTopicName = TestUtils.randomString((int)10);
        Admin adminClient = clusterInstance.admin();
        adminClient = (Admin)Mockito.spy((Object)adminClient);
        ListPartitionReassignmentsResult result = AdminClientTestUtils.listPartitionReassignmentsResult((Throwable)new ClusterAuthorizationException("Unauthorized"));
        ((Admin)Mockito.doReturn((Object)result).when((Object)adminClient)).listPartitionReassignments(Collections.singleton(new TopicPartition(testTopicName, 0)));
        adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, 1)));
        clusterInstance.waitForTopic(testTopicName, 1);
        String output = this.captureDescribeTopicStandardOut(clusterInstance, this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName));
        String[] rows = output.split(System.lineSeparator());
        Assertions.assertEquals((int)2, (int)rows.length, (String)("Unexpected output: " + output));
        Assertions.assertTrue((boolean)rows[0].startsWith(String.format("Topic: %s", testTopicName)), (String)("Unexpected output: " + rows[0]));
        adminClient.close();
    }

    @ClusterTest(brokers=3)
    public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception {
        try (Admin adminClient = clusterInstance.admin();
             TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
            String topic = "foo_bar";
            int partitions = 1;
            short replicationFactor = 3;
            adminClient.createTopics(Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
            clusterInstance.waitForTopic(topic, 1);
            Assertions.assertThrows(TopicExistsException.class, () -> topicService.createTopic(this.buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--topic", topic)));
        }
    }

    @ClusterTest
    public void testCreateWithInternalConfig(ClusterInstance cluster) throws InterruptedException, ExecutionException {
        String internalConfigTopicName = TestUtils.randomString((int)10);
        String testTopicName = TestUtils.randomString((int)10);
        try (Admin adminClient = cluster.admin();){
            CreateTopicsResult internalResult = adminClient.createTopics(List.of(new NewTopic(internalConfigTopicName, 1, 1).configs(Map.of("internal.segment.bytes", "1000"))));
            ConfigEntry internalConfigEntry = ((Config)internalResult.config(internalConfigTopicName).get()).get("internal.segment.bytes");
            Assertions.assertNotNull((Object)internalConfigEntry, (String)"Internal config entry should not be null");
            Assertions.assertEquals((Object)"1000", (Object)internalConfigEntry.value());
            CreateTopicsResult nonInternalResult = adminClient.createTopics(List.of(new NewTopic(testTopicName, 1, 1)));
            ConfigEntry nonInternalConfigEntry = ((Config)nonInternalResult.config(testTopicName).get()).get("internal.segment.bytes");
            Assertions.assertNull((Object)nonInternalConfigEntry, (String)"Non-internal config entry should be null");
        }
    }

    private void checkReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping, Integer numBrokers, Integer numPartitions, Integer replicationFactor, Boolean verifyRackAware, Boolean verifyLeaderDistribution, Boolean verifyReplicasDistribution) {
        List<Integer> expected;
        assignment.forEach((partition, assignedNodes) -> Assertions.assertEquals((int)new HashSet(assignedNodes).size(), (int)assignedNodes.size(), (String)"More than one replica is assigned to same broker for the same partition"));
        ReplicaDistributions distribution = TopicCommandTest.getReplicaDistribution(assignment, brokerRackMapping);
        if (verifyRackAware.booleanValue()) {
            Map<Integer, List<String>> partitionRackMap = distribution.partitionRacks;
            List partitionRackMapValueSize = partitionRackMap.values().stream().map(value -> (int)value.stream().distinct().count()).collect(Collectors.toList());
            expected = Collections.nCopies(numPartitions, replicationFactor);
            Assertions.assertEquals(expected, partitionRackMapValueSize, (String)"More than one replica of the same partition is assigned to the same rack");
        }
        if (verifyLeaderDistribution.booleanValue()) {
            Map<Integer, Integer> leaderCount = distribution.brokerLeaderCount;
            int leaderCountPerBroker = numPartitions / numBrokers;
            expected = Collections.nCopies(numBrokers, leaderCountPerBroker);
            Assertions.assertEquals(expected, new ArrayList<Integer>(leaderCount.values()), (String)"Preferred leader count is not even for brokers");
        }
        if (verifyReplicasDistribution.booleanValue()) {
            Map<Integer, Integer> replicasCount = distribution.brokerReplicasCount;
            int numReplicasPerBroker = numPartitions * replicationFactor / numBrokers;
            expected = Collections.nCopies(numBrokers, numReplicasPerBroker);
            Assertions.assertEquals(expected, new ArrayList<Integer>(replicasCount.values()), (String)"Replica count is not even for broker");
        }
    }

    private String captureDescribeTopicStandardOut(ClusterInstance clusterInstance, TopicCommand.TopicCommandOptions opts) {
        Runnable runnable = () -> {
            try (Admin adminClient = clusterInstance.admin();
                 TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
                topicService.describeTopic(opts);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        return ToolsTestUtils.captureStandardOut(runnable);
    }

    private String captureListTopicStandardOut(ClusterInstance clusterInstance, TopicCommand.TopicCommandOptions opts) {
        Runnable runnable = () -> {
            try (Admin adminClient = clusterInstance.admin();
                 TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);){
                topicService.listTopics(opts);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        return ToolsTestUtils.captureStandardOut(runnable);
    }

    private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment, Map<Integer, String> brokerRackMapping) {
        HashMap<Integer, Integer> leaderCount = new HashMap<Integer, Integer>();
        HashMap<Integer, Integer> partitionCount = new HashMap<Integer, Integer>();
        HashMap<Integer, List<String>> partitionRackMap = new HashMap<Integer, List<String>>();
        assignment.forEach((partitionId, replicaList) -> {
            Integer leader = (Integer)replicaList.get(0);
            leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1);
            replicaList.forEach(brokerId -> {
                partitionCount.put((Integer)brokerId, partitionCount.getOrDefault(brokerId, 0) + 1);
                if (brokerRackMapping.containsKey(brokerId)) {
                    String rack = (String)brokerRackMapping.get(brokerId);
                    List partitionRackValues = Stream.of(Collections.singletonList(rack), partitionRackMap.getOrDefault(partitionId, Collections.emptyList())).flatMap(Collection::stream).collect(Collectors.toList());
                    partitionRackMap.put((Integer)partitionId, partitionRackValues);
                } else {
                    System.err.printf("No mapping found for %s in `brokerRackMapping`%n", brokerId);
                }
            });
        });
        return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount);
    }

    private KafkaProducer<String, String> createProducer(ClusterInstance clusterInstance) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", clusterInstance.bootstrapServers());
        producerProps.put("acks", "-1");
        return new KafkaProducer(producerProps, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
    }

    private void sendProducerRecords(String testTopicName, KafkaProducer<String, String> producer, int numMessage) {
        IntStream.range(0, numMessage).forEach(i -> producer.send(new ProducerRecord(testTopicName, (Object)("test-" + i))));
        producer.flush();
    }

    private static class ReplicaDistributions {
        private final Map<Integer, List<String>> partitionRacks;
        private final Map<Integer, Integer> brokerLeaderCount;
        private final Map<Integer, Integer> brokerReplicasCount;

        public ReplicaDistributions(Map<Integer, List<String>> partitionRacks, Map<Integer, Integer> brokerLeaderCount, Map<Integer, Integer> brokerReplicasCount) {
            this.partitionRacks = partitionRacks;
            this.brokerLeaderCount = brokerLeaderCount;
            this.brokerReplicasCount = brokerReplicasCount;
        }
    }
}

