/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.support.metrics.common.kafka;

import java.io.File;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.RunningAsBroker;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Option$;
import scala.collection.Seq;

/**
 * Starts and stops an embedded ZooKeeper instance together with one or more
 * in-process Kafka brokers, built through Kafka's {@code TestUtils} helpers.
 *
 * <p>Brokers are tracked by id in a {@link ConcurrentHashMap}; the ZooKeeper
 * handle is a plain, unsynchronized field, so lifecycle methods
 * ({@link #startCluster(int)}, {@link #stopCluster()}) should be driven from a
 * single thread.
 */
public class EmbeddedKafkaCluster {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);

    // Arguments handed to TestUtils.createBrokerConfig, in the order that method takes them.
    private static final Option<SecurityProtocol> INTER_BROKER_SECURITY_PROTOCOL =
            Option.apply(SecurityProtocol.PLAINTEXT);
    private static final boolean ENABLE_CONTROLLED_SHUTDOWN = true;
    private static final boolean ENABLE_DELETE_TOPIC = false;
    // NOTE(review): port 0 presumably lets the broker pick a free port -- confirm against TestUtils.
    private static final int BROKER_PORT = 0;
    private static final Option<File> TRUST_STORE_FILE = Option.empty();
    private static final Option<Properties> SASL_PROPERTIES = Option.empty();
    private static final boolean ENABLE_PLAINTEXT = true;
    private static final boolean ENABLE_SASL_PLAINTEXT = false;
    private static final int SASL_PLAINTEXT_PORT = 0;
    private static final boolean ENABLE_SSL = false;
    private static final int SSL_PORT = 0;
    private static final boolean ENABLE_SASL_SSL = false;
    private static final int SASL_SSL_PORT = 0;
    private static final int LOG_DIR_COUNT = 1;
    private static final int NUM_PARTITIONS = 1;
    private static final short DEFAULT_REPLICATION_FACTOR = 1;

    /** The embedded ZooKeeper instance, or {@code null} while none has been created. */
    private EmbeddedZookeeper zookeeper;

    /** Brokers created by this cluster, keyed by broker id. */
    private final Map<Integer, KafkaServer> brokersById = new ConcurrentHashMap<Integer, KafkaServer>();

    /**
     * Creates the ZooKeeper instance if there is none yet, then starts brokers
     * with ids {@code 0 .. numBrokers - 1}.
     *
     * @param numBrokers number of brokers to start; must be at least 1
     * @throws IllegalArgumentException if {@code numBrokers} is zero or negative
     */
    public void startCluster(int numBrokers) {
        if (numBrokers <= 0) {
            throw new IllegalArgumentException("number of brokers must be >= 1");
        }
        this.startZookeeperIfNeeded();
        for (int brokerId = 0; brokerId < numBrokers; ++brokerId) {
            log.debug("Starting broker with id {} ...", brokerId);
            this.startBroker(brokerId);
        }
    }

    /** Creates the embedded ZooKeeper instance unless one already exists. */
    private void startZookeeperIfNeeded() {
        if (this.zookeeper == null) {
            this.zookeeper = new EmbeddedZookeeper();
        }
    }

    /**
     * Ensures a broker with the given id is registered and running.
     *
     * <p>An unknown id gets a freshly created broker. A known broker that
     * reports the {@code RunningAsBroker} state is left alone; a known broker in
     * any other state is stopped (which also unregisters it) and replaced by a
     * newly created one.
     *
     * @param brokerId id of the broker to start; must not be negative
     * @throws IllegalArgumentException if {@code brokerId} is negative
     */
    private void startBroker(int brokerId) {
        if (brokerId < 0) {
            throw new IllegalArgumentException("broker id must not be negative");
        }
        // Single lookup instead of containsKey() + get().
        KafkaServer existing = this.brokersById.get(brokerId);
        if (existing == null) {
            this.brokersById.put(brokerId, this.createBroker(brokerId));
            return;
        }
        if (existing.brokerState().currentState() == RunningAsBroker.state()) {
            log.debug("Broker with id {} is already running", brokerId);
            return;
        }
        log.debug("Restarting broker with id {} ...", brokerId);
        this.stopBroker(brokerId);
        this.brokersById.put(brokerId, this.createBroker(brokerId));
        log.debug("Broker with id {} was restarted", brokerId);
    }

    /** Builds the broker configuration for {@code brokerId} and creates a server from it. */
    private KafkaServer createBroker(int brokerId) {
        Properties props = TestUtils.createBrokerConfig(
                brokerId,
                this.zookeeperConnectString(),
                ENABLE_CONTROLLED_SHUTDOWN,
                ENABLE_DELETE_TOPIC,
                BROKER_PORT,
                INTER_BROKER_SECURITY_PROTOCOL,
                TRUST_STORE_FILE,
                SASL_PROPERTIES,
                ENABLE_PLAINTEXT,
                ENABLE_SASL_PLAINTEXT,
                SASL_PLAINTEXT_PORT,
                ENABLE_SSL,
                SSL_PORT,
                ENABLE_SASL_SSL,
                SASL_SSL_PORT,
                Option.empty(), // NOTE(review): presumably the (absent) broker rack -- confirm
                LOG_DIR_COUNT,
                false, // NOTE(review): presumably the enable-token flag -- confirm
                NUM_PARTITIONS,
                DEFAULT_REPLICATION_FACTOR);
        return TestUtils.createServer(KafkaConfig.fromProps(props), Time.SYSTEM);
    }

    /**
     * Looks up a broker by id.
     *
     * @param brokerId id of the broker
     * @return the registered broker, or {@code null} if no broker with that id is registered
     */
    public KafkaServer getBroker(int brokerId) {
        return this.brokersById.get(brokerId);
    }

    /**
     * Stops every registered broker and then shuts down ZooKeeper.
     *
     * <p>ZooKeeper is shut down even when stopping a broker throws, so a failed
     * broker shutdown does not leave the ZooKeeper instance behind; the broker
     * failure still propagates to the caller.
     */
    public void stopCluster() {
        try {
            // stopBroker() removes entries while we iterate; ConcurrentHashMap's
            // key-set iterator tolerates concurrent modification.
            for (int brokerId : this.brokersById.keySet()) {
                log.debug("Stopping broker with id {} ...", brokerId);
                this.stopBroker(brokerId);
            }
        } finally {
            this.stopZookeeper();
        }
    }

    /**
     * Shuts down the broker with the given id, waits for the shutdown to
     * complete, passes its log directories to {@code CoreUtils.delete} and
     * unregisters it. Does nothing if no such broker is registered.
     */
    private void stopBroker(int brokerId) {
        KafkaServer broker = this.brokersById.get(brokerId);
        if (broker == null) {
            return;
        }
        broker.shutdown();
        broker.awaitShutdown();
        CoreUtils.delete(broker.config().logDirs());
        this.brokersById.remove(brokerId);
    }

    /** Shuts down the ZooKeeper instance, if any, and clears the reference to it. */
    private void stopZookeeper() {
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
            this.zookeeper = null;
        }
    }

    /**
     * Returns the connect string of the embedded ZooKeeper instance.
     *
     * @return {@code "localhost:<port>"} using the port reported by the ZooKeeper instance
     * @throws IllegalStateException if no ZooKeeper instance exists (cluster not started)
     */
    public String zookeeperConnectString() {
        if (this.zookeeper != null) {
            return "localhost:" + this.zookeeper.port();
        }
        throw new IllegalStateException("ZooKeeper instance has not been started yet -- did you actually start the cluster?");
    }
}

