package io.confluent.support.metrics.collectors;

import io.confluent.support.metrics.ClusterMetricsRecord;
import io.confluent.support.metrics.SupportKafkaMetricsEnhanced;
import io.confluent.support.metrics.common.Uuid;
import io.confluent.support.metrics.common.kafka.EmbeddedKafkaCluster;
import io.confluent.support.metrics.common.kafka.KafkaUtilities;
import io.confluent.support.metrics.common.time.TimeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Utils;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import scala.collection.JavaConversions;

/* loaded from: input_file:io/confluent/support/metrics/collectors/FullCollectorMultiNodeClusterTest.class */
/**
 * Integration test that starts an embedded multi-broker Kafka cluster, attaches one
 * {@link FullCollector} to every broker, creates a set of randomly sized topics and checks that
 * the cluster metrics reported by each collector are consistent with what the test created.
 */
public class FullCollectorMultiNodeClusterTest {
    /** Passed as the last argument of {@code createAndVerifyTopic}; equals 365 days in milliseconds. */
    private static final long ONE_YEAR_RETENTION = 31536000000L;
    /** Payload of the single record sent to every created topic. */
    private static final byte[] ANY_BYTES = "any bytes".getBytes(StandardCharsets.UTF_8);
    /** Number of brokers started in the embedded cluster. */
    private static final int NUM_BROKERS = 10;
    /** Number of topics created by the test. */
    private static final int NUM_TOPICS = 20;
    /** Number of histogram buckets compared; the last bucket absorbs all larger replication factors. */
    private static final int REPLICA_HISTOGRAM_BUCKETS = 6;
    /** Timeout handed to {@code TestUtils.waitUntilMetadataIsPropagated}. */
    private static final long METADATA_PROPAGATION_TIMEOUT_MS = 30000L;

    /**
     * Immutable holder for the totals accumulated while creating topics: the overall partition
     * count and the number of partitions per replication-factor bucket.
     */
    public static class TotalPartitionsAndTotalReplicaHistogram {
        private final int totalPartitions;
        private final Integer[] totalReplicaHistogram;

        /**
         * @param totalPartitions sum of the partition counts of all created topics
         * @param totalReplicaHistogram partitions per replication-factor bucket; copied so later
         *     changes to the caller's array do not affect this holder
         */
        public TotalPartitionsAndTotalReplicaHistogram(int totalPartitions, Integer[] totalReplicaHistogram) {
            this.totalPartitions = totalPartitions;
            this.totalReplicaHistogram = totalReplicaHistogram.clone();
        }

        /** @return the total number of partitions across all created topics */
        public int getTotalPartitions() {
            return this.totalPartitions;
        }

        /** @return a copy of the per-bucket partition counts */
        public Integer[] getTotalReplicaHistogram() {
            return this.totalReplicaHistogram.clone();
        }
    }

    /**
     * Starts the embedded cluster, builds one collector per broker, creates the topics and then
     * verifies the metrics of every collector. The cluster is always stopped, even when an
     * assertion fails.
     *
     * @throws IOException if the default broker configuration cannot be loaded
     * @throws InterruptedException declared by the topic-creation helper
     */
    @Test
    public void metricsAreCollectedFromMultiNodeClusters() throws IOException, InterruptedException {
        Uuid uuid = new Uuid();
        TimeUtils time = new TimeUtils();
        Runtime runtime = Runtime.getRuntime();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startCluster(NUM_BROKERS);
        try {
            KafkaServer firstBroker = cluster.getBroker(0);

            List<FullCollector> collectors = new ArrayList<>(NUM_BROKERS);
            for (int brokerIndex = 0; brokerIndex < NUM_BROKERS; brokerIndex++) {
                KafkaServer broker = cluster.getBroker(brokerIndex);
                Properties brokerConfig = brokerConfigurationFrom(broker, cluster.zookeeperConnectString());
                collectors.add(new FullCollector(broker, brokerConfig, runtime, time, uuid));
            }

            TotalPartitionsAndTotalReplicaHistogram expected =
                    createTopicsWithMessages(NUM_TOPICS, NUM_BROKERS, cluster, firstBroker.zkClient());
            Integer[] expectedHistogram = expected.getTotalReplicaHistogram();

            for (FullCollector collector : collectors) {
                SupportKafkaMetricsEnhanced metrics = collector.collectMetrics();
                Assertions.assertThat(metrics.getClusterId()).isEqualTo(firstBroker.clusterId());

                ClusterMetricsRecord clusterMetrics = metrics.getClusterMetrics();
                Assertions.assertThat(clusterMetrics.getNumberTopicsZk()).isEqualTo(NUM_TOPICS);
                // The remaining values are only checked as upper bounds.
                // NOTE(review): presumably a single broker may report only part of the cluster - confirm.
                Assertions.assertThat(clusterMetrics.getNumberTopics()).isLessThanOrEqualTo(NUM_TOPICS);
                Assertions.assertThat(clusterMetrics.getNumberPartitions())
                        .isLessThanOrEqualTo(expected.getTotalPartitions());

                List<Integer> actualHistogram = clusterMetrics.getReplicationHistogram();
                for (int bucket = 0; bucket < expectedHistogram.length; bucket++) {
                    Assertions.assertThat(actualHistogram.get(bucket)).isLessThanOrEqualTo(expectedHistogram[bucket]);
                }
            }
        } finally {
            // Always release the embedded brokers, otherwise a failed assertion leaks them.
            cluster.stopCluster();
        }
    }

    /**
     * Builds a broker configuration from the {@code /default-server.properties} classpath resource
     * and overrides the broker id and ZooKeeper connect string.
     *
     * @param kafkaServer broker whose id is written into the configuration
     * @param str ZooKeeper connect string to write into the configuration
     * @return the resulting properties
     * @throws IOException if the resource is missing or cannot be read
     */
    private Properties brokerConfigurationFrom(KafkaServer kafkaServer, String str) throws IOException {
        String resourceName = "/default-server.properties";
        Properties properties = new Properties();
        try (InputStream defaults = FullCollectorMultiNodeClusterTest.class.getResourceAsStream(resourceName)) {
            if (defaults == null) {
                // getResourceAsStream signals a missing resource with null; fail with a clear message.
                throw new IOException("Classpath resource not found: " + resourceName);
            }
            properties.load(defaults);
        }
        properties.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.toString(kafkaServer.config().brokerId()));
        properties.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), str);
        return properties;
    }

    /**
     * Creates {@code numTopics} topics with a random partition count (2..11) and a random
     * replication factor (1..{@code numBrokers}), waits for their metadata to propagate, sends one
     * record to each and accumulates the expected totals.
     *
     * @param numTopics number of topics to create
     * @param numBrokers number of brokers in the cluster; also the maximum replication factor
     * @param cluster the embedded cluster hosting the brokers
     * @param zkClient ZooKeeper client used for topic creation and bootstrap-server lookup
     * @return total partitions created and partitions per replication-factor bucket
     * @throws InterruptedException kept from the original signature
     */
    private TotalPartitionsAndTotalReplicaHistogram createTopicsWithMessages(
            int numTopics, int numBrokers, EmbeddedKafkaCluster cluster, KafkaZkClient zkClient)
            throws InterruptedException {
        KafkaUtilities kafkaUtilities = new KafkaUtilities();
        int totalPartitions = 0;
        Integer[] replicaHistogram = new Integer[REPLICA_HISTOGRAM_BUCKETS];
        Arrays.fill(replicaHistogram, 0);
        Random random = new Random();

        for (int topicIndex = 0; topicIndex < numTopics; topicIndex++) {
            String topic = "__exampleTopic" + topicIndex;
            int partitions = random.nextInt(10) + 2;
            int replication = random.nextInt(numBrokers) + 1;
            String topicDescription =
                    "Topic='" + topic + "', partitions=" + partitions + ", replication=" + replication;

            if (!kafkaUtilities.createAndVerifyTopic(zkClient, topic, partitions, replication, ONE_YEAR_RETENTION)) {
                Assertions.fail("Failed to create and/or verify topic " + topicDescription);
            }
            waitUntilMetadataIsPropagated(topic, partitions, cluster, numBrokers);
            sendTestMessage(topic, kafkaUtilities.getBootstrapServers(zkClient, numBrokers), topicDescription);

            totalPartitions += partitions;
            // Clamp into [0, last bucket]: the last bucket collects every larger replication factor.
            int bucket = Math.max(0, Math.min(replication, replicaHistogram.length - 1));
            replicaHistogram[bucket] = replicaHistogram[bucket] + partitions;
        }
        return new TotalPartitionsAndTotalReplicaHistogram(totalPartitions, replicaHistogram);
    }

    /**
     * Sends one record to {@code topic} and waits for the send to complete, failing the test if
     * it does not. The producer is closed before returning.
     */
    private void sendTestMessage(String topic, List<String> bootstrapServers, String topicDescription) {
        try (Producer<byte[], byte[]> producer = createProducer(bootstrapServers)) {
            Future<?> pendingSend = producer.send(new ProducerRecord<byte[], byte[]>(topic, ANY_BYTES));
            producer.flush();
            pendingSend.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            Assertions.fail("Failed to submit metrics to Kafka topic (canceled request). " + topicDescription, e);
        } catch (ExecutionException e) {
            Assertions.fail(
                    "Failed to submit metrics to Kafka topic (due to exception=" + e.getMessage() + "). "
                            + topicDescription,
                    e);
        }
    }

    /**
     * Creates a byte-array producer connected to the given bootstrap servers.
     *
     * @param list bootstrap server addresses, joined with commas for the producer configuration
     * @return a new producer; the caller is responsible for closing it
     */
    private Producer<byte[], byte[]> createProducer(List<String> list) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", Utils.join(list, ","));
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(properties);
    }

    /**
     * Waits until the metadata of every partition of {@code topic} has reached the first
     * {@code numBrokers} brokers, failing the test when the wait reports a negative result.
     */
    private void waitUntilMetadataIsPropagated(
            String topic, int numPartitions, EmbeddedKafkaCluster cluster, int numBrokers) {
        Set<KafkaServer> servers = new HashSet<>();
        for (int brokerIndex = 0; brokerIndex < numBrokers; brokerIndex++) {
            servers.add(cluster.getBroker(brokerIndex));
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            int result = TestUtils.waitUntilMetadataIsPropagated(
                    JavaConversions.asScalaSet(servers).toSeq(), topic, partition, METADATA_PROPAGATION_TIMEOUT_MS);
            if (result < 0) {
                Assertions.fail("Topic=" + topic + " partition=" + partition + " metadata was not propagated.");
            }
        }
    }
}
