/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.provision;

import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.IZkConnection;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.provision.ZKStringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MockKafka {
    private KafkaServerStartable kafkaServer;
    private static final Logger logger = LoggerFactory.getLogger(MockKafka.class);
    private ZkConnection zkConnection;

    private static Properties createProperties(ZkConnection zkServerConnection, String logDir, String server, String brokerId) {
        Properties properties = new Properties();
        properties.put("broker.id", brokerId);
        properties.put("log.dirs", logDir);
        properties.put("offsets.topic.replication.factor", "1");
        properties.put("delete.topic.enable", "true");
        properties.put("zookeeper.connect", zkServerConnection.getServers());
        properties.put("listeners", "PLAINTEXT://" + server);
        properties.put("advertised.listeners", "PLAINTEXT://" + server);
        return properties;
    }

    public MockKafka(ZkConnection zkServerConnection) {
        this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + RandomUtil.randomUUID().toString(), "localhost:9092", "1");
        this.start();
    }

    private MockKafka(Properties properties) {
        KafkaConfig kafkaConfig = new KafkaConfig((Map)properties);
        this.kafkaServer = new KafkaServerStartable(kafkaConfig);
    }

    public MockKafka(ZkConnection zkServerConnection, String server, int brokerId) {
        this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + RandomUtil.randomUUID().toString(), server, String.valueOf(brokerId));
    }

    private MockKafka(ZkConnection zkServerConnection, String logDir, String server, String brokerId) {
        this(MockKafka.createProperties(zkServerConnection, logDir, server, brokerId));
        this.zkConnection = zkServerConnection;
        System.out.println(String.format(Locale.ROOT, "Kafka %s:%s dir:%s", this.kafkaServer.serverConfig().brokerId(), this.kafkaServer.serverConfig().port(), this.kafkaServer.serverConfig().logDirs()));
    }

    public void createTopic(String topic, int partition, int replication) {
        ZkClient zkClient = new ZkClient((IZkConnection)this.zkConnection);
        ZkUtils zkUtils = new ZkUtils(zkClient, this.zkConnection, false);
        zkClient.setZkSerializer((ZkSerializer)new ZKStringSerializer());
        AdminUtils.createTopic(zkUtils, topic, partition, replication, new Properties(), null);
        zkClient.close();
    }

    public void createTopic(String topic) {
        this.createTopic(topic, 1, 1);
    }

    public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) {
        ZkClient zkClient = new ZkClient((IZkConnection)this.zkConnection);
        ZkUtils zkUtils = new ZkUtils(zkClient, this.zkConnection, false);
        zkClient.setZkSerializer((ZkSerializer)new ZKStringSerializer());
        MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
        zkClient.close();
        return topicMetadata;
    }

    public void deleteTopic(String topic) {
        ZkClient zkClient = new ZkClient((IZkConnection)this.zkConnection);
        ZkUtils zkUtils = new ZkUtils(zkClient, this.zkConnection, false);
        zkClient.setZkSerializer((ZkSerializer)new ZKStringSerializer());
        AdminUtils.deleteTopic(zkUtils, topic);
        zkClient.close();
    }

    public void start() {
        this.kafkaServer.startup();
        System.out.println("--embedded kafka is up");
    }

    public void stop() {
        this.kafkaServer.shutdown();
        System.out.println("embedded kafka down");
    }

    public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) {
        boolean isReady = false;
        MetadataResponse.TopicMetadata topicMeta = null;
        while (!isReady) {
            Random random = new Random();
            topicMeta = this.fetchTopicMeta(topic);
            List partitionsMetadata = topicMeta.partitionMetadata();
            Iterator iterator = partitionsMetadata.iterator();
            boolean hasGotLeader = true;
            boolean hasGotReplica = true;
            while (iterator.hasNext()) {
                MetadataResponse.PartitionMetadata partitionMeta = (MetadataResponse.PartitionMetadata)iterator.next();
                hasGotLeader &= !partitionMeta.leader().isEmpty();
                if (partitionMeta.leader().isEmpty()) {
                    System.out.println("Partition leader is not ready, wait 1s.");
                    break;
                }
                hasGotReplica &= !partitionMeta.replicas().isEmpty();
                if (!partitionMeta.replicas().isEmpty()) continue;
                System.out.println("Partition replica is not ready, wait 1s.");
                break;
            }
            if (isReady = hasGotLeader & hasGotReplica) continue;
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {}
        }
        return topicMeta;
    }

    public String getZookeeperConnection() {
        return this.zkConnection.getServers();
    }
}

