package org.apache.pinot.plugin.stream.kafka20.server;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka20/server/KafkaDataServerStartable.class */
/**
 * {@link StreamDataServerStartable} that runs an in-process Kafka broker through
 * {@link KafkaServerStartable} and creates topics on it through a Kafka {@link AdminClient}.
 *
 * <p>Expected call order is {@link #init(Properties)}, {@link #start()}, any number of
 * {@link #createTopic(String, Properties)} calls, then {@link #stop()}. The class performs no
 * synchronization of its own.
 */
public class KafkaDataServerStartable implements StreamDataServerStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataServerStartable.class);
    private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
    private static final String LOG_DIRS = "log.dirs";
    private static final String PORT = "port";
    private static final String PARTITION = "partition";
    private KafkaServerStartable _serverStartable;
    private int _port;
    private String _zkStr;
    private String _logDirPath;
    private AdminClient _adminClient;

    /**
     * Prepares the broker and its admin client; the broker itself is not started here.
     *
     * <p>Reads {@code port}, {@code zookeeper.connect} and {@code log.dirs} from {@code properties}.
     * When the ZooKeeper connect string contains a path (everything from the first {@code '/'}),
     * that node is created, including parents, before the broker configuration is built. The log
     * directory is created if necessary.
     *
     * <p>Note that {@code properties} is modified: {@code zookeeper.session.timeout.ms} is set to
     * {@code "60000"} before the properties are handed to {@link KafkaConfig}.
     *
     * @param properties broker configuration; {@code port} may be an {@link Integer} (or any
     *     {@link Number}) or a numeric string
     * @throws IllegalArgumentException if a required property is missing or {@code port} is not a
     *     valid integer
     */
    public void init(Properties properties) {
        _port = requiredInt(properties, PORT);
        _zkStr = requiredString(properties, ZOOKEEPER_CONNECT);
        _logDirPath = requiredString(properties, LOG_DIRS);

        // Create the ZooKeeper path that follows the host list, if the connect string has one.
        int pathStart = _zkStr.indexOf('/');
        if (pathStart != -1) {
            ZkClient zkClient = new ZkClient(_zkStr.substring(0, pathStart));
            try {
                zkClient.createPersistent(_zkStr.substring(pathStart), true);
            } finally {
                // Release the ZooKeeper connection even when node creation fails.
                zkClient.close();
            }
        }

        File logDir = new File(_logDirPath);
        if (!logDir.mkdirs() && !logDir.isDirectory()) {
            // Best effort, as before: do not fail here, but leave a trace for whoever debugs it.
            LOGGER.warn("Could not create Kafka log directory {}", logDir);
        }

        properties.put("zookeeper.session.timeout.ms", "60000");
        _serverStartable = new KafkaServerStartable(new KafkaConfig(properties));

        // A repeated init() must not leak the admin client created by the previous call.
        closeAdminClient();
        _adminClient = newAdminClient();
    }

    /** Starts the broker configured by {@link #init(Properties)}. */
    public void start() {
        _serverStartable.startup();
    }

    /**
     * Closes the admin client, shuts the broker down and deletes the broker's first configured log
     * directory (deletion failures are ignored).
     *
     * <p>The admin client is closed before the broker goes away; {@code AdminClient.close()} waits
     * for requests that are still in flight.
     */
    public void stop() {
        closeAdminClient();
        _serverStartable.shutdown();
        FileUtils.deleteQuietly(new File((String) _serverStartable.staticServerConfig().logDirs().apply(0)));
    }

    /**
     * Requests creation of a topic with replication factor 1.
     *
     * <p>The request is submitted asynchronously; this method does not wait for, or inspect, the
     * result returned by the admin client.
     *
     * @param str name of the topic to create
     * @param properties topic settings; {@code partition} (an {@link Integer}, any {@link Number},
     *     or a numeric string) gives the number of partitions
     * @throws IllegalArgumentException if {@code partition} is missing or not a valid integer
     * @throws IllegalStateException if {@link #init(Properties)} has not been called
     */
    public void createTopic(String str, Properties properties) {
        int numPartitions = requiredInt(properties, PARTITION);
        adminClient().createTopics(Arrays.asList(new NewTopic(str, numPartitions, (short) 1)));
    }

    /** Returns the port read from the {@code port} property during {@link #init(Properties)}. */
    public int getPort() {
        return _port;
    }

    /** Returns the live admin client, re-creating it if {@link #stop()} closed the previous one. */
    private AdminClient adminClient() {
        if (_serverStartable == null) {
            throw new IllegalStateException("init() must be called before createTopic()");
        }
        if (_adminClient == null) {
            _adminClient = newAdminClient();
        }
        return _adminClient;
    }

    /** Builds an admin client that talks to {@code localhost} on the configured port. */
    private AdminClient newAdminClient() {
        HashMap<String, Object> adminConfig = new HashMap<>();
        adminConfig.put("bootstrap.servers", "localhost:" + _port);
        adminConfig.put("client.id", "Kafka2AdminClient-" + UUID.randomUUID().toString());
        adminConfig.put("request.timeout.ms", 15000);
        return KafkaAdminClient.create(adminConfig);
    }

    /** Closes and forgets the current admin client, if any; close failures are logged, not thrown. */
    private void closeAdminClient() {
        AdminClient client = _adminClient;
        _adminClient = null;
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (RuntimeException e) {
            LOGGER.warn("Failed to close Kafka admin client", e);
        }
    }

    /** Reads a mandatory string property, failing with the key name when it is absent. */
    private static String requiredString(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property '" + key + "'");
        }
        return value;
    }

    /** Reads a mandatory integer property supplied either as a {@link Number} or a numeric string. */
    private static int requiredInt(Properties properties, String key) {
        Object value = properties.get(key);
        if (value == null) {
            // Falls back to getProperty so that defaults chained into the Properties are honoured.
            value = properties.getProperty(key);
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt(((String) value).trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Property '" + key + "' is not a valid integer: " + value, e);
            }
        }
        throw new IllegalArgumentException("Missing or non-numeric property '" + key + "': " + value);
    }
}
