package org.apache.kylin.provision;

import java.util.Iterator;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.requests.MetadataResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/provision/MockKafka.class */
/**
 * Embedded Kafka broker wrapper used for provisioning/testing.
 *
 * <p>The broker is configured from a {@link ZkConnection}, a log directory, a port and a
 * broker id. Topic administration ({@link #createTopic}, {@link #deleteTopic},
 * {@link #fetchTopicMeta}) goes through {@link AdminUtils} using a short-lived
 * {@link ZkClient} built on the same connection.
 *
 * <p>Note that only {@link #MockKafka(ZkConnection)} starts the broker automatically; the
 * {@link #MockKafka(ZkConnection, int, int)} constructor requires an explicit {@link #start()}.
 */
public class MockKafka {
    private static final Logger logger = LoggerFactory.getLogger(MockKafka.class);

    /** Delay between two readiness polls in {@link #waitTopicUntilReady(String)}. */
    private static final long READY_POLL_INTERVAL_MS = 1000L;

    private final KafkaServerStartable kafkaServer;
    private final ZkConnection zkConnection;

    /**
     * Builds the broker configuration.
     *
     * @param zkConnection connection whose server list becomes {@code zookeeper.connect}
     * @param logDir value for {@code log.dirs}
     * @param port broker port, also used in the PLAINTEXT listener addresses
     * @param brokerId value for {@code broker.id}
     * @return the populated broker properties
     */
    private static Properties createProperties(ZkConnection zkConnection, String logDir, String port, String brokerId) {
        Properties properties = new Properties();
        properties.put("port", port);
        properties.put("broker.id", brokerId);
        properties.put("log.dirs", logDir);
        properties.put("host.name", "localhost");
        properties.put("offsets.topic.replication.factor", "1");
        properties.put("delete.topic.enable", "true");
        properties.put("zookeeper.connect", zkConnection.getServers());
        // Listener and advertised listener share the same local-IP based endpoint.
        String endpoint = "PLAINTEXT://" + NetworkUtils.getLocalIp() + ":" + port;
        properties.put("listeners", endpoint);
        properties.put("advertised.listeners", endpoint);
        return properties;
    }

    /** Returns a fresh, unique log directory path under {@code java.io.tmpdir}. */
    private static String newTempLogDir() {
        return System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID();
    }

    /**
     * Creates a broker on port 9092 with broker id 1 and a random temp log directory,
     * and starts it immediately.
     *
     * @param zkConnection ZooKeeper connection the broker registers with
     */
    public MockKafka(ZkConnection zkConnection) {
        this(zkConnection, newTempLogDir(), "9092", "1");
        start();
    }

    /**
     * Creates a broker with a random temp log directory. The broker is NOT started;
     * call {@link #start()} explicitly.
     *
     * @param zkConnection ZooKeeper connection the broker registers with
     * @param port broker port
     * @param brokerId broker id
     */
    public MockKafka(ZkConnection zkConnection, int port, int brokerId) {
        this(zkConnection, newTempLogDir(), String.valueOf(port), String.valueOf(brokerId));
    }

    private MockKafka(ZkConnection zkConnection, String logDir, String port, String brokerId) {
        this.kafkaServer = new KafkaServerStartable(
                new KafkaConfig(createProperties(zkConnection, logDir, port, brokerId)));
        this.zkConnection = zkConnection;
        System.out.println(String.format("Kafka %s:%s dir:%s",
                this.kafkaServer.serverConfig().brokerId(),
                this.kafkaServer.serverConfig().port(),
                this.kafkaServer.serverConfig().logDirs()));
    }

    /** Opens a {@link ZkClient} on this instance's connection with the string serializer set. */
    private ZkClient openZkClient() {
        ZkClient zkClient = new ZkClient(this.zkConnection);
        zkClient.setZkSerializer(new ZKStringSerializer());
        return zkClient;
    }

    /**
     * Creates a topic through {@link AdminUtils}.
     *
     * @param topic topic name
     * @param partitions number of partitions
     * @param replicationFactor replication factor
     */
    public void createTopic(String topic, int partitions, int replicationFactor) {
        ZkClient zkClient = openZkClient();
        try {
            ZkUtils zkUtils = new ZkUtils(zkClient, this.zkConnection, false);
            AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, new Properties(), (RackAwareMode) null);
        } finally {
            // Close even when topic creation fails so the client is not leaked.
            zkClient.close();
        }
    }

    /**
     * Creates a topic with one partition and replication factor one.
     *
     * @param topic topic name
     */
    public void createTopic(String topic) {
        createTopic(topic, 1, 1);
    }

    /**
     * Reads the topic's metadata from ZooKeeper.
     *
     * @param topic topic name
     * @return the metadata returned by {@link AdminUtils#fetchTopicMetadataFromZk}
     */
    public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) {
        ZkClient zkClient = openZkClient();
        try {
            ZkUtils zkUtils = new ZkUtils(zkClient, this.zkConnection, false);
            return AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
        } finally {
            zkClient.close();
        }
    }

    /**
     * Deletes a topic through {@link AdminUtils}.
     *
     * @param topic topic name
     */
    public void deleteTopic(String topic) {
        ZkClient zkClient = openZkClient();
        try {
            ZkUtils zkUtils = new ZkUtils(zkClient, this.zkConnection, false);
            AdminUtils.deleteTopic(zkUtils, topic);
        } finally {
            zkClient.close();
        }
    }

    /** Starts the embedded broker. */
    public void start() {
        this.kafkaServer.startup();
        System.out.println("--embedded kafka is up");
    }

    /** Shuts the embedded broker down. */
    public void stop() {
        this.kafkaServer.shutdown();
        System.out.println("embedded kafka down");
    }

    /**
     * Polls the topic metadata once per second until every partition reports a non-empty
     * leader and a non-empty replica list. There is no timeout: the call blocks for as long
     * as the topic is not ready. A topic with no partitions is treated as ready.
     *
     * <p>If the calling thread is interrupted while waiting, the wait continues, but the
     * thread's interrupt status is restored before this method returns so callers can
     * still observe it.
     *
     * @param topic topic name
     * @return the metadata snapshot in which all partitions were ready
     */
    public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) {
        boolean interrupted = false;
        try {
            while (true) {
                MetadataResponse.TopicMetadata topicMetadata = fetchTopicMeta(topic);
                if (allPartitionsReady(topicMetadata)) {
                    return topicMetadata;
                }
                try {
                    Thread.sleep(READY_POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Remember the interrupt instead of dropping it; re-asserted in finally.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true when every partition has a non-empty leader and non-empty replicas. */
    private static boolean allPartitionsReady(MetadataResponse.TopicMetadata topicMetadata) {
        for (MetadataResponse.PartitionMetadata partition : topicMetadata.partitionMetadata()) {
            if (partition.leader().isEmpty()) {
                System.out.println("Partition leader is not ready, wait 1s.");
                return false;
            }
            if (partition.replicas().isEmpty()) {
                System.out.println("Partition replica is not ready, wait 1s.");
                return false;
            }
        }
        return true;
    }

    /**
     * @return the server list of the underlying ZooKeeper connection
     */
    public String getZookeeperConnection() {
        return this.zkConnection.getServers();
    }
}
