package org.apache.drill.exec.store.kafka.cluster;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.store.kafka.KafkaAsyncCloser;
import org.apache.drill.exec.store.kafka.TestQueryConstants;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster implements TestQueryConstants {
    private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private List<KafkaServer> brokers;
    private ZookeeperHelper zkHelper;
    private KafkaAsyncCloser closer;
    private final Properties props;

    public EmbeddedKafkaCluster() throws IOException {
        this(new Properties());
    }

    public EmbeddedKafkaCluster(Properties properties) throws IOException {
        this(properties, 1);
    }

    public EmbeddedKafkaCluster(Properties properties, int i) throws IOException {
        this.props = new Properties();
        this.props.putAll(properties);
        this.zkHelper = new ZookeeperHelper();
        this.zkHelper.startZookeeper(1);
        this.brokers = new ArrayList(i);
        StringBuilder sb = new StringBuilder();
        for (int i2 = 0; i2 < i; i2++) {
            if (i2 != 0) {
                sb.append(TestQueryConstants.BROKER_DELIM);
            }
            int ephemeralPort = getEphemeralPort();
            sb.append(TestQueryConstants.LOCAL_HOST).append(":").append(ephemeralPort);
            addBroker(this.props, i2, ephemeralPort);
        }
        this.props.put("metadata.broker.list", sb.toString());
        this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
        logger.info("Initialized Kafka Server");
        this.closer = new KafkaAsyncCloser();
    }

    private void addBroker(Properties properties, int i, int i2) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), String.valueOf(1));
        properties2.put(KafkaConfig.OffsetsTopicPartitionsProp(), String.valueOf(1));
        properties2.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
        properties2.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
        properties2.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
        properties2.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
        properties2.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
        properties2.put(KafkaConfig.BrokerIdProp(), String.valueOf(i + 1));
        properties2.put(KafkaConfig.HostNameProp(), TestQueryConstants.LOCAL_HOST);
        properties2.put(KafkaConfig.AdvertisedHostNameProp(), TestQueryConstants.LOCAL_HOST);
        properties2.put(KafkaConfig.PortProp(), String.valueOf(i2));
        properties2.put(KafkaConfig.AdvertisedPortProp(), String.valueOf(i2));
        properties2.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
        properties2.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath());
        properties2.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
        this.brokers.add(getBroker(properties2));
    }

    private static KafkaServer getBroker(Properties properties) {
        KafkaServer kafkaServer = new KafkaServer(new KafkaConfig(properties), Time.SYSTEM, Option.apply("kafka"), false);
        kafkaServer.startup();
        return kafkaServer;
    }

    public void shutDownCluster() {
        this.closer.close();
        this.closer = null;
        if (this.brokers != null) {
            this.brokers.forEach((v0) -> {
                v0.shutdown();
            });
            this.brokers = null;
        }
        if (this.zkHelper != null) {
            this.zkHelper.stopZookeeper();
            this.zkHelper = null;
        }
    }

    public void shutDownBroker(int i) {
        this.brokers.stream().filter(kafkaServer -> {
            return Integer.parseInt(kafkaServer.config().getString(KafkaConfig.BrokerIdProp())) == i;
        }).findAny().ifPresent((v0) -> {
            v0.shutdown();
        });
    }

    public Properties getProps() {
        Properties properties = new Properties();
        properties.putAll(this.props);
        return properties;
    }

    public List<KafkaServer> getBrokers() {
        return this.brokers;
    }

    public void setBrokers(List<KafkaServer> list) {
        this.brokers = list;
    }

    public ZookeeperHelper getZkServer() {
        return this.zkHelper;
    }

    public String getKafkaBrokerList() {
        return (String) this.brokers.stream().map((v0) -> {
            return v0.config();
        }).map(kafkaConfig -> {
            return kafkaConfig.hostName() + ":" + kafkaConfig.port();
        }).collect(Collectors.joining(TestQueryConstants.BROKER_DELIM));
    }

    public void registerToClose(AutoCloseable autoCloseable) {
        this.closer.close(autoCloseable);
    }

    private int getEphemeralPort() throws IOException {
        ServerSocket serverSocket = new ServerSocket(0);
        Throwable th = null;
        try {
            int localPort = serverSocket.getLocalPort();
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    serverSocket.close();
                }
            }
            return localPort;
        } catch (Throwable th3) {
            if (serverSocket != null) {
                if (0 != 0) {
                    try {
                        serverSocket.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serverSocket.close();
                }
            }
            throw th3;
        }
    }

    private File getTemporaryDir() {
        File file = new File(System.getProperty("java.io.tmpdir"), TestQueryConstants.ZK_TMP + System.nanoTime());
        if (file.mkdir()) {
            return file;
        }
        logger.error("Failed to create temp Dir");
        throw new RuntimeException("Failed to create temp Dir");
    }
}
