package org.apache.pulsar.functions.metrics.sink;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.shade.com.google.common.cache.Cache;

/* loaded from: input_file:org/apache/pulsar/functions/metrics/sink/PrometheusSink.class */
/**
 * Metrics sink that keeps the most recently reported metric values per function and
 * renders them as text samples of the form
 * {@code name{tenant="..",namespace="..",functionname=".."} value timestamp}.
 *
 * <p>Values are held in a two-level structure: {@link #metricsCache} is keyed by the
 * function's fully qualified name, and each entry maps a metric name to its value.
 */
public class PrometheusSink extends AbstractWebSink {
    private static final Logger LOG = Logger.getLogger(PrometheusSink.class.getName());

    /** Prefix prepended (with an underscore) to every exported metric name. */
    private static final String PREFIX = "pulsar_function";

    /** Terminator written after every sample line. */
    private static final String DELIMITER = "\n";

    /** Fully qualified function name -> (metric name -> latest value). */
    private Cache<String, Map<String, Double>> metricsCache;

    /**
     * Static helpers for validating and formatting Prometheus metric names, label
     * names, label values and sample values. Not instantiable.
     */
    static final class Prometheus {
        private static final Pattern METRIC_NAME_RE = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*");
        private static final Pattern METRIC_LABEL_NAME_RE = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");
        private static final Pattern RESERVED_METRIC_LABEL_NAME_RE = Pattern.compile("__.*");
        private static final Pattern SANITIZE_PREFIX_PATTERN = Pattern.compile("^[^a-zA-Z_]");
        private static final Pattern SANITIZE_BODY_PATTERN = Pattern.compile("[^a-zA-Z0-9_]");

        // NOTE(review): the replacement string is borrowed from a broker-side constant that is
        // unrelated to this sink; presumably it is "_" -- confirm and consider a local literal.
        private static final String SANITIZE_REPLACEMENT = ClusterReplicationMetrics.SEPARATOR;

        private Prometheus() {
        }

        /**
         * Validates a metric name.
         *
         * @param str the candidate metric name
         * @throws IllegalArgumentException if the whole name does not match
         *         {@code [a-zA-Z_:][a-zA-Z0-9_:]*}
         */
        static void checkMetricName(String str) {
            if (!METRIC_NAME_RE.matcher(str).matches()) {
                throw new IllegalArgumentException("Invalid metric name: " + str);
            }
        }

        /**
         * Rewrites a metric name so that it only contains {@code [a-zA-Z0-9_]} characters.
         *
         * <p>A leading character outside {@code [a-zA-Z_]} is replaced first, then every
         * remaining character outside {@code [a-zA-Z0-9_]} is replaced.
         *
         * @param str the raw metric name
         * @return the sanitized metric name
         */
        static String sanitizeMetricName(String str) {
            final String withValidFirstChar =
                    SANITIZE_PREFIX_PATTERN.matcher(str).replaceFirst(SANITIZE_REPLACEMENT);
            return SANITIZE_BODY_PATTERN.matcher(withValidFirstChar).replaceAll(SANITIZE_REPLACEMENT);
        }

        /**
         * Validates a metric label name.
         *
         * @param str the candidate label name
         * @throws IllegalArgumentException if the name does not match
         *         {@code [a-zA-Z_][a-zA-Z0-9_]*}, or if it starts with a double underscore
         */
        static void checkMetricLabelName(String str) {
            if (!METRIC_LABEL_NAME_RE.matcher(str).matches()) {
                throw new IllegalArgumentException("Invalid metric label name: " + str);
            }
            if (RESERVED_METRIC_LABEL_NAME_RE.matcher(str).matches()) {
                throw new IllegalArgumentException("Invalid metric label name, reserved for internal use: " + str);
            }
        }

        /**
         * Escapes a label value so it can be written between double quotes: backslash,
         * double quote and line feed are emitted as {@code \\}, {@code \"} and {@code \n}.
         * Values without those characters are returned with identical content.
         *
         * @param value the raw label value; {@code null} is rendered as the text {@code null}
         * @return the escaped label value
         */
        static String escapeLabelValue(String value) {
            // String.valueOf keeps the rendering StringBuilder.append(String) gives for null.
            final String raw = String.valueOf(value);
            final StringBuilder escaped = new StringBuilder(raw.length());
            for (int i = 0; i < raw.length(); i++) {
                final char c = raw.charAt(i);
                switch (c) {
                    case '\\':
                        escaped.append("\\\\");
                        break;
                    case '"':
                        escaped.append("\\\"");
                        break;
                    case '\n':
                        escaped.append("\\n");
                        break;
                    default:
                        escaped.append(c);
                        break;
                }
            }
            return escaped.toString();
        }

        /**
         * Formats a sample value, spelling the non-finite values as {@code +Inf},
         * {@code -Inf} and {@code NaN}.
         *
         * @param d the value to format
         * @return the textual form of {@code d}
         */
        static String doubleToGoString(double d) {
            if (d == Double.POSITIVE_INFINITY) {
                return "+Inf";
            }
            if (d == Double.NEGATIVE_INFINITY) {
                return "-Inf";
            }
            if (Double.isNaN(d)) {
                return "NaN";
            }
            return Double.toString(d);
        }
    }

    /**
     * Creates the top-level metrics cache.
     *
     * @param map sink configuration; not read by this implementation
     */
    @Override
    void initialize(Map<String, String> map) {
        this.metricsCache = createCache();
    }

    /**
     * Renders every cached metric as one sample line terminated by {@link #DELIMITER}.
     *
     * @return the UTF-8 encoded response body; empty when no metrics are cached
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override
    byte[] generateResponse() throws IOException {
        // Run pending cache maintenance before taking the map view.
        this.metricsCache.cleanUp();
        final ConcurrentMap<String, Map<String, Double>> metricsByFunction = this.metricsCache.asMap();
        final StringBuilder response = new StringBuilder();
        metricsByFunction.forEach((fqn, metrics) -> {
            // The label set is identical for every metric of a function, so build it once.
            final String labels = "{"
                    + "tenant=\"" + Prometheus.escapeLabelValue(FunctionDetailsUtils.extractTenantFromFQN(fqn)) + "\","
                    + "namespace=\"" + Prometheus.escapeLabelValue(FunctionDetailsUtils.extractNamespaceFromFQN(fqn)) + "\","
                    + "functionname=\"" + Prometheus.escapeLabelValue(FunctionDetailsUtils.extractFunctionNameFromFQN(fqn)) + "\""
                    + "} ";
            metrics.forEach((metricName, value) ->
                    response.append(Prometheus.sanitizeMetricName(PREFIX + "_" + metricName))
                            .append(labels)
                            .append(Prometheus.doubleToGoString(value.doubleValue()))
                            .append(" ")
                            .append(currentTimeMillis())
                            .append(DELIMITER));
        });
        // Encode explicitly so the payload does not depend on the platform default charset.
        return response.toString().getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Merges the metrics reported for one function into the cache, overwriting the
     * values of metric names that are already present.
     *
     * @param metricsData the reported metrics
     * @param functionDetails identifies the function the metrics belong to
     */
    @Override
    public void processRecord(InstanceCommunication.MetricsData metricsData, Function.FunctionDetails functionDetails) {
        final String fullyQualifiedName = FunctionDetailsUtils.getFullyQualifiedName(functionDetails);
        Map<String, Double> functionMetrics = this.metricsCache.getIfPresent(fullyQualifiedName);
        if (functionMetrics == null) {
            // The per-function map is the view of its own cache. The typed local gives the
            // createCache() call an explicit target type instead of relying on a chained call.
            final Cache<String, Double> perFunctionCache = createCache();
            functionMetrics = perFunctionCache.asMap();
        }
        functionMetrics.putAll(processMetrics(metricsData));
        this.metricsCache.put(fullyQualifiedName, functionMetrics);
    }

    /**
     * Flattens each reported digest into four entries named {@code <key>_count},
     * {@code <key>_sum}, {@code <key>_max} and {@code <key>_min}.
     *
     * @param metricsData the reported metrics
     * @return a new mutable map from flattened metric name to value
     */
    static Map<String, Double> processMetrics(InstanceCommunication.MetricsData metricsData) {
        final Map<String, Double> flattened = new HashMap<>();
        for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> entry : metricsData.getMetricsMap().entrySet()) {
            final String name = entry.getKey();
            final InstanceCommunication.MetricsData.DataDigest digest = entry.getValue();
            flattened.put(name + "_count", Double.valueOf(digest.getCount()));
            flattened.put(name + "_sum", Double.valueOf(digest.getSum()));
            flattened.put(name + "_max", Double.valueOf(digest.getMax()));
            flattened.put(name + "_min", Double.valueOf(digest.getMin()));
        }
        return flattened;
    }

    /**
     * Returns the timestamp written on each sample line. Package-private and
     * non-final, so it can be overridden.
     *
     * @return the current time in milliseconds, from {@link System#currentTimeMillis()}
     */
    long currentTimeMillis() {
        return System.currentTimeMillis();
    }

    /**
     * Returns the live top-level metrics cache (not a copy).
     *
     * @return the cache, or {@code null} if {@link #initialize(Map)} has not been called
     */
    public Cache<String, Map<String, Double>> getMetricsCache() {
        return this.metricsCache;
    }

    /** Delegates to the superclass implementation. */
    @Override
    public void close() {
        super.close();
    }

    /** Delegates to the superclass implementation. */
    @Override
    public void flush() {
        super.flush();
    }
}
