package io.confluent.mqtt.integration;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import io.confluent.mqtt.KafkaMqttMain;
import io.confluent.mqtt.MqttConfig;
import io.confluent.shaded.io.opencensus.proto.metrics.v1.Metric;
import io.confluent.shaded.io.opencensus.proto.resource.v1.Resource;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import io.confluent.telemetry.exporter.ExporterConfig;
import io.confluent.telemetry.serde.OpencensusMetricsProto;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/mqtt/integration/TelemetryReporterIntegrationTest.class */
public class TelemetryReporterIntegrationTest {
    private static final String PRODUCER_PREFIX = "producer.";
    private static final String TELEMETRY_TOPIC_NAME = "mqtt-telemetry-it";
    EmbeddedKafkaCluster kafka;
    private Serde<Metric> serde = new OpencensusMetricsProto();
    private static final Logger log = LoggerFactory.getLogger(TelemetryReporterIntegrationTest.class);
    private static final Integer COLLECT_INTERVAL_CONFIG_MS = 2000;

    @Before
    public void setUp() {
        this.kafka = new EmbeddedKafkaCluster();
        this.kafka.startZooKeeper();
        Properties properties = new Properties();
        properties.setProperty("confluent.balancer.topic.replication.factor", "1");
        this.kafka.startBroker(1, properties);
        Time time = Time.SYSTEM;
        long hiResClockMs = time.hiResClockMs();
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConfig.MetricReporterClassesProp(), "io.confluent.telemetry.reporter.TelemetryReporter");
        hashMap.put("confluent.telemetry.metrics.collector.interval.ms", COLLECT_INTERVAL_CONFIG_MS.toString());
        hashMap.put("confluent.telemetry.metrics.collector.include", "");
        hashMap.put(ConfluentTelemetryConfig.exporterPrefixForName("test") + "type", ExporterConfig.ExporterType.kafka.name());
        hashMap.put(ConfluentTelemetryConfig.exporterPrefixForName("test") + "enabled", "true");
        hashMap.put(ConfluentTelemetryConfig.exporterPrefixForName("test") + "producer.bootstrap.servers", this.kafka.bootstrapServers());
        hashMap.put(ConfluentTelemetryConfig.exporterPrefixForName("test") + "topic.name", TELEMETRY_TOPIC_NAME);
        hashMap.put(ConfluentTelemetryConfig.exporterPrefixForName("test") + "topic.replicas", "1");
        hashMap.put("topic.regex.list", "temperature:.*temperature");
        hashMap.put("listeners", "0.0.0.0:1883");
        hashMap.put("bootstrap.servers", this.kafka.bootstrapServers());
        hashMap.put("confluent.topic.replication.factor", "1");
        KafkaMqttMain.start(new MqttConfig(hashMap), hiResClockMs, time, false);
        log.info("Started MQTT Proxy server");
    }

    @After
    public void tearDown() {
        log.info("Shutting down Kafka");
        this.kafka.shutdown();
    }

    @Test
    public void testMetricsReporter() throws ExecutionException, InterruptedException {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.kafka.bootstrapServers());
        AdminClient create = AdminClient.create(hashMap);
        String str = (String) create.describeCluster().clusterId().get();
        create.close();
        publishMessages(100);
        ConsumerRecords<byte[], byte[]> consume = consume(Duration.ofMillis(COLLECT_INTERVAL_CONFIG_MS.intValue() * 30), TELEMETRY_TOPIC_NAME);
        log.info("Consumed {} messages from the telemetry topic", Integer.valueOf(consume.count()));
        boolean z = false;
        HashSet hashSet = new HashSet();
        Iterator it = consume.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            Metric metric = null;
            try {
                metric = (Metric) this.serde.deserializer().deserialize(consumerRecord.topic(), consumerRecord.headers(), (byte[]) consumerRecord.value());
            } catch (SerializationException e) {
                Assert.fail("failed to deserialize message " + e.getMessage());
            }
            Resource resource = metric.getResource();
            Map labelsMap = resource.getLabelsMap();
            Assert.assertEquals(AppInfoParser.getVersion(), labelsMap.get("mqtt_proxy.version"));
            Assert.assertEquals(str, labelsMap.get("mqtt_proxy.id"));
            Assert.assertEquals(str, labelsMap.get("mqtt_proxy.cluster.id"));
            Assert.assertEquals(AppInfoParser.getCommitId(), labelsMap.get("mqtt_proxy.commit.id"));
            MatcherAssert.assertThat(labelsMap.get("mqtt_proxy.client.id"), CoreMatchers.startsWith("producer-"));
            Assert.assertEquals("mqtt_proxy", resource.getType());
            if (metric.getMetricDescriptor().getName().startsWith("io.confluent.kafka.client")) {
                z = true;
            }
            hashSet.add(metric.getMetricDescriptor().getName());
        }
        Assert.assertTrue(z);
        log.info("Collected metrics: {}", hashSet);
    }

    private void publishMessages(int i) {
        try {
            MqttClient mqttClient = new MqttClient("tcp://0.0.0.0:1883", "integration-test-mqtt-client");
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setAutomaticReconnect(true);
            mqttConnectOptions.setCleanSession(true);
            mqttConnectOptions.setConnectionTimeout(60);
            int i2 = 0;
            while (true) {
                try {
                    mqttClient.connect(mqttConnectOptions);
                    for (int i3 = 0; i3 < i; i3++) {
                        MqttMessage mqttMessage = new MqttMessage(String.format("%dF", Integer.valueOf(60 + i3)).getBytes());
                        mqttMessage.setQos(0);
                        mqttClient.publish("temperature", mqttMessage);
                        log.info("Published MQTT message # {}", Integer.valueOf(i3));
                    }
                    return;
                } catch (MqttException e) {
                    log.warn("Failed to connect to MQTT Proxy server", e);
                    i2++;
                    if (i2 == 60) {
                        throw e;
                    }
                    log.info("Retrying to connect to MQTT Proxy server");
                }
            }
        } catch (MqttException e2) {
            log.error("Failed to publish mqtt message", e2);
        }
    }

    public ConsumerRecords<byte[], byte[]> consume(Duration duration, String... strArr) {
        HashMap hashMap = new HashMap();
        int i = 0;
        KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo = createConsumerAndSubscribeTo(Collections.emptyMap(), strArr);
        Throwable th = null;
        try {
            try {
                Instant now = Instant.now();
                Duration duration2 = duration;
                while (!duration2.isNegative()) {
                    log.debug("Consuming from {} for {} millis, consumed {} messages already.", new Object[]{Arrays.toString(strArr), Long.valueOf(duration2.toMillis()), Integer.valueOf(i)});
                    ConsumerRecords poll = createConsumerAndSubscribeTo.poll(duration2);
                    if (poll.isEmpty()) {
                        duration2 = duration.minus(Duration.between(now, Instant.now()));
                    } else {
                        for (TopicPartition topicPartition : poll.partitions()) {
                            List records = poll.records(topicPartition);
                            ((List) hashMap.computeIfAbsent(topicPartition, topicPartition2 -> {
                                return new ArrayList();
                            })).addAll(records);
                            i += records.size();
                        }
                        duration2 = duration.minus(Duration.between(now, Instant.now()));
                    }
                }
                if (createConsumerAndSubscribeTo != null) {
                    if (0 != 0) {
                        try {
                            createConsumerAndSubscribeTo.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createConsumerAndSubscribeTo.close();
                    }
                }
                return new ConsumerRecords<>(hashMap);
            } finally {
            }
        } catch (Throwable th3) {
            if (createConsumerAndSubscribeTo != null) {
                if (th != null) {
                    try {
                        createConsumerAndSubscribeTo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createConsumerAndSubscribeTo.close();
                }
            }
            throw th3;
        }
    }

    public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> map) {
        HashMap hashMap = new HashMap(map);
        putIfAbsent(hashMap, "group.id", UUID.randomUUID().toString());
        putIfAbsent(hashMap, "bootstrap.servers", this.kafka.bootstrapServers());
        putIfAbsent(hashMap, "enable.auto.commit", "false");
        putIfAbsent(hashMap, "auto.offset.reset", "earliest");
        putIfAbsent(hashMap, "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        putIfAbsent(hashMap, "value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try {
            return new KafkaConsumer<>(hashMap);
        } catch (Throwable th) {
            throw new ConnectException("Failed to create consumer", th);
        }
    }

    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object> map, String... strArr) {
        KafkaConsumer<byte[], byte[]> createConsumer = createConsumer(map);
        createConsumer.subscribe(Arrays.asList(strArr));
        return createConsumer;
    }

    private static void putIfAbsent(Map<String, Object> map, String str, Object obj) {
        if (map.containsKey(str)) {
            return;
        }
        map.put(str, obj);
    }
}
