package com.pinterest.doctorkafka.replicastats;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingWindowReservoir;
import com.pinterest.doctorkafka.BrokerStats;
import com.pinterest.doctorkafka.KafkaCluster;
import com.pinterest.doctorkafka.ReplicaStat;
import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.util.KafkaUtils;
import com.pinterest.doctorkafka.util.OperatorUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/replicastats/ReplicaStatsManager.class */
/**
 * Static registry of per-cluster, per-partition traffic statistics built from
 * {@link BrokerStats} reports.
 *
 * <p>For every cluster (keyed by its ZooKeeper URL) the class keeps one bytes-in and one
 * bytes-out {@link Histogram} per {@link TopicPartition}, plus the most recent timestamp at
 * which each partition was reported as being in reassignment. Samples reported within
 * {@link #REASSIGNMENT_COOLDOWN_WINDOW_IN_MS} of a reassignment are not recorded.
 *
 * <p>{@link #initialize(DoctorKafkaConfig)} must be called before {@link #update(BrokerStats)};
 * until then {@link #config} is {@code null} and {@code update} dereferences it.
 */
public class ReplicaStatsManager {
    private static final Logger LOG = LogManager.getLogger(ReplicaStatsManager.class);

    /** Number of most-recent samples retained by each per-partition histogram. */
    private static final int SLIDING_WINDOW_SIZE = 5760;

    /** 30 minutes, in milliseconds: samples closer than this to a reassignment are dropped. */
    private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1_800_000L;

    /** How far before the end offset the first probe of a partition is placed. */
    private static final long INITIAL_PROBE_LOOKBACK = 10L;

    /** Number of offsets to step back on each iteration of the start-offset search. */
    private static final long BACKWARD_SEARCH_STEP = 5000L;

    /** Timeout, in milliseconds, of each poll issued while waiting for a single record. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    /** Bytes-in histograms: ZooKeeper URL -> partition -> histogram. */
    public static ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>> bytesInStats =
            new ConcurrentHashMap<>();

    /** Bytes-out histograms: ZooKeeper URL -> partition -> histogram. */
    public static ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>> bytesOutStats =
            new ConcurrentHashMap<>();

    /** Known clusters keyed by ZooKeeper URL; creation of entries is guarded by this map's monitor. */
    public static ConcurrentMap<String, KafkaCluster> clusters = new ConcurrentHashMap<>();

    /** Configuration installed by {@link #initialize(DoctorKafkaConfig)}; {@code null} until then. */
    public static DoctorKafkaConfig config = null;

    /** Latest reassignment timestamp seen: ZooKeeper URL -> partition -> timestamp. */
    public static ConcurrentHashMap<String, ConcurrentHashMap<TopicPartition, Long>> replicaReassignmentTimestamps =
            new ConcurrentHashMap<>();

    /**
     * Installs the configuration consulted by {@link #update(BrokerStats)}.
     *
     * @param doctorKafkaConfig the configuration to use from now on
     */
    public static void initialize(DoctorKafkaConfig doctorKafkaConfig) {
        config = doctorKafkaConfig;
    }

    /**
     * Records the timestamp of {@code replicaStat} as the latest reassignment time of its
     * partition, unless a later (or equal) timestamp is already stored.
     *
     * @param str         ZooKeeper URL identifying the cluster
     * @param replicaStat the replica report whose topic, partition and timestamp are used
     */
    public static void updateReplicaReassignmentTimestamp(String str, ReplicaStat replicaStat) {
        // computeIfAbsent + merge keep both steps atomic, so concurrent reporters can neither
        // replace each other's inner map nor overwrite a newer timestamp with an older one.
        replicaReassignmentTimestamps
                .computeIfAbsent(str, key -> new ConcurrentHashMap<>())
                .merge(partitionOf(replicaStat), replicaStat.getTimestamp(), Long::max);
    }

    /**
     * Returns the latest recorded reassignment timestamp of a partition.
     *
     * @return the stored timestamp, or {@code 0} when none is recorded for the cluster/partition
     */
    private static long getLastReplicaReassignmentTimestamp(String str, TopicPartition topicPartition) {
        ConcurrentHashMap<TopicPartition, Long> perPartition = replicaReassignmentTimestamps.get(str);
        if (perPartition == null) {
            return 0L;
        }
        Long timestamp = perPartition.get(topicPartition);
        return timestamp == null ? 0L : timestamp;
    }

    /**
     * Feeds one broker report into the registry.
     *
     * <p>Reports whose ZooKeeper URL is {@code null} or not listed in the configuration are
     * ignored. Otherwise the report is handed to the cluster's {@link KafkaCluster} (created on
     * first use), and each leader replica entry either refreshes the partition's reassignment
     * timestamp (when flagged as in reassignment) or, once the cool-down window has elapsed,
     * adds its 15-minute mean byte rates to the partition's histograms.
     *
     * @param brokerStats the report to process
     */
    public static void update(BrokerStats brokerStats) {
        String zkUrl = brokerStats.getZkUrl();
        if (zkUrl == null || !config.getClusterZkUrls().contains(zkUrl)) {
            return;
        }

        KafkaCluster cluster;
        synchronized (clusters) {
            cluster = clusters.get(zkUrl);
            if (cluster == null) {
                DoctorKafkaClusterConfig clusterConfig = config.getClusterConfigByZkUrl(zkUrl);
                assert clusterConfig != null;
                cluster = new KafkaCluster(zkUrl, clusterConfig);
                clusters.put(zkUrl, cluster);
            }
        }
        cluster.recordBrokerStats(brokerStats);

        // The per-cluster maps are created even when the report carries no leader replicas.
        ConcurrentMap<TopicPartition, Histogram> bytesInByPartition =
                bytesInStats.computeIfAbsent(zkUrl, key -> new ConcurrentHashMap<>());
        ConcurrentMap<TopicPartition, Histogram> bytesOutByPartition =
                bytesOutStats.computeIfAbsent(zkUrl, key -> new ConcurrentHashMap<>());

        if (brokerStats.getLeaderReplicaStats() == null) {
            return;
        }
        for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) {
            if (replicaStat.getInReassignment().booleanValue()) {
                updateReplicaReassignmentTimestamp(zkUrl, replicaStat);
                continue;
            }
            TopicPartition topicPartition = partitionOf(replicaStat);
            long sinceReassignment = brokerStats.getTimestamp().longValue()
                    - getLastReplicaReassignmentTimestamp(zkUrl, topicPartition);
            if (sinceReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) {
                continue;
            }
            bytesInByPartition
                    .computeIfAbsent(topicPartition, key -> newHistogram())
                    .update(replicaStat.getBytesIn15MinMeanRate().longValue());
            bytesOutByPartition
                    .computeIfAbsent(topicPartition, key -> newHistogram())
                    .update(replicaStat.getBytesOut15MinMeanRate().longValue());
        }
    }

    /** Builds the partition key (topic + partition number) of a replica report. */
    private static TopicPartition partitionOf(ReplicaStat replicaStat) {
        return new TopicPartition(replicaStat.getTopic(), replicaStat.getPartition().intValue());
    }

    /** Creates an empty histogram backed by a sliding window of {@link #SLIDING_WINDOW_SIZE} samples. */
    private static Histogram newHistogram() {
        return new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE));
    }

    /**
     * Returns the maximum bytes-in sample currently held for a partition.
     *
     * @param str            ZooKeeper URL identifying the cluster
     * @param topicPartition the partition to look up
     * @throws NullPointerException if nothing is recorded for the cluster or the partition;
     *                              the failure is logged before being rethrown
     */
    public static long getMaxBytesIn(String str, TopicPartition topicPartition) {
        try {
            return bytesInStats.get(str).get(topicPartition).getSnapshot().getMax();
        } catch (Exception e) {
            LOG.error("Failed to get bytesinfo for {}:{}", str, topicPartition);
            throw e;
        }
    }

    /**
     * Returns the 99th percentile of the bytes-in samples currently held for a partition.
     *
     * @throws NullPointerException if nothing is recorded for the cluster or the partition
     */
    public static double get99thPercentilBytesIn(String str, TopicPartition topicPartition) {
        return bytesInStats.get(str).get(topicPartition).getSnapshot().get99thPercentile();
    }

    /**
     * Returns the maximum bytes-out sample currently held for a partition.
     *
     * @throws NullPointerException if nothing is recorded for the cluster or the partition
     */
    public static long getMaxBytesOut(String str, TopicPartition topicPartition) {
        return bytesOutStats.get(str).get(topicPartition).getSnapshot().getMax();
    }

    /**
     * Returns the 99th percentile of the bytes-out samples currently held for a partition.
     *
     * @throws NullPointerException if nothing is recorded for the cluster or the partition
     */
    public static double get99thPercentilBytesOut(String str, TopicPartition topicPartition) {
        return bytesOutStats.get(str).get(topicPartition).getSnapshot().get99thPercentile();
    }

    /**
     * Returns the live bytes-in histogram map of a cluster.
     *
     * @return the internal (mutable) map, or {@code null} if the cluster has no entry
     */
    public static Map<TopicPartition, Histogram> getTopicsBytesInStats(String str) {
        return bytesInStats.get(str);
    }

    /**
     * Returns the live bytes-out histogram map of a cluster.
     *
     * @return the internal (mutable) map, or {@code null} if the cluster has no entry
     */
    public static Map<TopicPartition, Histogram> getTopicsBytesOutStats(String str) {
        return bytesOutStats.get(str);
    }

    /**
     * Chooses, for every partition of a topic, the offset from which stats should be replayed.
     *
     * <p>Each partition is probed a few offsets before its end and then in steps of
     * {@link #BACKWARD_SEARCH_STEP} towards its beginning, until a record at or before the
     * start timestamp is found, a record cannot be decoded, or the beginning is reached.
     * The consumer's subscription/assignment is replaced while doing so.
     *
     * @param kafkaConsumer consumer used for metadata lookups and probing
     * @param str           name of the topic holding the broker stats
     * @param j             start timestamp; compared directly against {@code BrokerStats.getTimestamp()}
     * @return the chosen start offset of every partition of the topic
     */
    public static Map<TopicPartition, Long> getProcessingStartOffsets(KafkaConsumer kafkaConsumer, String str, long j) {
        // Wildcard view: lets the compiler see the real element types of the metadata calls.
        KafkaConsumer<?, ?> consumer = kafkaConsumer;

        List<TopicPartition> partitions = consumer.partitionsFor(str).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());
        LOG.info("Get partition info for {} : {} partitions", str, partitions.size());

        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);

        Map<TopicPartition, Long> startOffsets = new HashMap<>();
        for (TopicPartition topicPartition : partitions) {
            consumer.unsubscribe();
            LOG.info("assigning {} to kafkaconsumer", topicPartition);
            List<TopicPartition> assignment = new ArrayList<>();
            assignment.add(topicPartition);
            consumer.assign(assignment);

            long endOffset = endOffsets.get(topicPartition);
            long beginOffset = beginningOffsets.get(topicPartition);
            long startOffset = findStartOffset(kafkaConsumer, topicPartition, beginOffset, endOffset, j);

            startOffsets.put(topicPartition, startOffset);
            LOG.info("{}: offset = {}, endOffset = {}, # of to-be-processed messages = {}",
                    topicPartition, startOffset, endOffset, endOffset - startOffset);
        }
        return startOffsets;
    }

    /**
     * Searches backwards from the end of one partition for the replay start offset.
     * Returns the beginning offset without probing when the partition holds no records.
     */
    private static long findStartOffset(KafkaConsumer kafkaConsumer, TopicPartition topicPartition,
                                        long beginOffset, long endOffset, long startTimestamp) {
        long offset = Math.max(endOffset - INITIAL_PROBE_LOOKBACK, beginOffset);
        if (endOffset <= beginOffset) {
            // Nothing to read: probing would wait forever for a record that never arrives.
            return offset;
        }
        BrokerStats probe = OperatorUtil.deserializeBrokerStats(
                retrieveOneMessage(kafkaConsumer, topicPartition, offset));
        // "offset > beginOffset" ends the search once the oldest record has been probed;
        // without it a partition whose oldest record is still too new would loop forever.
        while (probe != null
                && probe.getTimestamp().longValue() > startTimestamp
                && offset > beginOffset) {
            offset = Math.max(beginOffset, offset - BACKWARD_SEARCH_STEP);
            probe = OperatorUtil.deserializeBrokerStats(
                    retrieveOneMessage(kafkaConsumer, topicPartition, offset));
        }
        return offset;
    }

    /**
     * Seeks to {@code j} and polls until a record of {@code topicPartition} arrives.
     *
     * <p>Blocks for as long as no record is available; callers must only pass offsets at which
     * a record is expected to exist.
     *
     * @return the first record of the partition in the first non-empty batch containing one
     */
    private static ConsumerRecord<byte[], byte[]> retrieveOneMessage(KafkaConsumer kafkaConsumer, TopicPartition topicPartition, long j) {
        // NOTE(review): assumes the consumer was built with byte-array deserializers, as
        // readPastReplicaStats does; other callers of getProcessingStartOffsets must match.
        @SuppressWarnings("unchecked")
        KafkaConsumer<byte[], byte[]> consumer = kafkaConsumer;
        consumer.seek(topicPartition, j);
        while (true) {
            ConsumerRecords<byte[], byte[]> batch = consumer.poll(POLL_TIMEOUT_MS);
            if (batch.isEmpty()) {
                continue;
            }
            LOG.debug("records.count() = {}", batch.count());
            List<ConsumerRecord<byte[], byte[]>> records = batch.records(topicPartition);
            if (records != null && !records.isEmpty()) {
                return records.get(0);
            }
            LOG.info("recList is null or empty");
        }
    }

    /**
     * Replays previously published broker stats for one cluster.
     *
     * <p>Determines a start and end offset per partition of the stats topic, starts one
     * {@code PastReplicaStatsProcessor} per partition, and waits for all of them to finish.
     * If the calling thread is interrupted while waiting, the remaining processors are still
     * joined and the interrupt status is restored before returning.
     *
     * @param str              ZooKeeper URL identifying the cluster
     * @param securityProtocol security protocol used to build the consumers
     * @param str2             name of the topic holding the broker stats
     * @param j                how far back to replay, in seconds (multiplied by 1000 and
     *                         subtracted from the current time in milliseconds)
     */
    public static void readPastReplicaStats(String str, SecurityProtocol securityProtocol, String str2, long j) {
        long bootstrapStartMs = System.currentTimeMillis();

        Map<TopicPartition, Long> startOffsets;
        Map<TopicPartition, Long> endOffsets;
        // The (Map) cast is kept from the original call — presumably it selects an overload; verify.
        KafkaConsumer<?, ?> consumer = KafkaUtils.getKafkaConsumer(
                str, BYTE_ARRAY_DESERIALIZER, BYTE_ARRAY_DESERIALIZER, 1, securityProtocol, (Map) null);
        try {
            startOffsets = getProcessingStartOffsets(consumer, str2, System.currentTimeMillis() - (j * 1000));
            consumer.unsubscribe();
            consumer.assign(startOffsets.keySet());
            endOffsets = consumer.endOffsets(startOffsets.keySet());
        } finally {
            // Release the consumer even when offset discovery fails.
            KafkaUtils.closeConsumer(str);
        }

        List<PastReplicaStatsProcessor> processors = new ArrayList<>();
        for (Map.Entry<TopicPartition, Long> end : endOffsets.entrySet()) {
            TopicPartition topicPartition = end.getKey();
            long startOffset = startOffsets.get(topicPartition);
            long endOffset = end.getValue();
            PastReplicaStatsProcessor processor =
                    new PastReplicaStatsProcessor(str, securityProtocol, topicPartition, startOffset, endOffset);
            processors.add(processor);
            processor.start();
        }

        boolean interrupted = false;
        for (PastReplicaStatsProcessor processor : processors) {
            try {
                processor.join();
            } catch (InterruptedException e) {
                // Keep waiting for the remaining processors; the flag is restored below.
                interrupted = true;
                LOG.error("ReplicaStatsProcessor is interrupted.", e);
            }
        }
        LOG.info("ReplicaStats bootstrap time : {}", (System.currentTimeMillis() - bootstrapStartMs) / 1000.0d);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
