package com.pinterest.doctorkafka.replicastats;

import com.pinterest.doctorkafka.BrokerStats;
import com.pinterest.doctorkafka.DoctorKafkaMetrics;
import com.pinterest.doctorkafka.util.OpenTsdbMetricConverter;
import com.pinterest.doctorkafka.util.OperatorUtil;
import java.lang.Thread;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/replicastats/BrokerStatsProcessor.class */
/**
 * Background worker that consumes broker-stats records from a Kafka topic and
 * feeds every successfully decoded record into {@link ReplicaStatsManager}.
 *
 * <p>Lifecycle: {@link #start()} creates and starts a dedicated thread that
 * executes {@link #run()}; {@link #stop()} asks the polling loop to finish
 * after the poll that is currently in flight. Any unexpected failure in the
 * consumer loop terminates the JVM (see {@link #run()}).
 */
public class BrokerStatsProcessor implements Runnable {
    private static final Logger LOG = LogManager.getLogger(BrokerStatsProcessor.class);
    private static final String BROKERSTATS_CONSUMER_GROUP = "operator_brokerstats_group_" + OperatorUtil.getHostname();

    /** Timeout, in milliseconds, handed to each {@code KafkaConsumer.poll} call. */
    private static final long BROKER_STATS_POLL_INTERVAL_MS = 200;

    /** Thread created by {@link #start()}; {@link #run()} installs its exception handler on it. */
    protected Thread thread;

    /**
     * Loop-termination flag. It is written by {@link #stop()} from a caller
     * thread and read by the polling thread, so it must be {@code volatile}
     * for the stop request to be reliably observed.
     */
    private volatile boolean stopped = true;

    private final String zkUrl;
    private final String topic;
    private final SecurityProtocol securityProtocol;
    private final Map<String, String> consumerConfigs;

    /**
     * Last-resort handler installed on the polling thread: logs the throwable
     * and terminates the JVM with exit status 1.
     */
    class BrokerStatsReaderExceptionHandler implements Thread.UncaughtExceptionHandler {
        BrokerStatsReaderExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            BrokerStatsProcessor.LOG.error("Unexpected exception : ", th);
            System.exit(1);
        }
    }

    /**
     * Stores the settings later passed to
     * {@code OperatorUtil.createKafkaConsumerProperties} when the consumer is built.
     *
     * @param str              value kept as {@code zkUrl}
     * @param securityProtocol security protocol for the consumer
     * @param str2             name of the topic to subscribe to
     * @param map              additional consumer configuration entries
     */
    public BrokerStatsProcessor(String str, SecurityProtocol securityProtocol, String str2, Map<String, String> map) {
        this.zkUrl = str;
        this.topic = str2;
        this.securityProtocol = securityProtocol;
        this.consumerConfigs = map;
    }

    /** Creates a new thread around this instance and starts it. */
    public void start() {
        this.thread = new Thread(this);
        this.thread.start();
    }

    /** Requests the polling loop in {@link #run()} to exit. */
    public void stop() {
        this.stopped = true;
    }

    /**
     * Subscribes to the configured topic and processes records until
     * {@link #stop()} is called.
     *
     * <p>A record that cannot be decoded is logged at debug level and skipped.
     * Records that decode to {@code null}, or whose name is {@code null}, are
     * skipped silently. Any other exception escaping the loop is logged and
     * the JVM exits with status -1.
     */
    @Override // java.lang.Runnable
    // Raw Kafka types are kept on purpose: the generic parameters expected by
    // OperatorUtil.deserializeBrokerStats are not visible from this class.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void run() {
        this.thread.setUncaughtExceptionHandler(new BrokerStatsReaderExceptionHandler());
        this.stopped = false;
        KafkaConsumer kafkaConsumer = null;
        try {
            kafkaConsumer = new KafkaConsumer(OperatorUtil.createKafkaConsumerProperties(
                    this.zkUrl, BROKERSTATS_CONSUMER_GROUP, this.securityProtocol, this.consumerConfigs));
            kafkaConsumer.subscribe(Arrays.asList(this.topic));
            while (!this.stopped) {
                for (Object record : kafkaConsumer.poll(BROKER_STATS_POLL_INTERVAL_MS)) {
                    // Scoped per record so that a decode failure can never
                    // fall through to the update below with a stale value.
                    BrokerStats brokerStats;
                    try {
                        brokerStats = OperatorUtil.deserializeBrokerStats((ConsumerRecord) record);
                    } catch (Exception e) {
                        LOG.debug("Fail to decode an message", e);
                        continue;
                    }
                    if (brokerStats == null || brokerStats.getName() == null) {
                        continue;
                    }
                    ReplicaStatsManager.update(brokerStats);
                    OpenTsdbMetricConverter.incr(DoctorKafkaMetrics.BROKERSTATS_MESSAGES, 1, "zkUrl= " + brokerStats.getZkUrl());
                }
            }
        } catch (Exception e2) {
            LOG.error("Caught exception in getting broker stats, exiting. ", e2);
            System.exit(-1);
        } finally {
            // Release the consumer on a clean stop; done on the polling thread
            // because that is the thread that has been using the consumer.
            closeQuietly(kafkaConsumer);
        }
    }

    /**
     * Closes the consumer if one was created. A failure while closing is only
     * logged so that a clean shutdown is not turned into a fatal exit.
     */
    @SuppressWarnings("rawtypes")
    private static void closeQuietly(KafkaConsumer kafkaConsumer) {
        if (kafkaConsumer == null) {
            return;
        }
        try {
            kafkaConsumer.close();
        } catch (Exception e) {
            LOG.warn("Failed to close broker stats consumer", e);
        }
    }
}
