/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.test.cluster;

import io.confluent.common.EndPoint;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class EmbeddedKafka {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafka.class);
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private final File logDir;
    private final Properties effectiveConfig;
    public final TemporaryFolder tmpFolder = new TemporaryFolder();
    private final KafkaConfig kafkaConfig;
    private KafkaServer kafka;
    private boolean isShutdown;

    private EmbeddedKafka(Properties config, Time time) {
        try {
            this.tmpFolder.create();
            this.logDir = this.tmpFolder.newFolder();
            this.effectiveConfig = this.brokerConfigs(config);
            this.kafkaConfig = new KafkaConfig((Map)this.effectiveConfig, true);
            log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", (Object)this.logDir, (Object)this.kafkaConfig.zkConnect());
            this.kafka = TestUtils.createServer((KafkaConfig)this.kafkaConfig, (Time)time);
            this.isShutdown = false;
            log.debug("Startup of embedded Kafka broker completed: {}", (Object)this);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Properties brokerConfigs(Properties overrideProps) {
        Properties brokerConfigs = new Properties();
        brokerConfigs.put(KafkaConfig.BrokerIdProp(), (Object)0);
        brokerConfigs.put(KafkaConfig.HostNameProp(), "127.0.0.1");
        brokerConfigs.put(KafkaConfig.PortProp(), "9092");
        brokerConfigs.put(KafkaConfig.NumPartitionsProp(), (Object)1);
        brokerConfigs.put(KafkaConfig.AutoCreateTopicsEnableProp(), (Object)true);
        brokerConfigs.put(KafkaConfig.MessageMaxBytesProp(), (Object)1000000);
        brokerConfigs.put(KafkaConfig.ControlledShutdownEnableProp(), (Object)true);
        brokerConfigs.putAll((Map<?, ?>)overrideProps);
        brokerConfigs.setProperty(KafkaConfig.LogDirProp(), this.logDir.getAbsolutePath());
        return brokerConfigs;
    }

    public synchronized String brokerConnect(String listenerName) {
        return this.kafka.config().hostName() + ":" + this.kafka.boundPort(new ListenerName(listenerName));
    }

    public synchronized String zkConnect() {
        return this.kafka.config().zkConnect();
    }

    public void shutdownAndCleanup() {
        this.shutdown();
        log.debug("Removing logs.dir at {} ...", (Object)this.logDir);
        List<String> logDirs = Collections.singletonList(this.logDir.getAbsolutePath());
        CoreUtils.delete((Seq)JavaConverters.asScalaBuffer(logDirs));
        this.tmpFolder.delete();
        log.debug("Shutdown and cleanup of embedded Kafka broker completed {}.", (Object)this);
    }

    public synchronized void shutdown() {
        if (this.isShutdown) {
            log.debug("Embedded Kafka broker {} was already shut down. Skipping shutdown", (Object)this);
            return;
        }
        log.debug("Shutting down embedded Kafka broker {} ...", (Object)this);
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        this.isShutdown = true;
        log.debug("Shutdown of embedded Kafka broker completed {}.", (Object)this);
        this.kafka = null;
    }

    public synchronized void startBroker(Time time) {
        if (this.kafka == null) {
            this.kafka = TestUtils.createServer((KafkaConfig)this.kafkaConfig, (Time)time);
            this.isShutdown = false;
            log.debug("Started embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", (Object)this.logDir, (Object)this.kafkaConfig.zkConnect());
        }
    }

    public void createTopic(String topic, int partitions, int replication, Properties topicConfig) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", new Object[]{topic, partitions, replication, topicConfig});
        try (KafkaZkClient kafkaZkClient = this.createZkClient();){
            AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
            adminZkClient.createTopic(topic, partitions, replication, topicConfig, (RackAwareMode)RackAwareMode.Enforced$.MODULE$, this.kafkaServer().config().usesModernTopicId(), this.kafkaServer().config().usesLegacyTopicId(), Option.empty());
        }
    }

    private KafkaZkClient createZkClient() {
        return KafkaZkClient.apply((String)this.zkConnect(), (boolean)false, (int)10000, (int)8000, (int)Integer.MAX_VALUE, (Time)Time.SYSTEM, (String)"testMetricGroup", (String)"testMetricType", (Option)Option.empty(), (Option)Option.empty());
    }

    public synchronized KafkaServer kafkaServer() {
        return this.kafka;
    }

    public synchronized EndPoint endPoint() {
        Object listenerConfig = this.effectiveConfig.get(KafkaConfig.InterBrokerListenerNameProp());
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        if (listenerConfig != null) {
            securityProtocol = SecurityProtocol.forName((String)listenerConfig.toString());
        }
        return new EndPoint(this.kafka.config().hostName(), this.kafka.boundPort(ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol)), securityProtocol);
    }

    public synchronized List<String> listeners() {
        return JavaConverters.seqAsJavaList((Seq)this.kafka.config().listeners()).stream().map(e -> e.listenerName().value()).collect(Collectors.toList());
    }

    public synchronized String toString() {
        Map endpoints = this.listeners().stream().collect(Collectors.toMap(Function.identity(), this::brokerConnect));
        return String.format("Kafka brokerId=%d, endpoints=%s, zkConnect=%s", this.kafka.config().brokerId(), Utils.mkString(endpoints, (String)"", (String)"", (String)":", (String)","), this.zkConnect());
    }

    public static class Builder {
        private final Properties config = new Properties();
        private final Time time;

        public Builder(Time time) {
            this.time = time;
        }

        public Builder addConfigs(Properties props) {
            this.config.putAll((Map<?, ?>)props);
            return this;
        }

        public EmbeddedKafka build() {
            return new EmbeddedKafka(this.config, this.time);
        }
    }
}

