/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.integration.utils.KafkaEmbedded;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedKafkaCluster
extends ExternalResource {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final int DEFAULT_BROKER_PORT = 0;
    public static final int TOPIC_CREATION_TIMEOUT = 30000;
    private EmbeddedZookeeper zookeeper = null;
    private final KafkaEmbedded[] brokers;
    private final Properties brokerConfig;
    public final MockTime time = new MockTime();

    public EmbeddedKafkaCluster(int numBrokers) {
        this(numBrokers, new Properties());
    }

    public EmbeddedKafkaCluster(int numBrokers, Properties brokerConfig) {
        this.brokers = new KafkaEmbedded[numBrokers];
        this.brokerConfig = brokerConfig;
    }

    public void start() throws IOException, InterruptedException {
        log.debug("Initiating embedded Kafka cluster startup");
        log.debug("Starting a ZooKeeper instance");
        this.zookeeper = new EmbeddedZookeeper();
        log.debug("ZooKeeper instance is running at {}", (Object)this.zKConnectString());
        this.brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), this.zKConnectString());
        this.brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), (Object)0);
        this.putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
        this.putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 0x200000L);
        this.putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
        this.putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
        for (int i = 0; i < this.brokers.length; ++i) {
            this.brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), (Object)i);
            log.debug("Starting a Kafka instance on port {} ...", (Object)this.brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
            this.brokers[i] = new KafkaEmbedded(this.brokerConfig, this.time);
            log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", (Object)this.brokers[i].brokerList(), (Object)this.brokers[i].zookeeperConnect());
        }
    }

    private void putIfAbsent(Properties props, String propertyKey, Object propertyValue) {
        if (!props.containsKey(propertyKey)) {
            this.brokerConfig.put(propertyKey, propertyValue);
        }
    }

    public void stop() {
        for (KafkaEmbedded broker : this.brokers) {
            broker.stop();
        }
        this.zookeeper.shutdown();
    }

    public String zKConnectString() {
        return "localhost:" + this.zookeeper.port();
    }

    public String bootstrapServers() {
        return this.brokers[0].brokerList();
    }

    protected void before() throws Throwable {
        this.start();
    }

    protected void after() {
        this.stop();
    }

    public void createTopic(String topic) throws InterruptedException {
        this.createTopic(topic, 1, 1, new Properties());
    }

    public void createTopic(String topic, int partitions, int replication) throws InterruptedException {
        this.createTopic(topic, partitions, replication, new Properties());
    }

    public void createTopic(String topic, int partitions, int replication, Properties topicConfig) throws InterruptedException {
        this.brokers[0].createTopic(topic, partitions, replication, topicConfig);
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (int partition = 0; partition < partitions; ++partition) {
            topicPartitions.add(new TopicPartition(topic, partition));
        }
        IntegrationTestUtils.waitForTopicPartitions(this.brokers(), topicPartitions, 30000L);
    }

    public void deleteTopic(String topic) {
        this.brokers[0].deleteTopic(topic);
    }

    public List<KafkaServer> brokers() {
        ArrayList<KafkaServer> servers = new ArrayList<KafkaServer>();
        for (KafkaEmbedded broker : this.brokers) {
            servers.add(broker.kafkaServer());
        }
        return servers;
    }
}

