package ru.tinkoff.kora.micrometer.module.kafka.producer;

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.opentelemetry.semconv.SemanticAttributes;
import jakarta.annotation.Nullable;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import ru.tinkoff.kora.kafka.common.producer.telemetry.KafkaProducerMetrics;
import ru.tinkoff.kora.telemetry.common.TelemetryConfig;

/* loaded from: input_file:ru/tinkoff/kora/micrometer/module/kafka/producer/MicrometerKafkaProducerMetrics.class */
public class MicrometerKafkaProducerMetrics implements KafkaProducerMetrics, AutoCloseable {
    private final KafkaClientMetrics micrometerMetrics;
    private final Properties properties;
    private final TelemetryConfig.MetricsConfig config;
    private final MeterRegistry meterRegistry;
    private final ConcurrentHashMap<TopicPartition, DistributionSummary> metrics = new ConcurrentHashMap<>();

    public MicrometerKafkaProducerMetrics(MeterRegistry meterRegistry, TelemetryConfig.MetricsConfig metricsConfig, Producer<?, ?> producer, Properties properties) {
        this.micrometerMetrics = new KafkaClientMetrics(producer);
        this.micrometerMetrics.bindTo(meterRegistry);
        this.properties = properties;
        this.config = metricsConfig;
        this.meterRegistry = meterRegistry;
    }

    public KafkaProducerMetrics.KafkaProducerTxMetrics tx() {
        return new KafkaProducerMetrics.KafkaProducerTxMetrics() { // from class: ru.tinkoff.kora.micrometer.module.kafka.producer.MicrometerKafkaProducerMetrics.1
            public void commit() {
            }

            public void rollback(@Nullable Throwable th) {
            }
        };
    }

    public void sendEnd(ProducerRecord<?, ?> producerRecord, double d, Throwable th) {
        this.metrics.computeIfAbsent(new TopicPartition(producerRecord.topic(), ((Integer) Objects.requireNonNullElse(producerRecord.partition(), -1)).intValue()), this::metrics).record(d);
    }

    public void sendEnd(ProducerRecord<?, ?> producerRecord, double d, RecordMetadata recordMetadata) {
        this.metrics.computeIfAbsent(new TopicPartition(recordMetadata.topic(), recordMetadata.partition()), this::metrics).record(d);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.micrometerMetrics.close();
        Iterator<Map.Entry<TopicPartition, DistributionSummary>> it = this.metrics.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, DistributionSummary> next = it.next();
            it.remove();
            try {
                next.getValue().close();
            } catch (Throwable th) {
            }
        }
    }

    private DistributionSummary metrics(TopicPartition topicPartition) {
        DistributionSummary.Builder tag = DistributionSummary.builder("messaging.publish.duration").serviceLevelObjectives(this.config.slo()).baseUnit("milliseconds").tag(SemanticAttributes.MESSAGING_SYSTEM.getKey(), "kafka").tag(SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION.getKey(), Integer.toString(topicPartition.partition())).tag(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), topicPartition.topic());
        Object obj = this.properties.get("client.id");
        if (obj != null) {
            tag.tag(SemanticAttributes.MESSAGING_CLIENT_ID.getKey(), obj.toString());
        }
        return tag.register(this.meterRegistry);
    }
}
