/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class KafkaEmbedded {
    private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private final Properties effectiveConfig;
    private final File logDir;
    public final TemporaryFolder tmpFolder = new TemporaryFolder();
    private final KafkaServer kafka;

    public KafkaEmbedded(Properties config, MockTime time) throws IOException {
        this.tmpFolder.create();
        this.logDir = this.tmpFolder.newFolder();
        this.effectiveConfig = this.effectiveConfigFrom(config);
        boolean loggingEnabled = true;
        KafkaConfig kafkaConfig = new KafkaConfig((Map)this.effectiveConfig, true);
        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", (Object)this.logDir, (Object)this.zookeeperConnect());
        this.kafka = TestUtils.createServer((KafkaConfig)kafkaConfig, (Time)time);
        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", (Object)this.brokerList(), (Object)this.zookeeperConnect());
    }

    private Properties effectiveConfigFrom(Properties initialConfig) throws IOException {
        Properties effectiveConfig = new Properties();
        effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), (Object)0);
        effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
        effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
        effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), (Object)1);
        effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), (Object)true);
        effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), (Object)1000000);
        effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), (Object)true);
        effectiveConfig.putAll((Map<?, ?>)initialConfig);
        effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), this.logDir.getAbsolutePath());
        return effectiveConfig;
    }

    public String brokerList() {
        return this.kafka.config().hostName() + ":" + this.kafka.boundPort(ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT));
    }

    public String zookeeperConnect() {
        return this.effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
    }

    public void stop() {
        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", (Object)this.brokerList(), (Object)this.zookeeperConnect());
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        log.debug("Removing logs.dir at {} ...", (Object)this.logDir);
        List<String> logDirs = Collections.singletonList(this.logDir.getAbsolutePath());
        CoreUtils.delete((Seq)JavaConversions.asScalaBuffer(logDirs).seq());
        this.tmpFolder.delete();
        log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", (Object)this.brokerList(), (Object)this.zookeeperConnect());
    }

    public void createTopic(String topic) {
        this.createTopic(topic, 1, 1, new Properties());
    }

    public void createTopic(String topic, int partitions, int replication) {
        this.createTopic(topic, partitions, replication, new Properties());
    }

    public void createTopic(String topic, int partitions, int replication, Properties topicConfig) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", new Object[]{topic, partitions, replication, topicConfig});
        ZkClient zkClient = new ZkClient(this.zookeeperConnect(), 10000, 8000, (ZkSerializer)ZKStringSerializer$.MODULE$);
        boolean isSecure = false;
        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(this.zookeeperConnect()), false);
        AdminUtils.createTopic((ZkUtils)zkUtils, (String)topic, (int)partitions, (int)replication, (Properties)topicConfig, (RackAwareMode)RackAwareMode.Enforced$.MODULE$);
        zkClient.close();
    }

    public void deleteTopic(String topic) {
        log.debug("Deleting topic { name: {} }", (Object)topic);
        ZkClient zkClient = new ZkClient(this.zookeeperConnect(), 10000, 8000, (ZkSerializer)ZKStringSerializer$.MODULE$);
        boolean isSecure = false;
        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(this.zookeeperConnect()), false);
        AdminUtils.deleteTopic((ZkUtils)zkUtils, (String)topic);
        zkClient.close();
    }

    public KafkaServer kafkaServer() {
        return this.kafka;
    }
}

