package dev.responsive.kafka.clients;

import dev.responsive.kafka.clients.OffsetRecorder;
import dev.responsive.kafka.clients.ResponsiveConsumer;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/clients/MetricPublishingCommitListener.class */
public class MetricPublishingCommitListener implements ResponsiveConsumer.Listener, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MetricPublishingCommitListener.class);
    private final Metrics metrics;
    private final String threadId;
    private final Map<TopicPartition, Optional<CommittedOffset>> offsets = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/responsive/kafka/clients/MetricPublishingCommitListener$CommittedOffset.class */
    public static class CommittedOffset {
        private final long offset;
        private final String consumerGroup;

        private CommittedOffset(long j, String str) {
            this.offset = j;
            this.consumerGroup = str;
        }

        public long getOffset() {
            return this.offset;
        }

        public String getConsumerGroup() {
            return this.consumerGroup;
        }
    }

    public MetricPublishingCommitListener(Metrics metrics, String str, OffsetRecorder offsetRecorder) {
        this.metrics = (Metrics) Objects.requireNonNull(metrics);
        this.threadId = (String) Objects.requireNonNull(str);
        offsetRecorder.addCommitCallback(this::commitCallback);
    }

    private MetricName metricName(OffsetRecorder.RecordingKey recordingKey) {
        return metricName(recordingKey.getPartition(), recordingKey.getConsumerGroup());
    }

    private MetricName metricName(TopicPartition topicPartition, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("consumerGroup", str);
        hashMap.put("thread", this.threadId);
        hashMap.put("topic", topicPartition.topic());
        hashMap.put("partition", Integer.toString(topicPartition.partition()));
        return new MetricName("committed-offset", "responsive.streams", "", hashMap);
    }

    private void commitCallback(Map<OffsetRecorder.RecordingKey, Long> map, Map<TopicPartition, Long> map2) {
        for (Map.Entry<OffsetRecorder.RecordingKey, Long> entry : map.entrySet()) {
            this.offsets.computeIfPresent(entry.getKey().getPartition(), (topicPartition, optional) -> {
                if (optional.isEmpty()) {
                    LOG.debug("add committed offset metric for {} {}", this.threadId, topicPartition);
                    this.metrics.addMetric(metricName((OffsetRecorder.RecordingKey) entry.getKey()), (metricConfig, j) -> {
                        return (Long) this.offsets.getOrDefault(topicPartition, Optional.empty()).map((v0) -> {
                            return v0.getOffset();
                        }).orElse(-1L);
                    });
                }
                LOG.debug("record committed offset {} {}: {}", new Object[]{this.threadId, topicPartition, entry.getValue()});
                return Optional.of(new CommittedOffset(((Long) entry.getValue()).longValue(), ((OffsetRecorder.RecordingKey) entry.getKey()).getConsumerGroup()));
            });
        }
    }

    @Override // dev.responsive.kafka.clients.ResponsiveConsumer.Listener
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        Logger logger = LOG;
        Stream<TopicPartition> stream = collection.stream();
        Map<TopicPartition, Optional<CommittedOffset>> map = this.offsets;
        Objects.requireNonNull(map);
        logger.info("Remove committed offset metrics entry for {}", stream.filter((v1) -> {
            return r3.containsKey(v1);
        }).map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(",")));
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            this.offsets.computeIfPresent(it.next(), (topicPartition, optional) -> {
                optional.ifPresent(committedOffset -> {
                    this.metrics.removeMetric(metricName(topicPartition, committedOffset.getConsumerGroup()));
                });
                return null;
            });
        }
    }

    @Override // dev.responsive.kafka.clients.ResponsiveConsumer.Listener
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        LOG.info("Add committed offsets metrics entry for {}", collection.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(",")));
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            this.offsets.putIfAbsent(it.next(), Optional.empty());
        }
    }

    @Override // dev.responsive.kafka.clients.ResponsiveConsumer.Listener
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        onPartitionsRevoked(collection);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (TopicPartition topicPartition : this.offsets.keySet()) {
            if (this.offsets.get(topicPartition).isPresent()) {
                LOG.info("Clean up committed offset metric {} {}", this.threadId, topicPartition);
                this.metrics.removeMetric(metricName(topicPartition, this.offsets.get(topicPartition).get().getConsumerGroup()));
            }
        }
        this.offsets.clear();
    }
}
