package ru.tinkoff.kora.micrometer.module.kafka.consumer;

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.kafka.common.consumer.telemetry.KafkaConsumerMetrics;
import ru.tinkoff.kora.micrometer.module.MetricsConfig;

/* loaded from: input_file:ru/tinkoff/kora/micrometer/module/kafka/consumer/MicrometerKafkaConsumerMetrics.class */
/**
 * {@link KafkaConsumerMetrics} implementation that publishes Kafka consumer telemetry
 * to a Micrometer {@link MeterRegistry}.
 *
 * <p>Two kinds of meters are maintained:
 * <ul>
 *   <li>{@code messaging.consumer.duration} - a {@link DistributionSummary} of per-record
 *       processing time, tagged by topic;</li>
 *   <li>{@code messaging.kafka.consumer.lag} - a {@link Gauge} exposing the last lag value
 *       passed to {@link #reportLag(TopicPartition, long)}, tagged by topic and partition.</li>
 * </ul>
 *
 * <p>Meters are created lazily and cached per {@link TopicPartition}; {@link #release()}
 * empties both caches and closes the cached meters.
 */
public class MicrometerKafkaConsumerMetrics implements KafkaConsumerMetrics, Lifecycle {
    private final MeterRegistry meterRegistry;
    private final ConcurrentHashMap<TopicPartition, DistributionSummary> metrics = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<TopicPartition, LagGauge> lagMetrics = new ConcurrentHashMap<>();
    private final MetricsConfig.KafkaConsumerMetricsConfig config;

    /**
     * Holder pairing a registered lag {@link Gauge} with the mutable value it reads.
     */
    private static class LagGauge {
        private final Gauge gauge;
        // Written by reportLag, read by the gauge supplier whenever the registry samples it.
        private volatile long offsetLag;

        /**
         * Registers the lag gauge for one topic partition.
         *
         * @param topicPartition partition whose lag this gauge exposes
         * @param meterRegistry  registry the gauge is registered in
         */
        private LagGauge(TopicPartition topicPartition, MeterRegistry meterRegistry) {
            this.gauge = Gauge.builder("messaging.kafka.consumer.lag", () -> this.offsetLag)
                .tag("messaging.system", "kafka")
                .tag("messaging.destination", topicPartition.topic())
                .tag("messaging.destination_kind", "topic")
                // The partition tag keeps the meter id unique per partition. A registry hands back
                // the already-registered gauge for a duplicate id, so without this tag only the
                // first partition of a topic would ever have its lag reported.
                .tag("messaging.kafka.partition", Integer.toString(topicPartition.partition()))
                .register(meterRegistry);
        }
    }

    /**
     * @param meterRegistry              registry all meters are registered in
     * @param kafkaConsumerMetricsConfig configuration supplying the SLO buckets of the duration summary
     */
    public MicrometerKafkaConsumerMetrics(MeterRegistry meterRegistry, MetricsConfig.KafkaConsumerMetricsConfig kafkaConsumerMetricsConfig) {
        this.meterRegistry = meterRegistry;
        this.config = kafkaConsumerMetricsConfig;
    }

    /**
     * Makes sure a duration summary is cached for every partition present in the batch.
     *
     * @param consumerRecords batch of records that was just received
     */
    public void onRecordsReceived(ConsumerRecords<?, ?> consumerRecords) {
        for (TopicPartition partition : consumerRecords.partitions()) {
            this.metrics.computeIfAbsent(partition, this::metrics);
        }
    }

    /**
     * Builds and registers the processing-duration summary used for the given partition.
     * The summary is tagged by topic only.
     *
     * @param topicPartition partition the summary is created for
     * @return the summary returned by the registry
     */
    private DistributionSummary metrics(TopicPartition topicPartition) {
        return DistributionSummary.builder("messaging.consumer.duration")
            .serviceLevelObjectives(this.config.slo())
            .baseUnit("milliseconds")
            .tag("messaging.system", "kafka")
            .tag("messaging.destination", topicPartition.topic())
            .tag("messaging.destination_kind", "topic")
            .register(this.meterRegistry);
    }

    /**
     * Records the processing time of a single record.
     *
     * @param consumerRecord record that was processed
     * @param j              processing duration; divided by 1,000,000 before being recorded in a
     *                       summary whose base unit is milliseconds (presumably nanoseconds - confirm
     *                       against the caller)
     * @param th             processing failure, if any; not used by this implementation
     */
    public void onRecordProcessed(ConsumerRecord<?, ?> consumerRecord, long j, Throwable th) {
        TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        // Resolve lazily instead of a plain get(): the partition may not have been seen by
        // onRecordsReceived (or the cache may have been cleared by release()), and a missing
        // entry must not surface as a NullPointerException in the consumer loop.
        this.metrics.computeIfAbsent(partition, this::metrics).record(j / 1000000.0d);
    }

    /**
     * Stores the latest lag for a partition, registering its gauge on first use.
     *
     * @param topicPartition partition the lag belongs to
     * @param j              lag value to expose through the gauge
     */
    public void reportLag(TopicPartition topicPartition, long j) {
        LagGauge lagGauge = this.lagMetrics.computeIfAbsent(topicPartition, partition -> new LagGauge(partition, this.meterRegistry));
        lagGauge.offsetLag = j;
    }

    /**
     * Batch-level callback; intentionally a no-op in this implementation.
     */
    public void onRecordsProcessed(ConsumerRecords<?, ?> consumerRecords, long j, Throwable th) {
    }

    /**
     * Nothing to initialize: meters are created lazily.
     *
     * @return an empty {@link Mono}
     */
    public Mono<?> init() {
        return Mono.empty();
    }

    /**
     * Empties both meter caches and closes every meter that was cached.
     *
     * @return a {@link Mono} that performs the cleanup when subscribed
     */
    public Mono<?> release() {
        return Mono.fromRunnable(() -> {
            // Snapshot before clearing so the meters being closed are exactly those that were cached.
            ArrayList<DistributionSummary> summaries = new ArrayList<>(this.metrics.values());
            this.metrics.clear();
            for (DistributionSummary summary : summaries) {
                summary.close();
            }
            ArrayList<LagGauge> lagGauges = new ArrayList<>(this.lagMetrics.values());
            this.lagMetrics.clear();
            for (LagGauge lagGauge : lagGauges) {
                lagGauge.gauge.close();
            }
        });
    }
}
