/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.test.cluster;

import integration.kafka.server.TestDataBalancer;
import io.confluent.kafka.test.cluster.EmbeddedKafka;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.api.Request;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class EmbeddedKafkaCluster {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final int DEFAULT_BROKER_PORT = 0;
    private final Time time;
    private final List<EmbeddedKafka> brokers;
    private EmbeddedZookeeper zookeeper;

    public EmbeddedKafkaCluster() {
        this((Time)new MockTime(System.currentTimeMillis(), System.nanoTime()));
    }

    public EmbeddedKafkaCluster(Time time) {
        this.time = time;
        this.brokers = new ArrayList<EmbeddedKafka>();
    }

    public void startZooKeeper() {
        log.debug("Starting a ZooKeeper instance");
        this.zookeeper = new EmbeddedZookeeper();
        log.debug("ZooKeeper instance is running at {}", (Object)this.zkConnect());
    }

    public void startBrokers(int numBrokers, Properties overrideProps) {
        log.debug("Initiating embedded Kafka cluster startup with config {}", (Object)overrideProps);
        int brokerIdStart = Integer.parseInt(overrideProps.getOrDefault((Object)KafkaConfig.BrokerIdProp(), "0").toString());
        for (int i = 0; i < numBrokers; ++i) {
            this.startBroker(brokerIdStart + i, overrideProps);
        }
    }

    public void startBroker(int brokerId, Properties overrideProps) {
        Properties brokerConfig = this.createBrokerConfig(brokerId, overrideProps);
        log.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig.PortProp()));
        EmbeddedKafka broker = new EmbeddedKafka.Builder(this.time).addConfigs(brokerConfig).build();
        this.brokers.add(broker);
        log.debug("Kafka instance started: {}", (Object)broker);
    }

    public Properties createBrokerConfig(int brokerId, Properties overrideProps) {
        log.debug("Initiating embedded Kafka cluster startup with config {}", (Object)overrideProps);
        Properties brokerConfig = new Properties();
        brokerConfig.put(KafkaConfig.ZkConnectProp(), this.zkConnect());
        brokerConfig.put(KafkaConfig.PortProp(), (Object)0);
        this.putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short)1);
        this.putIfAbsent(brokerConfig, "confluent.license.topic.replication.factor", (short)1);
        this.putIfAbsent(brokerConfig, "confluent.security.event.logger.exporter.kafka.topic.replicas", (short)1);
        brokerConfig.putAll((Map<?, ?>)overrideProps);
        this.putIfAbsent(brokerConfig, KafkaConfig.FailedAuthenticationDelayMsProp(), 0);
        this.putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
        this.putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicPartitionsProp(), 5);
        this.putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), true);
        brokerConfig.put(KafkaConfig.BrokerIdProp(), (Object)brokerId);
        this.putIfAbsent(brokerConfig, "confluent.balancer.class", TestDataBalancer.class.getCanonicalName());
        this.putIfAbsent(brokerConfig, "confluent.metadata.server.listeners", "");
        return brokerConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void concurrentStartBrokers(List<Properties> brokerConfigs, Duration timeout) throws Exception {
        int numBrokers = brokerConfigs.size();
        ArrayList<Future<EmbeddedKafka>> brokerFutures = new ArrayList<Future<EmbeddedKafka>>(numBrokers);
        ExecutorService executorService = Executors.newFixedThreadPool(numBrokers);
        try {
            for (Properties brokerConfig : brokerConfigs) {
                brokerFutures.add(executorService.submit(() -> {
                    log.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig.PortProp()));
                    return new EmbeddedKafka.Builder(this.time).addConfigs(brokerConfig).build();
                }));
            }
            AtomicReference<Exception> firstException = new AtomicReference<Exception>();
            for (Future future : brokerFutures) {
                try {
                    EmbeddedKafka broker = (EmbeddedKafka)future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                    this.brokers.add(broker);
                    log.debug("Kafka instance started: {}", (Object)broker);
                }
                catch (Exception t) {
                    firstException.compareAndSet(null, t);
                }
            }
            if (firstException.get() != null) {
                throw (Exception)firstException.get();
            }
        }
        finally {
            executorService.shutdownNow();
            executorService.awaitTermination(30L, TimeUnit.SECONDS);
        }
    }

    private void putIfAbsent(Properties brokerConfig, String propertyKey, Object propertyValue) {
        if (!brokerConfig.containsKey(propertyKey)) {
            brokerConfig.put(propertyKey, propertyValue);
        }
    }

    public void shutdownBrokers() {
        for (EmbeddedKafka broker : this.brokers) {
            if (broker == null) continue;
            broker.shutdown();
        }
    }

    public void startBrokersAfterShutdown() {
        for (EmbeddedKafka broker : this.brokers) {
            if (broker == null) continue;
            broker.startBroker(this.time);
        }
    }

    public void shutdown() {
        for (EmbeddedKafka broker : this.brokers) {
            if (broker == null) continue;
            broker.shutdownAndCleanup();
        }
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void concurrentShutdown() {
        try {
            int numBrokers = this.brokers.size();
            if (numBrokers <= 1) {
                this.shutdownBrokers();
                return;
            }
            ExecutorService executorService = Executors.newFixedThreadPool(numBrokers);
            ArrayList<Future<Object>> shutdownFutures = new ArrayList<Future<Object>>(numBrokers);
            try {
                for (EmbeddedKafka embeddedKafka : this.brokers) {
                    if (embeddedKafka == null) continue;
                    shutdownFutures.add(executorService.submit(embeddedKafka::shutdownAndCleanup, null));
                }
                for (Future future : shutdownFutures) {
                    future.get(60L, TimeUnit.SECONDS);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            finally {
                executorService.shutdownNow();
            }
        }
        finally {
            if (this.zookeeper != null) {
                this.zookeeper.shutdown();
            }
        }
    }

    public String zkConnect() {
        return "127.0.0.1:" + this.zookeeper.port();
    }

    public String bootstrapServers(String listener) {
        return this.brokers.stream().map(broker -> broker.brokerConnect(listener)).collect(Collectors.joining(","));
    }

    public String bootstrapServers() {
        List<String> listeners = this.brokers.get(0).listeners();
        if (listeners.size() > 2) {
            throw new IllegalStateException("Listener name not specified for listeners " + listeners);
        }
        String listener = listeners.get(0);
        if (listeners.size() > 1 && this.brokers.get(0).kafkaServer().config().interBrokerListenerName().value().equals(listener)) {
            listener = listeners.get(1);
        }
        return this.bootstrapServers(listener);
    }

    public void createTopic(String topic, int partitions, int replication) {
        this.brokers.get(0).createTopic(topic, partitions, replication, new Properties());
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (int partition = 0; partition < partitions; ++partition) {
            topicPartitions.add(new TopicPartition(topic, partition));
        }
        EmbeddedKafkaCluster.waitForTopicPartitions(this.brokers(), topicPartitions);
    }

    private static void waitForTopicPartitions(List<KafkaServer> servers, List<TopicPartition> partitions) {
        partitions.forEach(partition -> EmbeddedKafkaCluster.waitUntilMetadataIsPropagated(servers, partition));
    }

    private static void waitUntilMetadataIsPropagated(List<KafkaServer> servers, TopicPartition tp) {
        try {
            String topic = tp.topic();
            int partition = tp.partition();
            TestUtils.waitForCondition(() -> servers.stream().map(server -> ((KafkaApis)server.dataPlaneRequestHandlerPool().apis()).metadataCache()).allMatch(metadataCache -> {
                Option partState = metadataCache.getPartitionInfo(topic, partition);
                if (partState.isEmpty()) {
                    return false;
                }
                return Request.isValidBrokerId((int)((UpdateMetadataRequestData.UpdateMetadataPartitionState)partState.get()).leader());
            }), (String)("Metadata for topic=" + topic + " partition=" + partition + " not propagated"));
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
    }

    public List<EmbeddedKafka> kafkas() {
        return Collections.unmodifiableList(this.brokers);
    }

    public List<KafkaServer> brokers() {
        return this.brokers.stream().map(EmbeddedKafka::kafkaServer).collect(Collectors.toList());
    }

    public void produceData(String topic, int numMessages) {
        KafkaProducer<String, String> producer = KafkaTestUtils.createProducer(this.bootstrapServers(), SecurityProtocol.PLAINTEXT, "PLAIN", "");
        List messages = IntStream.range(1, numMessages).asLongStream().mapToObj(num -> String.format("test-%d", num)).collect(Collectors.toList());
        for (String message : messages) {
            producer.send(new ProducerRecord(topic, (Object)topic, (Object)message));
        }
        producer.flush();
        producer.close();
    }

    public void produceApiKeysData(String topic, String apiKeys) {
        try (KafkaProducer<String, String> producer = KafkaTestUtils.createProducer(this.bootstrapServers("INTERNAL"), SecurityProtocol.PLAINTEXT, "PLAIN", "");){
            RecordHeaders headers = this.createHeaders(1L);
            producer.send(new ProducerRecord(topic, Integer.valueOf(0), (Object)topic, (Object)apiKeys, (Iterable)headers));
            producer.flush();
        }
    }

    private RecordHeaders createHeaders(long seqId) {
        Header[] headerArr = new Header[]{new RecordHeader("_sequence_id", this.longToBytes(seqId))};
        return new RecordHeaders(headerArr);
    }

    private byte[] longToBytes(long x) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putLong(x);
        return buffer.array();
    }
}

