package org.apache.kafka.tools;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.admin.RackAwareTest;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.utils.Logging;
import kafka.zk.ConfigEntityChangeNotificationZNode;
import kafka.zk.DeleteTopicsTopicZNode;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.TopicCommand;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Buffer;

@Tag("integration")
/* loaded from: input_file:org/apache/kafka/tools/TopicCommandIntegrationTest.class */
public class TopicCommandIntegrationTest extends KafkaServerTestHarness implements Logging, RackAwareTest {
    private final short defaultReplicationFactor = 1;
    private final int defaultNumPartitions = 1;
    private TopicCommand.TopicService topicService;
    private Admin adminClient;
    private String bootstrapServer;
    private String testTopicName;
    private Buffer<KafkaBroker> scalaBrokers;
    private Seq<ControllerServer> scalaControllers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/tools/TopicCommandIntegrationTest$ReplicaDistributions.class */
    public static class ReplicaDistributions {
        private final Map<Integer, List<String>> partitionRacks;
        private final Map<Integer, Integer> brokerLeaderCount;
        private final Map<Integer, Integer> brokerReplicasCount;

        public ReplicaDistributions(Map<Integer, List<String>> map, Map<Integer, Integer> map2, Map<Integer, Integer> map3) {
            this.partitionRacks = map;
            this.brokerLeaderCount = map2;
            this.brokerReplicasCount = map3;
        }
    }

    public Seq<KafkaConfig> generateConfigs() {
        HashMap hashMap = new HashMap();
        hashMap.put(0, "rack1");
        hashMap.put(1, "rack2");
        hashMap.put(2, "rack2");
        hashMap.put(3, "rack1");
        hashMap.put(4, "rack3");
        hashMap.put(5, "rack3");
        List<Properties> createBrokerProperties = ToolsTestUtils.createBrokerProperties(6, zkConnectOrNull(), hashMap, 1, (short) 1);
        ArrayList arrayList = new ArrayList();
        for (Properties properties : createBrokerProperties) {
            properties.put("replica.fetch.max.bytes", "1");
            arrayList.add(KafkaConfig.fromProps(properties));
        }
        return JavaConverters.asScalaBuffer(arrayList).toSeq();
    }

    private TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap(String... strArr) {
        return new TopicCommand.TopicCommandOptions((String[]) Stream.concat(Arrays.stream(strArr), Stream.of((Object[]) new String[]{"--bootstrap-server", this.bootstrapServer})).toArray(i -> {
            return new String[i];
        }));
    }

    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        Properties properties = new Properties();
        this.bootstrapServer = bootstrapServers(listenerName());
        properties.put("bootstrap.servers", this.bootstrapServer);
        this.adminClient = Admin.create(properties);
        this.topicService = new TopicCommand.TopicService(properties, Optional.of(this.bootstrapServer));
        this.testTopicName = String.format("%s-%s", ((Method) testInfo.getTestMethod().get()).getName(), TestUtils.randomString(10));
        this.scalaBrokers = brokers();
        this.scalaControllers = controllerServers();
    }

    @AfterEach
    public void close() throws Exception {
        if (this.topicService != null) {
            this.topicService.close();
        }
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreate(String str) throws Exception {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 1, Map$.MODULE$.empty(), new Properties());
        Assertions.assertTrue(((Set) this.adminClient.listTopics().names().get()).contains(this.testTopicName), "Admin client didn't see the created topic. It saw: " + this.adminClient.listTopics().names().get());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithDefaults(String str) throws Exception {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        List partitions = ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions();
        Assertions.assertEquals(1, partitions.size(), "Unequal partition size: " + partitions.size());
        Assertions.assertEquals((short) 1, (short) ((TopicPartitionInfo) partitions.get(0)).replicas().size(), "Unequal replication factor: " + ((TopicPartitionInfo) partitions.get(0)).replicas().size());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithDefaultReplication(String str) throws Exception {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 1, Map$.MODULE$.empty(), new Properties());
        List partitions = ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions();
        Assertions.assertEquals(2, partitions.size(), "Unequal partition size: " + partitions.size());
        Assertions.assertEquals((short) 1, (short) ((TopicPartitionInfo) partitions.get(0)).replicas().size(), "Unequal replication factor: " + ((TopicPartitionInfo) partitions.get(0)).replicas().size());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithDefaultPartitions(String str) throws Exception {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 2, Map$.MODULE$.empty(), new Properties());
        List partitions = ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions();
        Assertions.assertEquals(1, partitions.size(), "Unequal partition size: " + partitions.size());
        Assertions.assertEquals(2, (short) ((TopicPartitionInfo) partitions.get(0)).replicas().size(), "Partitions not replicated: " + ((TopicPartitionInfo) partitions.get(0)).replicas().size());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithConfigs(String str) throws Exception {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        Properties properties = new Properties();
        properties.put("delete.retention.ms", "1000");
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), properties);
        Config config = (Config) ((Map) this.adminClient.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
        Assertions.assertEquals(1000, Integer.valueOf(config.get("delete.retention.ms").value()), "Config not set correctly: " + config.get("delete.retention.ms").value());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWhenAlreadyExists(String str) {
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", Integer.toString(1), "--replication-factor", "1", "--topic", this.testTopicName);
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        Assertions.assertThrows(TopicExistsException.class, () -> {
            this.topicService.createTopic(buildTopicCommandOptionsWithBootstrap);
        }, "Expected TopicExistsException to throw");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWhenAlreadyExistsWithIfNotExists(String str) throws Exception {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        this.topicService.createTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", this.testTopicName, "--if-not-exists"));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithReplicaAssignment(String str) throws Exception {
        scala.collection.mutable.HashMap hashMap = new scala.collection.mutable.HashMap();
        hashMap.put(0, ((Buffer) JavaConverters.asScalaBufferConverter(Arrays.asList(5, 4)).asScala()).toSeq());
        hashMap.put(1, ((Buffer) JavaConverters.asScalaBufferConverter(Arrays.asList(3, 2)).asScala()).toSeq());
        hashMap.put(2, ((Buffer) JavaConverters.asScalaBufferConverter(Arrays.asList(1, 0)).asScala()).toSeq());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, hashMap, new Properties());
        List<TopicPartitionInfo> partitions = ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions();
        Assertions.assertEquals(3, partitions.size(), "Unequal partition size: " + partitions.size());
        Assertions.assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0), "Unexpected replica assignment: " + getPartitionReplicas(partitions, 0));
        Assertions.assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1), "Unexpected replica assignment: " + getPartitionReplicas(partitions, 1));
        Assertions.assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2), "Unexpected replica assignment: " + getPartitionReplicas(partitions, 2));
    }

    private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> list, int i) {
        return (List) list.get(i).replicas().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithInvalidReplicationFactor(String str) {
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(32768), "--topic", this.testTopicName);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.topicService.createTopic(buildTopicCommandOptionsWithBootstrap);
        }, "Expected IllegalArgumentException to throw");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithNegativeReplicationFactor(String str) {
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", "-1", "--topic", this.testTopicName);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.topicService.createTopic(buildTopicCommandOptionsWithBootstrap);
        }, "Expected IllegalArgumentException to throw");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithNegativePartitionCount(String str) {
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", this.testTopicName);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.topicService.createTopic(buildTopicCommandOptionsWithBootstrap);
        }, "Expected IllegalArgumentException to throw");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testInvalidTopicLevelConfig(String str) {
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", "--replication-factor", "1", "--topic", this.testTopicName, "--config", "message.timestamp.type=boom");
        Assertions.assertThrows(ConfigException.class, () -> {
            this.topicService.createTopic(buildTopicCommandOptionsWithBootstrap);
        }, "Expected ConfigException to throw");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testListTopics(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        String captureListTopicStandardOut = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list"));
        Assertions.assertTrue(captureListTopicStandardOut.contains(this.testTopicName), "Expected topic name to be present in output: " + captureListTopicStandardOut);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testListTopicsWithIncludeList(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "kafka.testTopic1", this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "kafka.testTopic2", this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "oooof.testTopic1", this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        String captureListTopicStandardOut = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--topic", "kafka.*"));
        Assertions.assertTrue(captureListTopicStandardOut.contains("kafka.testTopic1"), "Expected topic name kafka.testTopic1 to be present in output: " + captureListTopicStandardOut);
        Assertions.assertTrue(captureListTopicStandardOut.contains("kafka.testTopic2"), "Expected topic name kafka.testTopic2 to be present in output: " + captureListTopicStandardOut);
        Assertions.assertFalse(captureListTopicStandardOut.contains("oooof.testTopic1"), "Do not expect topic name oooof.testTopic1 to be present in output: " + captureListTopicStandardOut);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testListTopicsWithExcludeInternal(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "kafka.testTopic1", this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "__consumer_offsets", this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        String captureListTopicStandardOut = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal"));
        Assertions.assertTrue(captureListTopicStandardOut.contains("kafka.testTopic1"), "Expected topic name kafka.testTopic1 to be present in output: " + captureListTopicStandardOut);
        Assertions.assertFalse(captureListTopicStandardOut.contains("__consumer_offsets"), "Do not expect topic name __consumer_offsets to be present in output: " + captureListTopicStandardOut);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterPartitionCount(String str) throws ExecutionException, InterruptedException {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", this.testTopicName, "--partitions", "3"));
        kafka.utils.TestUtils.waitForAllReassignmentsToComplete(this.adminClient, 100L);
        kafka.utils.TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(brokers().forall(kafkaBroker -> {
                return Boolean.valueOf(kafkaBroker.metadataCache().getTopicPartitions(this.testTopicName).size() == 3);
            }));
        }, () -> {
            return "Timeout waiting for new assignment propagating to broker";
        }, 15000L, 100L);
        TopicDescription topicDescription = (TopicDescription) ((KafkaFuture) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).topicNameValues().get(this.testTopicName)).get();
        Assertions.assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterAssignment(String str) throws ExecutionException, InterruptedException {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", this.testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3"));
        kafka.utils.TestUtils.waitForAllReassignmentsToComplete(this.adminClient, 100L);
        kafka.utils.TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(brokers().forall(kafkaBroker -> {
                return Boolean.valueOf(kafkaBroker.metadataCache().getTopicPartitions(this.testTopicName).size() == 3);
            }));
        }, () -> {
            return "Timeout waiting for new assignment propagating to broker";
        }, 15000L, 100L);
        TopicDescription topicDescription = (TopicDescription) ((KafkaFuture) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).topicNameValues().get(this.testTopicName)).get();
        Assertions.assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size());
        List<Integer> partitionReplicas = getPartitionReplicas(topicDescription.partitions(), 2);
        Assertions.assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have replicas 4,2. Got: " + partitionReplicas);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterAssignmentWithMoreAssignmentThanPartitions(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        Assertions.assertThrows(ExecutionException.class, () -> {
            this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", this.testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3"));
        }, "Expected to fail with ExecutionException");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterAssignmentWithMorePartitionsThanAssignment(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        Assertions.assertThrows(ExecutionException.class, () -> {
            this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", this.testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6"));
        }, "Expected to fail with ExecutionException");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterWithInvalidPartitionCount(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        Assertions.assertThrows(ExecutionException.class, () -> {
            this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", "-1", "--topic", this.testTopicName));
        }, "Expected to fail with ExecutionException");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterWhenTopicDoesntExist(String str) {
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--alter", "--topic", this.testTopicName, "--partitions", "1");
        TopicCommand.TopicService topicService = new TopicCommand.TopicService(this.adminClient);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            topicService.alterTopic(buildTopicCommandOptionsWithBootstrap);
        }, "Expected to fail with IllegalArgumentException");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterWhenTopicDoesntExistWithIfExists(String str) throws ExecutionException, InterruptedException {
        this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", this.testTopicName, "--partitions", "1", "--if-exists"));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateAlterTopicWithRackAware(String str) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(0, "rack1");
        hashMap.put(1, "rack2");
        hashMap.put(2, "rack2");
        hashMap.put(3, "rack1");
        hashMap.put(4, "rack3");
        hashMap.put(5, "rack3");
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 18, 3, Map$.MODULE$.empty(), new Properties());
        checkReplicaDistribution((Map) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().stream().collect(Collectors.toMap(topicPartitionInfo -> {
            return Integer.valueOf(topicPartitionInfo.partition());
        }, topicPartitionInfo2 -> {
            return (List) topicPartitionInfo2.replicas().stream().map((v0) -> {
                return v0.id();
            }).collect(Collectors.toList());
        })), hashMap, Integer.valueOf(hashMap.size()), 18, 3, true, true, true);
        int i = 36;
        this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", Integer.toString(36), "--topic", this.testTopicName));
        kafka.utils.TestUtils.waitForAllReassignmentsToComplete(this.adminClient, 100L);
        kafka.utils.TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(brokers().forall(kafkaBroker -> {
                return Boolean.valueOf(kafkaBroker.metadataCache().getTopicPartitions(this.testTopicName).size() == i);
            }));
        }, () -> {
            return "Timeout waiting for new assignment propagating to broker";
        }, 15000L, 100L);
        checkReplicaDistribution((Map) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().stream().collect(Collectors.toMap(topicPartitionInfo3 -> {
            return Integer.valueOf(topicPartitionInfo3.partition());
        }, topicPartitionInfo4 -> {
            return (List) topicPartitionInfo4.replicas().stream().map((v0) -> {
                return v0.id();
            }).collect(Collectors.toList());
        })), hashMap, Integer.valueOf(hashMap.size()), 36, 3, true, true, true);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testConfigPreservationAcrossPartitionAlteration(String str) throws Exception {
        Properties properties = new Properties();
        properties.put("cleanup.policy", "compact");
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), properties);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        Config config = (Config) ((Map) this.adminClient.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
        Assertions.assertNotNull(config.get("cleanup.policy"), "Properties after creation don't contain compact");
        Assertions.assertEquals("compact", config.get("cleanup.policy").value(), "Properties after creation have incorrect value");
        if (!isKRaftTest()) {
            zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path());
        }
        this.topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", Integer.toString(3), "--topic", this.testTopicName));
        kafka.utils.TestUtils.waitForAllReassignmentsToComplete(this.adminClient, 100L);
        Config config2 = (Config) ((Map) this.adminClient.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource);
        Assertions.assertNotNull(config2.get("cleanup.policy"), "Updated properties do not contain cleanup.policy");
        Assertions.assertEquals("compact", config2.get("cleanup.policy").value(), "Updated properties have incorrect value");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testTopicDeletion(String str) throws Exception {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", this.testTopicName);
        if (!isKRaftTest()) {
            Assertions.assertFalse(zkClient().pathExists(DeleteTopicsTopicZNode.path(this.testTopicName)), "Delete path for topic shouldn't exist before deletion.");
        }
        this.topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap);
        kafka.utils.TestUtils.verifyTopicDeletion(zkClientOrNull(), this.testTopicName, 1, brokers());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testTopicWithCollidingCharDeletionAndCreateAgain(String str) throws Exception {
        String str2 = "test.a";
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "test.a", this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", "test.a");
        if (!isKRaftTest()) {
            Assertions.assertFalse(zkClient().pathExists(DeleteTopicsTopicZNode.path("test.a")), "Delete path for topic shouldn't exist before deletion.");
        }
        this.topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap);
        kafka.utils.TestUtils.verifyTopicDeletion(zkClientOrNull(), "test.a", 1, brokers());
        Assertions.assertDoesNotThrow(() -> {
            return kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, str2, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        }, "Should be able to create a topic with colliding chars after deletion.");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDeleteInternalTopic(String str) throws Exception {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "__consumer_offsets", this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", "__consumer_offsets");
        String path = DeleteTopicsTopicZNode.path("__consumer_offsets");
        if (!isKRaftTest()) {
            Assertions.assertFalse(zkClient().pathExists(path), "Delete path for topic shouldn't exist before deletion.");
        }
        this.topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap);
        kafka.utils.TestUtils.verifyTopicDeletion(zkClientOrNull(), "__consumer_offsets", 1, brokers());
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDeleteWhenTopicDoesntExist(String str) {
        TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", this.testTopicName);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap);
        }, "Expected an exception when trying to delete a topic that does not exist.");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDeleteWhenTopicDoesntExistWithIfExists(String str) throws ExecutionException, InterruptedException {
        this.topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap("--delete", "--topic", this.testTopicName, "--if-exists"));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribe(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), new Properties());
        String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", this.testTopicName)).split(System.lineSeparator());
        Assertions.assertEquals(3, split.length, "Expected 3 rows in output, got " + split.length);
        Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", this.testTopicName)), "Row does not start with " + this.testTopicName + ". Row is: " + split[0]);
    }

    @ValueSource(strings = {"quorum=zk", "quorum=kraft"})
    @ParameterizedTest
    public void testDescribeWithDescribeTopicPartitionsApi(String str) throws ExecutionException, InterruptedException {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 20, 2, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "test-2", this.scalaBrokers, this.scalaControllers, 41, 2, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "test-3", this.scalaBrokers, this.scalaControllers, 5, 2, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "test-4", this.scalaBrokers, this.scalaControllers, 5, 2, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "test-5", this.scalaBrokers, this.scalaControllers, 100, 2, Map$.MODULE$.empty(), new Properties());
        String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--partition-size-limit-per-response=20")).split("\n");
        Assertions.assertEquals(176, split.length, String.join("\n", split));
        Assertions.assertTrue(split[2].contains("\tElr"), split[2]);
        Assertions.assertTrue(split[2].contains("LastKnownElr"), split[2]);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeWhenTopicDoesntExist(String str) {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", this.testTopicName));
        }, "Expected an exception when trying to describe a topic that does not exist.");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeWhenTopicDoesntExistWithIfExists(String str) throws ExecutionException, InterruptedException {
        this.topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", this.testTopicName, "--if-exists"));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeUnavailablePartitions(String str) throws ExecutionException, InterruptedException {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 6, 1, Map$.MODULE$.empty(), new Properties());
        try {
            int partition = ((TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().stream().filter(topicPartitionInfo -> {
                return topicPartitionInfo.leader().id() == 0;
            }).findFirst().get()).partition();
            killBroker(0);
            kafka.utils.TestUtils.waitUntilTrue(() -> {
                boolean z = true;
                for (KafkaBroker kafkaBroker : JavaConverters.asJavaCollection(brokers())) {
                    if (kafkaBroker.config().brokerId() != 0) {
                        Optional findFirst = JavaConverters.asJavaCollection(kafkaBroker.dataPlaneRequestProcessor().metadataCache().getTopicMetadata(((scala.collection.mutable.Set) JavaConverters.asScalaSetConverter(Collections.singleton(this.testTopicName)).asScala()).toSet(), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false)).stream().filter(metadataResponseTopic -> {
                            return metadataResponseTopic.name().equals(this.testTopicName);
                        }).findFirst();
                        if (!findFirst.isPresent()) {
                            throw new AssertionError("Topic metadata is not found in metadata cache");
                        }
                        Optional findFirst2 = ((MetadataResponseData.MetadataResponseTopic) findFirst.get()).partitions().stream().filter(metadataResponsePartition -> {
                            return metadataResponsePartition.partitionIndex() == partition;
                        }).findFirst();
                        if (!findFirst2.isPresent()) {
                            throw new AssertionError("Partition metadata is not found in metadata cache");
                        }
                        z = z && ((MetadataResponseData.MetadataResponsePartition) findFirst2.get()).errorCode() == Errors.LEADER_NOT_AVAILABLE.code();
                    }
                }
                return Boolean.valueOf(z);
            }, () -> {
                return String.format("Partition metadata for %s is not propagated", this.testTopicName);
            }, 15000L, 100L);
            String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", this.testTopicName, "--unavailable-partitions")).split(System.lineSeparator());
            Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", this.testTopicName)), "Unexpected Topic " + split[0] + " received. Expect " + String.format("Topic: %s", this.testTopicName));
            Assertions.assertTrue(split[0].contains("Leader: none\tReplicas: 0\tIsr:"), "Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'");
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeUnderReplicatedPartitions(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 6, Map$.MODULE$.empty(), new Properties());
        try {
            killBroker(0);
            if (isKRaftTest()) {
                ensureConsistentKRaftMetadata();
            } else {
                kafka.utils.TestUtils.waitForPartitionMetadata(aliveBrokers(), this.testTopicName, 0, 15000L);
            }
            String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")).split(System.lineSeparator());
            Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", this.testTopicName)), String.format("Unexpected output: %s", split[0]));
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeUnderMinIsrPartitions(String str) {
        Properties properties = new Properties();
        properties.put("min.insync.replicas", "6");
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 6, Map$.MODULE$.empty(), properties);
        try {
            killBroker(0);
            if (isKRaftTest()) {
                ensureConsistentKRaftMetadata();
            } else {
                kafka.utils.TestUtils.waitUntilTrue(() -> {
                    return Boolean.valueOf(aliveBrokers().forall(kafkaBroker -> {
                        return Boolean.valueOf(((UpdateMetadataRequestData.UpdateMetadataPartitionState) kafkaBroker.metadataCache().getPartitionInfo(this.testTopicName, 0).get()).isr().size() == 5);
                    }));
                }, () -> {
                    return String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", this.testTopicName);
                }, 15000L, 100L);
            }
            String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")).split(System.lineSeparator());
            Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", this.testTopicName)), "Unexpected topic: " + split[0]);
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String str) throws ExecutionException, InterruptedException {
        TopicPartition topicPartition = new TopicPartition(this.testTopicName, 0);
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.generateAndProduceMessages(brokers(), this.testTopicName, 10, -1);
        kafka.utils.TestUtils.generateAndProduceMessages(brokers(), this.testTopicName, 10, -1);
        List list = (List) JavaConverters.seqAsJavaList(brokers()).stream().map(kafkaBroker -> {
            return Integer.valueOf(kafkaBroker.config().brokerId());
        }).collect(Collectors.toList());
        ToolsTestUtils.setReplicationThrottleForPartitions(this.adminClient, list, Collections.singleton(topicPartition), 1);
        List list2 = (List) ((TopicPartitionInfo) ((TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singleton(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0)).replicas().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList(list);
        arrayList.removeAll(list2);
        this.adminClient.alterPartitionReassignments(Collections.singletonMap(topicPartition, Optional.of(new NewPartitionReassignment(Collections.singletonList((Integer) arrayList.get(0)))))).all().get();
        kafka.utils.TestUtils.waitUntilTrue(() -> {
            try {
                return Boolean.valueOf(!((PartitionReassignment) ((Map) this.adminClient.listPartitionReassignments(Collections.singleton(topicPartition)).reassignments().get()).get(topicPartition)).addingReplicas().isEmpty());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }, () -> {
            return "Reassignment didn't add the second node";
        }, 15000L, 100L);
        ensureConsistentKRaftMetadata();
        String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", this.testTopicName)).split(System.lineSeparator());
        Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", this.testTopicName)), "Unexpected describe output: " + split[0]);
        Assertions.assertEquals(2, split.length, "Unexpected describe output length: " + split.length);
        String captureDescribeTopicStandardOut = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions"));
        Assertions.assertEquals("", captureDescribeTopicStandardOut, String.format("--under-replicated-partitions shouldn't return anything: '%s'", captureDescribeTopicStandardOut));
        AtomicReference atomicReference = new AtomicReference();
        kafka.utils.TestUtils.waitUntilTrue(() -> {
            try {
                atomicReference.set((PartitionReassignment) ((Map) this.adminClient.listPartitionReassignments(Collections.singleton(topicPartition)).reassignments().get()).get(topicPartition));
                return Boolean.valueOf(atomicReference.get() != null);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Error while fetching reassignments", e);
            }
        }, () -> {
            return "Reassignments did not become non-null within the specified time";
        }, 20 * 100, 100L);
        Assertions.assertFalse(((PartitionReassignment) atomicReference.get()).addingReplicas().isEmpty());
        ToolsTestUtils.removeReplicationThrottleForPartitions(this.adminClient, list, Collections.singleton(topicPartition));
        kafka.utils.TestUtils.waitForAllReassignmentsToComplete(this.adminClient, 100L);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeAtMinIsrPartitions(String str) {
        Properties properties = new Properties();
        properties.put("min.insync.replicas", "4");
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 6, Map$.MODULE$.empty(), properties);
        try {
            killBroker(0);
            killBroker(1);
            if (isKRaftTest()) {
                ensureConsistentKRaftMetadata();
            } else {
                kafka.utils.TestUtils.waitUntilTrue(() -> {
                    return Boolean.valueOf(aliveBrokers().forall(kafkaBroker -> {
                        return Boolean.valueOf(((UpdateMetadataRequestData.UpdateMetadataPartitionState) kafkaBroker.metadataCache().getPartitionInfo(this.testTopicName, 0).get()).isr().size() == 4);
                    }));
                }, () -> {
                    return String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", this.testTopicName);
                }, 15000L, 100L);
            }
            String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--at-min-isr-partitions")).split(System.lineSeparator());
            Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", this.testTopicName)), "Unexpected output: " + split[0]);
            Assertions.assertEquals(1, split.length);
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeUnderMinIsrPartitionsMixed(String str) {
        String str2 = "under-min-isr-topic";
        String str3 = "offline-topic";
        scala.collection.mutable.HashMap hashMap = new scala.collection.mutable.HashMap();
        hashMap.put(0, ((Buffer) JavaConverters.asScalaBufferConverter(Arrays.asList(1, 2, 3)).asScala()).toSeq());
        scala.collection.mutable.HashMap hashMap2 = new scala.collection.mutable.HashMap();
        hashMap2.put(0, ((Buffer) JavaConverters.asScalaBufferConverter(Arrays.asList(0)).asScala()).toSeq());
        Properties properties = new Properties();
        properties.put("min.insync.replicas", "6");
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "under-min-isr-topic", this.scalaBrokers, this.scalaControllers, 1, 6, Map$.MODULE$.empty(), properties);
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "not-under-min-isr-topic", this.scalaBrokers, this.scalaControllers, 1, 6, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "offline-topic", this.scalaBrokers, this.scalaControllers, -1, -1, hashMap2, new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "fully-replicated-topic", this.scalaBrokers, this.scalaControllers, -1, -1, hashMap, new Properties());
        try {
            killBroker(0);
            if (isKRaftTest()) {
                ensureConsistentKRaftMetadata();
            } else {
                kafka.utils.TestUtils.waitUntilTrue(() -> {
                    return Boolean.valueOf(aliveBrokers().forall(kafkaBroker -> {
                        return Boolean.valueOf(((UpdateMetadataRequestData.UpdateMetadataPartitionState) kafkaBroker.metadataCache().getPartitionInfo(str2, 0).get()).isr().size() < 6 && ((UpdateMetadataRequestData.UpdateMetadataPartitionState) kafkaBroker.metadataCache().getPartitionInfo(str3, 0).get()).leader() == -1);
                    }));
                }, () -> {
                    return "Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic";
                }, 15000L, 100L);
            }
            kafka.utils.TestUtils.waitForAllReassignmentsToComplete(this.adminClient, 100L);
            String[] split = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")).split(System.lineSeparator());
            Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", "under-min-isr-topic")), "Unexpected output: " + split[0]);
            Assertions.assertTrue(split[1].startsWith(String.format("\tTopic: %s", "offline-topic")), "Unexpected output: " + split[1]);
            Assertions.assertEquals(2, split.length);
            restartDeadBrokers(false);
        } catch (Throwable th) {
            restartDeadBrokers(false);
            throw th;
        }
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeReportOverriddenConfigs(String str) {
        Properties properties = new Properties();
        properties.put("file.delete.delay.ms", "1000");
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 2, 2, Map$.MODULE$.empty(), properties);
        Assertions.assertTrue(captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe")).contains("file.delete.delay.ms=1000"), String.format("Describe output should have contained %s", "file.delete.delay.ms=1000"));
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeAndListTopicsWithoutInternalTopics(String str) {
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "__consumer_offsets", this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        String captureDescribeTopicStandardOut = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--describe", "--exclude-internal"));
        Assertions.assertTrue(captureDescribeTopicStandardOut.contains(this.testTopicName), String.format("Output should have contained %s", this.testTopicName));
        Assertions.assertFalse(captureDescribeTopicStandardOut.contains("__consumer_offsets"), "Output should not have contained __consumer_offsets");
        String captureListTopicStandardOut = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal"));
        Assertions.assertTrue(captureListTopicStandardOut.contains(this.testTopicName), String.format("Output should have contained %s", this.testTopicName));
        Assertions.assertFalse(captureListTopicStandardOut.contains("__consumer_offsets"), "Output should not have contained __consumer_offsets");
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(String str) throws Exception {
        this.adminClient = (Admin) Mockito.spy(this.adminClient);
        this.topicService.close();
        ((Admin) Mockito.doReturn(AdminClientTestUtils.listPartitionReassignmentsResult(new ClusterAuthorizationException("Unauthorized"))).when(this.adminClient)).listPartitionReassignments(Collections.singleton(new TopicPartition(this.testTopicName, 0)));
        this.topicService = new TopicCommand.TopicService(this.adminClient);
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, this.testTopicName, this.scalaBrokers, this.scalaControllers, 1, 1, Map$.MODULE$.empty(), new Properties());
        String captureDescribeTopicStandardOut = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", this.testTopicName));
        String[] split = captureDescribeTopicStandardOut.split(System.lineSeparator());
        Assertions.assertEquals(2, split.length, "Unexpected output: " + captureDescribeTopicStandardOut);
        Assertions.assertTrue(split[0].startsWith(String.format("Topic: %s", this.testTopicName)), "Unexpected output: " + split[0]);
    }

    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCreateWithTopicNameCollision(String str) {
        String str2 = "foo_bar";
        kafka.utils.TestUtils.createTopicWithAdmin(this.adminClient, "foo_bar", this.scalaBrokers, this.scalaControllers, 1, 6, Map$.MODULE$.empty(), new Properties());
        Assertions.assertThrows(TopicExistsException.class, () -> {
            this.topicService.createTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", str2));
        });
    }

    private void checkReplicaDistribution(Map<Integer, List<Integer>> map, Map<Integer, String> map2, Integer num, Integer num2, Integer num3, Boolean bool, Boolean bool2, Boolean bool3) {
        map.entrySet().stream().forEach(entry -> {
            Assertions.assertEquals(new HashSet((Collection) entry.getValue()).size(), ((List) entry.getValue()).size(), "More than one replica is assigned to same broker for the same partition");
        });
        ReplicaDistributions replicaDistribution = getReplicaDistribution(map, map2);
        if (bool.booleanValue()) {
            Assertions.assertEquals(Collections.nCopies(num2.intValue(), num3), (List) replicaDistribution.partitionRacks.values().stream().map(list -> {
                return Integer.valueOf((int) list.stream().distinct().count());
            }).collect(Collectors.toList()), "More than one replica of the same partition is assigned to the same rack");
        }
        if (bool2.booleanValue()) {
            Map map3 = replicaDistribution.brokerLeaderCount;
            Assertions.assertEquals(Collections.nCopies(num.intValue(), Integer.valueOf(num2.intValue() / num.intValue())), new ArrayList(map3.values()), "Preferred leader count is not even for brokers");
        }
        if (bool3.booleanValue()) {
            Map map4 = replicaDistribution.brokerReplicasCount;
            Assertions.assertEquals(Collections.nCopies(num.intValue(), Integer.valueOf((num2.intValue() * num3.intValue()) / num.intValue())), new ArrayList(map4.values()), "Replica count is not even for broker");
        }
    }

    private String captureDescribeTopicStandardOut(TopicCommand.TopicCommandOptions topicCommandOptions) {
        return ToolsTestUtils.captureStandardOut(() -> {
            try {
                this.topicService.describeTopic(topicCommandOptions);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private String captureListTopicStandardOut(TopicCommand.TopicCommandOptions topicCommandOptions) {
        return ToolsTestUtils.captureStandardOut(() -> {
            try {
                this.topicService.listTopics(topicCommandOptions);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> map, Map<Integer, String> map2) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        map.entrySet().stream().forEach(entry -> {
            Integer num = (Integer) entry.getKey();
            List list = (List) entry.getValue();
            Integer num2 = (Integer) list.get(0);
            hashMap.put(num2, Integer.valueOf(((Integer) hashMap.getOrDefault(num2, 0)).intValue() + 1));
            list.stream().forEach(num3 -> {
                hashMap2.put(num3, Integer.valueOf(((Integer) hashMap2.getOrDefault(num3, 0)).intValue() + 1));
                if (map2.containsKey(num3)) {
                    hashMap3.put(num, (List) Stream.of((Object[]) new List[]{Collections.singletonList((String) map2.get(num3)), (List) hashMap3.getOrDefault(num, Collections.emptyList())}).flatMap((v0) -> {
                        return v0.stream();
                    }).collect(Collectors.toList()));
                } else {
                    System.err.printf("No mapping found for %s in `brokerRackMapping`%n", num3);
                }
            });
        });
        return new ReplicaDistributions(hashMap3, hashMap, hashMap2);
    }
}
