package org.apache.pulsar.broker.stats.prometheus;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.utils.SimpleTextOutputStream;

/* loaded from: input_file:org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.class */
public class NamespaceStatsAggregator {
    private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() { // from class: org.apache.pulsar.broker.stats.prometheus.NamespaceStatsAggregator.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal
        public AggregatedNamespaceStats initialValue() throws Exception {
            return new AggregatedNamespaceStats();
        }
    };

    public static void generate(PulsarService pulsarService, SimpleTextOutputStream simpleTextOutputStream) {
        String clusterName = pulsarService.getConfiguration().getClusterName();
        AggregatedNamespaceStats aggregatedNamespaceStats = localNamespaceStats.get();
        pulsarService.getBrokerService().getMultiLayerTopicMap().forEach((str, concurrentOpenHashMap) -> {
            aggregatedNamespaceStats.reset();
            concurrentOpenHashMap.forEach((str, concurrentOpenHashMap) -> {
                concurrentOpenHashMap.forEach((str, topic) -> {
                    updateNamespaceStats(aggregatedNamespaceStats, topic);
                });
            });
            printNamespaceStats(simpleTextOutputStream, clusterName, str, aggregatedNamespaceStats);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void updateNamespaceStats(AggregatedNamespaceStats aggregatedNamespaceStats, Topic topic) {
        if (topic instanceof PersistentTopic) {
            ManagedLedgerMBeanImpl managedLedgerMBeanImpl = (ManagedLedgerMBeanImpl) ((PersistentTopic) topic).getManagedLedger().getStats();
            aggregatedNamespaceStats.storageSize += managedLedgerMBeanImpl.getStoredMessagesSize();
            aggregatedNamespaceStats.storageWriteLatencyBuckets.addAll(managedLedgerMBeanImpl.getInternalAddEntryLatencyBuckets());
            aggregatedNamespaceStats.entrySizeBuckets.addAll(managedLedgerMBeanImpl.getInternalEntrySizeBuckets());
            aggregatedNamespaceStats.storageWriteRate = managedLedgerMBeanImpl.getAddEntryMessagesRate();
            aggregatedNamespaceStats.storageReadRate = managedLedgerMBeanImpl.getReadEntriesRate();
        }
        aggregatedNamespaceStats.topicsCount++;
        topic.getProducers().forEach(producer -> {
            if (producer.isRemote()) {
                AggregatedReplicationStats computeIfAbsent = aggregatedNamespaceStats.replicationStats.computeIfAbsent(producer.getRemoteCluster(), str -> {
                    return new AggregatedReplicationStats();
                });
                computeIfAbsent.msgRateIn += producer.getStats().msgRateIn;
                computeIfAbsent.msgThroughputIn += producer.getStats().msgThroughputIn;
            } else {
                aggregatedNamespaceStats.producersCount++;
                aggregatedNamespaceStats.rateIn += producer.getStats().msgRateIn;
                aggregatedNamespaceStats.throughputIn += producer.getStats().msgThroughputIn;
            }
        });
        topic.getSubscriptions().forEach((str, subscription) -> {
            aggregatedNamespaceStats.subscriptionsCount++;
            aggregatedNamespaceStats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
            subscription.getConsumers().forEach(consumer -> {
                aggregatedNamespaceStats.consumersCount++;
                aggregatedNamespaceStats.rateOut += consumer.getStats().msgRateOut;
                aggregatedNamespaceStats.throughputOut += consumer.getStats().msgThroughputOut;
            });
        });
        topic.getReplicators().forEach((str2, replicator) -> {
            AggregatedReplicationStats computeIfAbsent = aggregatedNamespaceStats.replicationStats.computeIfAbsent(str2, str2 -> {
                return new AggregatedReplicationStats();
            });
            ReplicatorStats stats = replicator.getStats();
            computeIfAbsent.msgRateOut += stats.msgRateOut;
            computeIfAbsent.msgThroughputOut += stats.msgThroughputOut;
            computeIfAbsent.replicationBacklog += stats.replicationBacklog;
        });
    }

    private static void printNamespaceStats(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, AggregatedNamespaceStats aggregatedNamespaceStats) {
        metric(simpleTextOutputStream, str, str2, "pulsar_topics_count", aggregatedNamespaceStats.topicsCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_subscriptions_count", aggregatedNamespaceStats.subscriptionsCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_producers_count", aggregatedNamespaceStats.producersCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_consumers_count", aggregatedNamespaceStats.consumersCount);
        metric(simpleTextOutputStream, str, str2, "pulsar_rate_in", aggregatedNamespaceStats.rateIn);
        metric(simpleTextOutputStream, str, str2, "pulsar_rate_out", aggregatedNamespaceStats.rateOut);
        metric(simpleTextOutputStream, str, str2, "pulsar_throughput_in", aggregatedNamespaceStats.throughputIn);
        metric(simpleTextOutputStream, str, str2, "pulsar_throughput_out", aggregatedNamespaceStats.throughputOut);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_size", aggregatedNamespaceStats.storageSize);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_rate", aggregatedNamespaceStats.storageWriteRate);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_read_rate", aggregatedNamespaceStats.storageReadRate);
        metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_msg_backlog", "local", aggregatedNamespaceStats.msgBacklog);
        aggregatedNamespaceStats.storageWriteLatencyBuckets.refresh();
        long[] buckets = aggregatedNamespaceStats.storageWriteLatencyBuckets.getBuckets();
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_0_5", buckets[0]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_1", buckets[1]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_5", buckets[2]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_10", buckets[3]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_20", buckets[4]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_50", buckets[5]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_100", buckets[6]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_200", buckets[7]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_le_1000", buckets[8]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_overflow", buckets[9]);
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_count", aggregatedNamespaceStats.storageWriteLatencyBuckets.getCount());
        metric(simpleTextOutputStream, str, str2, "pulsar_storage_write_latency_sum", aggregatedNamespaceStats.storageWriteLatencyBuckets.getSum());
        aggregatedNamespaceStats.entrySizeBuckets.refresh();
        long[] buckets2 = aggregatedNamespaceStats.entrySizeBuckets.getBuckets();
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_128", buckets2[0]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_512", buckets2[1]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_1_kb", buckets2[2]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_2_kb", buckets2[3]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_4_kb", buckets2[4]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_16_kb", buckets2[5]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_100_kb", buckets2[6]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_1_mb", buckets2[7]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_le_overflow", buckets2[8]);
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_count", aggregatedNamespaceStats.entrySizeBuckets.getCount());
        metric(simpleTextOutputStream, str, str2, "pulsar_entry_size_sum", aggregatedNamespaceStats.entrySizeBuckets.getSum());
        if (aggregatedNamespaceStats.replicationStats.isEmpty()) {
            return;
        }
        aggregatedNamespaceStats.replicationStats.forEach((str3, aggregatedReplicationStats) -> {
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_rate_in", str3, aggregatedReplicationStats.msgRateIn);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_rate_out", str3, aggregatedReplicationStats.msgRateOut);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_throughput_in", str3, aggregatedReplicationStats.msgThroughputIn);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_throughput_out", str3, aggregatedReplicationStats.msgThroughputOut);
            metricWithRemoteCluster(simpleTextOutputStream, str, str2, "pulsar_replication_backlog", str3, aggregatedReplicationStats.replicationBacklog);
        });
    }

    private static void metric(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, String str3, long j) {
        simpleTextOutputStream.write(str3).write("{cluster=\"").write(str).write("\", namespace=\"").write(str2).write("\"} ");
        simpleTextOutputStream.write(Long.valueOf(j)).write(' ').write(Long.valueOf(System.currentTimeMillis())).write('\n');
    }

    private static void metric(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, String str3, double d) {
        simpleTextOutputStream.write(str3).write("{cluster=\"").write(str).write("\", namespace=\"").write(str2).write("\"} ");
        simpleTextOutputStream.write(Double.valueOf(d)).write(' ').write(Long.valueOf(System.currentTimeMillis())).write('\n');
    }

    private static void metricWithRemoteCluster(SimpleTextOutputStream simpleTextOutputStream, String str, String str2, String str3, String str4, double d) {
        simpleTextOutputStream.write(str3).write("{cluster=\"").write(str).write("\", namespace=\"").write(str2);
        simpleTextOutputStream.write("\", remote_cluster=\"").write(str4).write("\"} ");
        simpleTextOutputStream.write(Double.valueOf(d)).write(' ').write(Long.valueOf(System.currentTimeMillis())).write('\n');
    }
}
