package org.apache.storm.kafka.spout.metrics;

import com.google.common.base.Supplier;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.class */
public class KafkaOffsetMetric<K, V> implements IMetric {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
    private final Supplier<KafkaConsumer<K, V>> consumerSupplier;

    /* loaded from: input_file:org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric$TopicMetrics.class */
    private class TopicMetrics {
        long totalSpoutLag;
        long totalEarliestTimeOffset;
        long totalLatestTimeOffset;
        long totalLatestEmittedOffset;
        long totalLatestCompletedOffset;
        long totalRecordsInPartitions;

        private TopicMetrics() {
            this.totalSpoutLag = 0L;
            this.totalEarliestTimeOffset = 0L;
            this.totalLatestTimeOffset = 0L;
            this.totalLatestEmittedOffset = 0L;
            this.totalLatestCompletedOffset = 0L;
            this.totalRecordsInPartitions = 0L;
        }
    }

    public KafkaOffsetMetric(Supplier supplier, Supplier supplier2) {
        this.offsetManagerSupplier = supplier;
        this.consumerSupplier = supplier2;
    }

    public Object getValueAndReset() {
        Map map = (Map) this.offsetManagerSupplier.get();
        KafkaConsumer kafkaConsumer = (KafkaConsumer) this.consumerSupplier.get();
        if (map == null || map.isEmpty() || kafkaConsumer == null) {
            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
            return null;
        }
        HashMap hashMap = new HashMap();
        Set<K> keySet = map.keySet();
        Map beginningOffsets = kafkaConsumer.beginningOffsets(keySet);
        Map endOffsets = kafkaConsumer.endOffsets(keySet);
        HashMap hashMap2 = new HashMap();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            TopicPartition topicPartition = (TopicPartition) entry.getKey();
            OffsetManager offsetManager = (OffsetManager) entry.getValue();
            long longValue = ((Long) endOffsets.get(topicPartition)).longValue();
            long longValue2 = ((Long) beginningOffsets.get(topicPartition)).longValue();
            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
            long committedOffset = offsetManager.getCommittedOffset();
            long j = longValue - committedOffset;
            long j2 = longValue - longValue2;
            String str = topicPartition.topic() + "/partition_" + topicPartition.partition();
            hashMap2.put(str + "/spoutLag", Long.valueOf(j));
            hashMap2.put(str + "/earliestTimeOffset", Long.valueOf(longValue2));
            hashMap2.put(str + "/latestTimeOffset", Long.valueOf(longValue));
            hashMap2.put(str + "/latestEmittedOffset", Long.valueOf(latestEmittedOffset));
            hashMap2.put(str + "/latestCompletedOffset", Long.valueOf(committedOffset));
            hashMap2.put(str + "/recordsInPartition", Long.valueOf(j2));
            TopicMetrics topicMetrics = (TopicMetrics) hashMap.get(topicPartition.topic());
            if (topicMetrics == null) {
                topicMetrics = new TopicMetrics();
                hashMap.put(topicPartition.topic(), topicMetrics);
            }
            topicMetrics.totalSpoutLag += j;
            topicMetrics.totalEarliestTimeOffset += longValue2;
            topicMetrics.totalLatestTimeOffset += longValue;
            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
            topicMetrics.totalLatestCompletedOffset += committedOffset;
            topicMetrics.totalRecordsInPartitions += j2;
        }
        for (Map.Entry<K, V> entry2 : hashMap.entrySet()) {
            String str2 = (String) entry2.getKey();
            TopicMetrics topicMetrics2 = (TopicMetrics) entry2.getValue();
            hashMap2.put(str2 + "/totalSpoutLag", Long.valueOf(topicMetrics2.totalSpoutLag));
            hashMap2.put(str2 + "/totalEarliestTimeOffset", Long.valueOf(topicMetrics2.totalEarliestTimeOffset));
            hashMap2.put(str2 + "/totalLatestTimeOffset", Long.valueOf(topicMetrics2.totalLatestTimeOffset));
            hashMap2.put(str2 + "/totalLatestEmittedOffset", Long.valueOf(topicMetrics2.totalLatestEmittedOffset));
            hashMap2.put(str2 + "/totalLatestCompletedOffset", Long.valueOf(topicMetrics2.totalLatestCompletedOffset));
            hashMap2.put(str2 + "/totalRecordsInPartitions", Long.valueOf(topicMetrics2.totalRecordsInPartitions));
        }
        LOG.debug("Metrics Tick: value : {}", hashMap2);
        return hashMap2;
    }
}
