package io.confluent.kafka.test.cluster;

import io.confluent.kafka.test.cluster.EmbeddedKafka;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.api.Request;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:io/confluent/kafka/test/cluster/EmbeddedKafkaCluster.class */
/**
 * Test helper that manages one {@link EmbeddedZookeeper} and an array of {@link EmbeddedKafka}
 * brokers connected to it.
 *
 * <p>Expected call order: {@link #startZooKeeper()}, then {@link #startBrokers(int, Properties)},
 * then any of the accessors / {@link #createTopic(String, int, int)}, and finally
 * {@link #shutdown()}. Accessors invoked before the corresponding component has been started fail
 * with an {@link IllegalStateException}.
 *
 * <p>NOTE(review): the class performs no synchronization of its own; it appears to be intended
 * for use from a single test thread - confirm before sharing an instance across threads.
 */
public class EmbeddedKafkaCluster {
    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);

    /** Value written to the broker port property unless the caller overrides it. */
    private static final int DEFAULT_BROKER_PORT = 0;

    /** Group initial rebalance delay applied unless the caller overrides it. */
    private static final int DEFAULT_GROUP_INITIAL_REBALANCE_DELAY_MS = 0;

    /** Clock handed to every broker builder, seeded with the wall-clock time at construction. */
    private final MockTime time = new MockTime(System.currentTimeMillis(), System.nanoTime());
    private EmbeddedZookeeper zookeeper;
    private EmbeddedKafka[] brokers;

    /** Creates the embedded ZooKeeper instance that brokers will be pointed at. */
    public void startZooKeeper() {
        logger.debug("Starting a ZooKeeper instance");
        this.zookeeper = new EmbeddedZookeeper();
        logger.debug("ZooKeeper instance is running at {}", zkConnect());
    }

    /**
     * Builds {@code i} embedded brokers with ids {@code 0..i-1}.
     *
     * <p>The effective configuration is: the ZooKeeper connect string and the default port, then
     * the caller's {@code properties} on top (so the caller may override both), then defaults for
     * the group initial rebalance delay, the offsets topic replication factor and topic
     * auto-creation for any of those keys the caller left unset. The broker id is always set by
     * this method.
     *
     * @param i number of brokers to start
     * @param properties caller-supplied broker configuration overrides; {@code null} is treated
     *     as "no overrides"
     * @throws IllegalStateException if {@link #startZooKeeper()} has not been called
     * @throws Exception if building a broker fails
     */
    public void startBrokers(int i, Properties properties) throws Exception {
        logger.debug("Initiating embedded Kafka cluster startup with config {}", properties);
        this.brokers = new EmbeddedKafka[i];

        Properties baseConfig = new Properties();
        baseConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zkConnect());
        baseConfig.put(KafkaConfig$.MODULE$.PortProp(), Integer.valueOf(DEFAULT_BROKER_PORT));
        if (properties != null) {
            baseConfig.putAll(properties);
        }
        putIfAbsent(baseConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(),
                Integer.valueOf(DEFAULT_GROUP_INITIAL_REBALANCE_DELAY_MS));
        putIfAbsent(baseConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
        putIfAbsent(baseConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);

        for (int brokerId = 0; brokerId < this.brokers.length; brokerId++) {
            // Each broker gets its own copy so that a later broker's id can never leak into an
            // earlier broker's configuration, should the builder retain the Properties reference.
            Properties brokerConfig = new Properties();
            brokerConfig.putAll(baseConfig);
            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.valueOf(brokerId));
            logger.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig$.MODULE$.PortProp()));
            this.brokers[brokerId] = new EmbeddedKafka.Builder(this.time).addConfigs(brokerConfig).build();
            logger.debug("Kafka instance started: {}", this.brokers[brokerId]);
        }
    }

    /** Stores {@code obj} under {@code str} only when {@code properties} has no entry for that key. */
    private void putIfAbsent(Properties properties, String str, Object obj) {
        if (!properties.containsKey(str)) {
            properties.put(str, obj);
        }
    }

    /**
     * Shuts down every broker that was created and then ZooKeeper.
     *
     * <p>Slots left {@code null} by a {@link #startBrokers(int, Properties)} call that failed
     * part-way are skipped. A failure while stopping one component does not prevent the remaining
     * components from being stopped; the first failure is rethrown at the end with any later
     * failures attached as suppressed exceptions.
     */
    public void shutdown() {
        RuntimeException failure = null;
        if (this.brokers != null) {
            for (EmbeddedKafka broker : this.brokers) {
                if (broker == null) {
                    continue;
                }
                try {
                    broker.shutdown();
                } catch (RuntimeException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
        }
        if (this.zookeeper != null) {
            try {
                this.zookeeper.shutdown();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns the ZooKeeper connect string, {@code 127.0.0.1:<port>}.
     *
     * @throws IllegalStateException if {@link #startZooKeeper()} has not been called
     */
    public String zkConnect() {
        if (this.zookeeper == null) {
            throw new IllegalStateException("ZooKeeper has not been started; call startZooKeeper() first");
        }
        return "127.0.0.1:" + this.zookeeper.port();
    }

    /**
     * Returns the comma-separated connect strings of all brokers for the given listener.
     *
     * @param str listener name passed to each broker's {@code brokerConnect}
     * @throws IllegalStateException if {@link #startBrokers(int, Properties)} has not been called
     */
    public String bootstrapServers(String str) {
        return Arrays.stream(startedBrokers())
                .map(broker -> broker.brokerConnect(str))
                .collect(Collectors.joining(","));
    }

    /**
     * Returns the bootstrap servers for an automatically chosen listener of the first broker.
     *
     * <p>The first listener is used, unless there are two listeners and the first one is the
     * inter-broker listener, in which case the second one is used.
     *
     * @throws IllegalStateException if no broker has been started, or if the first broker has
     *     more than two listeners and the choice would therefore be ambiguous
     */
    public String bootstrapServers() {
        EmbeddedKafka first = firstBroker();
        List<String> listeners = first.listeners();
        if (listeners.size() > 2) {
            throw new IllegalStateException("Listener name not specified for listeners " + listeners);
        }
        String listener = listeners.get(0);
        if (listeners.size() > 1
                && first.kafkaServer().config().interBrokerListenerName().value().equals(listener)) {
            listener = listeners.get(1);
        }
        return bootstrapServers(listener);
    }

    /**
     * Creates a topic through the first broker and blocks until every broker's metadata cache
     * reports a valid leader for each of its partitions.
     *
     * @param str topic name
     * @param i number of partitions; partitions {@code 0..i-1} are awaited
     * @param i2 third argument forwarded to the broker's {@code createTopic}
     *     (presumably the replication factor - confirm against {@code EmbeddedKafka})
     * @throws IllegalStateException if no broker has been started
     */
    public void createTopic(String str, int i, int i2) {
        firstBroker().createTopic(str, i, i2, new Properties());
        List<TopicPartition> partitions = new ArrayList<>();
        for (int partition = 0; partition < i; partition++) {
            partitions.add(new TopicPartition(str, partition));
        }
        waitForTopicPartitions(brokers(), partitions);
    }

    /** Waits, one partition at a time, for metadata propagation to all given servers. */
    private static void waitForTopicPartitions(List<KafkaServer> list, List<TopicPartition> list2) {
        list2.forEach(topicPartition -> waitUntilMetadataIsPropagated(list, topicPartition));
    }

    /**
     * Blocks until the metadata cache of every server in {@code list} contains partition info for
     * {@code topicPartition} with a valid leader broker id.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread is
     *     interrupted; the thread's interrupt flag is restored before throwing
     */
    public static void waitUntilMetadataIsPropagated(List<KafkaServer> list, TopicPartition topicPartition) {
        try {
            String str = topicPartition.topic();
            int partition = topicPartition.partition();
            TestUtils.waitForCondition(() -> {
                return list.stream().map(kafkaServer -> {
                    return kafkaServer.apis().metadataCache();
                }).allMatch(metadataCache -> {
                    Option<?> partitionInfo = metadataCache.getPartitionInfo(str, partition);
                    if (partitionInfo.isEmpty()) {
                        return false;
                    }
                    return Request.isValidBrokerId(((UpdateMetadataRequest.PartitionState) partitionInfo.get()).basePartitionState.leader);
                });
            }, "Metadata for topic=" + str + " partition=" + partition + " not propagated");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }

    /**
     * Returns a new list holding the {@link KafkaServer} of each broker, in broker-id order.
     *
     * @throws IllegalStateException if {@link #startBrokers(int, Properties)} has not been called
     */
    public List<KafkaServer> brokers() {
        List<KafkaServer> servers = new ArrayList<>();
        for (EmbeddedKafka broker : startedBrokers()) {
            servers.add(broker.kafkaServer());
        }
        return servers;
    }

    /** Returns the broker array, failing with a descriptive error if brokers were never started. */
    private EmbeddedKafka[] startedBrokers() {
        if (this.brokers == null) {
            throw new IllegalStateException("Brokers have not been started; call startBrokers() first");
        }
        return this.brokers;
    }

    /** Returns the broker with id 0, failing with a descriptive error if there is none. */
    private EmbeddedKafka firstBroker() {
        EmbeddedKafka[] started = startedBrokers();
        if (started.length == 0 || started[0] == null) {
            throw new IllegalStateException("No broker is running; startBrokers() was called with zero brokers or failed");
        }
        return started[0];
    }
}
