package org.apache.kafka.connect.util.clusters;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster extends ExternalResource {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
    private final KafkaServer[] brokers;
    private final Properties brokerConfig;
    private final int[] currentBrokerPorts;
    private final String[] currentBrokerLogDirs;
    private KafkaProducer<byte[], byte[]> producer;
    private final Time time = new MockTime();
    private EmbeddedZookeeper zookeeper = null;
    private ListenerName listenerName = new ListenerName("PLAINTEXT");

    public EmbeddedKafkaCluster(int i, Properties properties) {
        this.brokers = new KafkaServer[i];
        this.currentBrokerPorts = new int[i];
        this.currentBrokerLogDirs = new String[i];
        this.brokerConfig = properties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void before() {
        start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void after() {
        stop();
    }

    public void startOnlyKafkaOnSamePorts() {
        start(this.currentBrokerPorts, this.currentBrokerLogDirs);
    }

    private void start() {
        this.zookeeper = new EmbeddedZookeeper();
        Arrays.fill(this.currentBrokerPorts, 0);
        Arrays.fill(this.currentBrokerLogDirs, (Object) null);
        start(this.currentBrokerPorts, this.currentBrokerLogDirs);
    }

    private void start(int[] iArr, String[] strArr) {
        this.brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), (Object) "localhost");
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), (Object) true);
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), (Object) 0);
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (Object) Short.valueOf((short) this.brokers.length));
        putIfAbsent(this.brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), (Object) false);
        Object obj = this.brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
        if (obj != null) {
            this.listenerName = new ListenerName(obj.toString());
        }
        for (int i = 0; i < this.brokers.length; i++) {
            this.brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.valueOf(i));
            this.currentBrokerLogDirs[i] = strArr[i] == null ? createLogDir() : this.currentBrokerLogDirs[i];
            this.brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), this.currentBrokerLogDirs[i]);
            this.brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), Integer.valueOf(iArr[i]));
            this.brokers[i] = TestUtils.createServer(new KafkaConfig(this.brokerConfig, true), this.time);
            this.currentBrokerPorts[i] = this.brokers[i].boundPort(this.listenerName);
        }
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", bootstrapServers());
        hashMap.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new KafkaProducer<>(hashMap);
    }

    public void stopOnlyKafka() {
        stop(false, false);
    }

    private void stop() {
        stop(true, true);
    }

    private void stop(boolean z, boolean z2) {
        try {
            this.producer.close();
            for (KafkaServer kafkaServer : this.brokers) {
                try {
                    kafkaServer.shutdown();
                } catch (Throwable th) {
                    String format = String.format("Could not shutdown broker at %s", address(kafkaServer));
                    log.error(format, th);
                    throw new RuntimeException(format, th);
                }
            }
            if (z) {
                for (KafkaServer kafkaServer2 : this.brokers) {
                    try {
                        log.info("Cleaning up kafka log dirs at {}", kafkaServer2.config().logDirs());
                        CoreUtils.delete(kafkaServer2.config().logDirs());
                    } catch (Throwable th2) {
                        String format2 = String.format("Could not clean up log dirs for broker at %s", address(kafkaServer2));
                        log.error(format2, th2);
                        throw new RuntimeException(format2, th2);
                    }
                }
            }
            if (z2) {
                try {
                    this.zookeeper.shutdown();
                } catch (Throwable th3) {
                    String format3 = String.format("Could not shutdown zookeeper at %s", zKConnectString());
                    log.error(format3, th3);
                    throw new RuntimeException(format3, th3);
                }
            }
        } catch (Exception e) {
            log.error("Could not shutdown producer ", e);
            throw new RuntimeException("Could not shutdown producer", e);
        }
    }

    private static void putIfAbsent(Properties properties, String str, Object obj) {
        if (properties.containsKey(str)) {
            return;
        }
        properties.put(str, obj);
    }

    private String createLogDir() {
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        try {
            temporaryFolder.create();
            return temporaryFolder.newFolder().getAbsolutePath();
        } catch (IOException e) {
            log.error("Unable to create temporary log directory", e);
            throw new ConnectException("Unable to create temporary log directory", e);
        }
    }

    public String bootstrapServers() {
        return (String) Arrays.stream(this.brokers).map(this::address).collect(Collectors.joining(","));
    }

    public String address(KafkaServer kafkaServer) {
        return kafkaServer.config().hostName() + ":" + kafkaServer.boundPort(this.listenerName);
    }

    public String zKConnectString() {
        return "127.0.0.1:" + this.zookeeper.port();
    }

    public void createTopic(String str) {
        createTopic(str, 1);
    }

    public void createTopic(String str, int i) {
        createTopic(str, i, 1, new HashMap());
    }

    public void createTopic(String str, int i, int i2, Map<String, String> map) {
        if (i2 > this.brokers.length) {
            throw new InvalidReplicationFactorException("Insufficient brokers (" + this.brokers.length + ") for desired replication (" + i2 + ")");
        }
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), map});
        NewTopic newTopic = new NewTopic(str, i, (short) i2);
        newTopic.configs(map);
        try {
            AdminClient createAdminClient = createAdminClient();
            Throwable th = null;
            try {
                try {
                    createAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                    if (createAdminClient != null) {
                        if (0 != 0) {
                            try {
                                createAdminClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createAdminClient.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void produce(String str, String str2) {
        produce(str, null, null, str2);
    }

    public void produce(String str, String str2, String str3) {
        produce(str, null, str2, str3);
    }

    public void produce(String str, Integer num, String str2, String str3) {
        ProducerRecord producerRecord = new ProducerRecord(str, num, str2 == null ? null : str2.getBytes(), str3 == null ? null : str3.getBytes());
        try {
            this.producer.send(producerRecord).get(DEFAULT_PRODUCE_SEND_DURATION_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new KafkaException("Could not produce message: " + producerRecord, e);
        }
    }

    public AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers());
        Object obj = this.brokerConfig.get(KafkaConfig$.MODULE$.ListenersProp());
        if (obj != null && obj.toString().contains("SSL")) {
            properties.put("ssl.truststore.location", this.brokerConfig.get("ssl.truststore.location"));
            properties.put("ssl.truststore.password", ((Password) this.brokerConfig.get("ssl.truststore.password")).value());
            properties.put("security.protocol", "SSL");
        }
        return AdminClient.create(properties);
    }

    public ConsumerRecords<byte[], byte[]> consume(int i, long j, String... strArr) {
        HashMap hashMap = new HashMap();
        int i2 = 0;
        KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo = createConsumerAndSubscribeTo(Collections.emptyMap(), strArr);
        Throwable th = null;
        try {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                long j2 = j;
                while (j2 > 0) {
                    log.debug("Consuming from {} for {} millis.", Arrays.toString(strArr), Long.valueOf(j2));
                    ConsumerRecords poll = createConsumerAndSubscribeTo.poll(Duration.ofMillis(j2));
                    if (poll.isEmpty()) {
                        j2 = j - (System.currentTimeMillis() - currentTimeMillis);
                    } else {
                        for (TopicPartition topicPartition : poll.partitions()) {
                            List records = poll.records(topicPartition);
                            ((List) hashMap.computeIfAbsent(topicPartition, topicPartition2 -> {
                                return new ArrayList();
                            })).addAll(records);
                            i2 += records.size();
                        }
                        if (i2 >= i) {
                            ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(hashMap);
                            if (createConsumerAndSubscribeTo != null) {
                                if (0 != 0) {
                                    try {
                                        createConsumerAndSubscribeTo.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    createConsumerAndSubscribeTo.close();
                                }
                            }
                            return consumerRecords;
                        }
                        j2 = j - (System.currentTimeMillis() - currentTimeMillis);
                    }
                }
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                throw new RuntimeException("Could not find enough records. found " + i2 + ", expected " + i);
            } finally {
            }
        } catch (Throwable th4) {
            if (createConsumerAndSubscribeTo != null) {
                if (th != null) {
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createConsumerAndSubscribeTo.close();
                }
            }
            throw th4;
        }
    }

    public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> map) {
        HashMap hashMap = new HashMap(map);
        putIfAbsent(hashMap, "group.id", UUID.randomUUID().toString());
        putIfAbsent(hashMap, "bootstrap.servers", bootstrapServers());
        putIfAbsent(hashMap, "enable.auto.commit", "false");
        putIfAbsent(hashMap, "auto.offset.reset", "earliest");
        putIfAbsent(hashMap, "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        putIfAbsent(hashMap, "value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try {
            return new KafkaConsumer<>(hashMap);
        } catch (Throwable th) {
            throw new ConnectException("Failed to create consumer", th);
        }
    }

    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object> map, String... strArr) {
        KafkaConsumer<byte[], byte[]> createConsumer = createConsumer(map);
        createConsumer.subscribe(Arrays.asList(strArr));
        return createConsumer;
    }

    private static void putIfAbsent(Map<String, Object> map, String str, Object obj) {
        if (map.containsKey(str)) {
            return;
        }
        map.put(str, obj);
    }
}
