package co.cask.cdap.kafka.run;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.runtime.DaemonMain;
import co.cask.cdap.common.utils.Networks;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.Service;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/kafka/run/KafkaServerMain.class */
public class KafkaServerMain extends DaemonMain {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaServerMain.class);
    private Properties kafkaProperties;
    private EmbeddedKafkaServer kafkaServer;

    public static void main(String[] strArr) throws Exception {
        new KafkaServerMain().doMain(strArr);
    }

    public void init(String[] strArr) {
        CConfiguration create = CConfiguration.create();
        String str = create.get("zookeeper.quorum");
        String str2 = create.get("kafka.zookeeper.namespace");
        if (str2 != null) {
            ZKClientService build = ZKClientService.Builder.of(str).build();
            try {
                try {
                    build.startAndWait();
                    String str3 = "/" + str2;
                    LOG.info(String.format("Creating zookeeper namespace %s", str3));
                    ZKOperations.ignoreError(build.create(str3, (byte[]) null, CreateMode.PERSISTENT), KeeperException.NodeExistsException.class, str3).get();
                    build.stopAndWait();
                    str = String.format("%s/%s", str, str2);
                    build.stopAndWait();
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            } catch (Throwable th) {
                build.stopAndWait();
                throw th;
            }
        }
        this.kafkaProperties = generateKafkaConfig(create);
        Preconditions.checkState(Integer.parseInt(this.kafkaProperties.getProperty("num.partitions"), 10) > 0, "Num partitions should be greater than zero.");
        Preconditions.checkState(Integer.parseInt(this.kafkaProperties.getProperty("port"), 10) > 0, "Port number is invalid.");
        String property = this.kafkaProperties.getProperty("host.name");
        InetAddress resolve = Networks.resolve(property, new InetSocketAddress("localhost", 0).getAddress());
        if (property != null) {
            if (resolve.isAnyLocalAddress()) {
                this.kafkaProperties.remove("host.name");
                try {
                    resolve = InetAddress.getLocalHost();
                } catch (UnknownHostException e2) {
                    throw Throwables.propagate(e2);
                }
            } else {
                this.kafkaProperties.setProperty("host.name", resolve.getCanonicalHostName());
            }
        }
        if (this.kafkaProperties.getProperty("broker.id") == null) {
            int generateBrokerId = generateBrokerId(resolve);
            LOG.info(String.format("Initializing server with broker id %d", Integer.valueOf(generateBrokerId)));
            this.kafkaProperties.setProperty("broker.id", Integer.toString(generateBrokerId));
        }
        if (this.kafkaProperties.getProperty("zookeeper.connect") == null) {
            this.kafkaProperties.setProperty("zookeeper.connect", str);
        }
    }

    public void start() {
        LOG.info("Starting embedded kafka server...");
        this.kafkaServer = new EmbeddedKafkaServer(this.kafkaProperties);
        if (this.kafkaServer.startAndWait() != Service.State.RUNNING) {
            throw new IllegalStateException("Kafka server has not started... terminating.");
        }
        LOG.info("Embedded kafka server started successfully.");
    }

    public void stop() {
        LOG.info("Stopping embedded kafka server...");
        if (this.kafkaServer == null || !this.kafkaServer.isRunning()) {
            return;
        }
        this.kafkaServer.stopAndWait();
    }

    public void destroy() {
    }

    private Properties generateKafkaConfig(CConfiguration cConfiguration) {
        Properties properties = new Properties();
        Iterator it = cConfiguration.getValByRegex("^(kafka\\.server\\.)").entrySet().iterator();
        while (it.hasNext()) {
            String str = (String) ((Map.Entry) it.next()).getKey();
            properties.setProperty(str.substring(13), cConfiguration.get(str));
        }
        return properties;
    }

    private static int generateBrokerId(InetAddress inetAddress) {
        LOG.info("Generating broker ID with address {}", inetAddress);
        try {
            return Math.abs(InetAddresses.coerceToInteger(inetAddress));
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }
}
