package io.confluent.support.metrics.common.kafka;

import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.RunningAsBroker;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Option$;

/* loaded from: input_file:io/confluent/support/metrics/common/kafka/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster {
    private static final boolean ENABLE_CONTROLLED_SHUTDOWN = true;
    private static final boolean ENABLE_DELETE_TOPIC = false;
    private static final int BROKER_PORT = 0;
    private static final boolean ENABLE_PLAINTEXT = true;
    private static final boolean ENABLE_SASL_PLAINTEXT = false;
    private static final int SASL_PLAINTEXT_PORT = 0;
    private static final boolean ENABLE_SSL = false;
    private static final int SSL_PORT = 0;
    private static final boolean ENABLE_SASL_SSL = false;
    private static final int SASL_SSL_PORT = 0;
    private static final int LOG_DIR_COUNT = 1;
    private static final int NUM_PARTITIONS = 1;
    private static final short DEFAULT_REPLICATION_FACTOR = 1;
    private EmbeddedZookeeper zookeeper = null;
    private final Map<Integer, KafkaServer> brokersById = new ConcurrentHashMap();
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final Option<SecurityProtocol> INTER_BROKER_SECURITY_PROTOCOL = Option.apply(SecurityProtocol.PLAINTEXT);
    private static final Option<File> TRUST_STORE_FILE = Option$.MODULE$.empty();
    private static final Option<Properties> SASL_PROPERTIES = Option$.MODULE$.empty();

    public void startCluster(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("number of brokers must be >= 1");
        }
        startZookeeperIfNeeded();
        for (int i2 = 0; i2 < i; i2++) {
            log.debug("Starting broker with id {} ...", Integer.valueOf(i2));
            startBroker(i2);
        }
    }

    private void startZookeeperIfNeeded() {
        if (this.zookeeper == null) {
            this.zookeeper = new EmbeddedZookeeper();
        }
    }

    private void startBroker(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("broker id must not be negative");
        }
        if (!this.brokersById.containsKey(Integer.valueOf(i))) {
            this.brokersById.put(Integer.valueOf(i), TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zookeeperConnectString(), true, false, 0, INTER_BROKER_SECURITY_PROTOCOL, TRUST_STORE_FILE, SASL_PROPERTIES, true, false, 0, false, 0, false, 0, Option.empty(), 1, false, 1, (short) 1)), Time.SYSTEM));
        } else {
            if (this.brokersById.get(Integer.valueOf(i)).brokerState().currentState() == RunningAsBroker.state()) {
                log.debug("Broker with id {} is already running", Integer.valueOf(i));
                return;
            }
            log.debug("Restarting broker with id {} ...", Integer.valueOf(i));
            stopBroker(i);
            startBroker(i);
            log.debug("Broker with id {} was restarted", Integer.valueOf(i));
        }
    }

    public KafkaServer getBroker(int i) {
        return this.brokersById.get(Integer.valueOf(i));
    }

    public void stopCluster() {
        Iterator<Integer> it = this.brokersById.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            log.debug("Stopping broker with id {} ...", Integer.valueOf(intValue));
            stopBroker(intValue);
        }
        stopZookeeper();
    }

    private void stopBroker(int i) {
        if (this.brokersById.containsKey(Integer.valueOf(i))) {
            KafkaServer kafkaServer = this.brokersById.get(Integer.valueOf(i));
            kafkaServer.shutdown();
            kafkaServer.awaitShutdown();
            CoreUtils.delete(kafkaServer.config().logDirs());
            this.brokersById.remove(Integer.valueOf(i));
        }
    }

    private void stopZookeeper() {
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
            this.zookeeper = null;
        }
    }

    public String zookeeperConnectString() {
        if (this.zookeeper != null) {
            return "localhost:" + this.zookeeper.port();
        }
        throw new IllegalStateException("ZooKeeper instance has not been started yet -- did you actually start the cluster?");
    }
}
