package com.pinterest.doctorkafka.replicastats;

import com.pinterest.doctorkafka.BrokerStats;
import com.pinterest.doctorkafka.util.KafkaUtils;
import com.pinterest.doctorkafka.util.OperatorUtil;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/replicastats/PastReplicaStatsProcessor.class */
/**
 * Replays a bounded offset range of one topic partition and feeds every decodable
 * {@link BrokerStats} record into {@link ReplicaStatsManager}.
 *
 * <p>The work is done in {@link #run()}; {@link #start()} runs it on a dedicated thread and
 * {@link #join()} waits for that thread to finish.
 */
public class PastReplicaStatsProcessor implements Runnable {
    private static final Logger LOG = LogManager.getLogger(PastReplicaStatsProcessor.class);

    private final String zkUrl;
    private final TopicPartition topicPartition;
    private final SecurityProtocol securityProtocol;
    private final long startOffset;
    private final long endOffset;
    // Assigned by start(); stays null until the processor has been started.
    private Thread thread;

    /**
     * @param zkUrl            value handed to {@code KafkaUtils.getBrokers} to resolve the broker list
     *                         (presumably a ZooKeeper connection string -- confirm against KafkaUtils)
     * @param securityProtocol security protocol handed to {@code KafkaUtils.getBrokers}
     * @param topicPartition   the partition to read
     * @param startOffset      offset the consumer seeks to before reading
     * @param endOffset        reading stops once the consumer position is no longer below this offset
     */
    public PastReplicaStatsProcessor(String zkUrl, SecurityProtocol securityProtocol, TopicPartition topicPartition, long startOffset, long endOffset) {
        this.zkUrl = zkUrl;
        this.securityProtocol = securityProtocol;
        this.topicPartition = topicPartition;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    /** Creates a new thread that executes {@link #run()} and starts it. */
    public void start() {
        this.thread = new Thread(this);
        this.thread.start();
    }

    /**
     * Waits for the thread created by {@link #start()} to terminate. Returns immediately when
     * {@link #start()} has not been called, mirroring {@link Thread#join()} on an unstarted thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        Thread worker = this.thread;
        if (worker == null) {
            return;
        }
        worker.join();
    }

    /**
     * Reads the partition from {@code startOffset} until the consumer position reaches
     * {@code endOffset}, passing each record that deserializes to a named {@link BrokerStats}
     * to {@link ReplicaStatsManager#update}. Any {@link Exception} is logged and ends the run;
     * the consumer is closed exactly once on every exit path.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            String brokers = KafkaUtils.getBrokers(this.zkUrl, this.securityProtocol);
            LOG.info("ZkUrl: {}, Brokers: {}", this.zkUrl, brokers);
            // try-with-resources guarantees a single close(), including when an Error propagates.
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(buildConsumerProperties(brokers))) {
                HashSet<TopicPartition> assignment = new HashSet<>();
                assignment.add(this.topicPartition);
                consumer.assign(assignment);
                consumer.seek(this.topicPartition, this.startOffset);
                while (consumer.position(this.topicPartition) < this.endOffset) {
                    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100L)) {
                        BrokerStats brokerStats = OperatorUtil.deserializeBrokerStats(record);
                        // Skip records that could not be decoded or that carry no broker name.
                        if (brokerStats != null && brokerStats.getName() != null) {
                            ReplicaStatsManager.update(brokerStats);
                        }
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Exception in processing brokerstats", e);
        }
    }

    /** Builds the consumer configuration for reading raw byte records from the given brokers. */
    private Properties buildConsumerProperties(String brokers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        // Offsets are never committed; the read range is driven by seek() and endOffset.
        properties.put("enable.auto.commit", "false");
        properties.put("group.id", "doctorkafka_" + this.topicPartition);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("max.poll.records", 2000);
        properties.put("max.partition.fetch.bytes", 4194304);
        return properties;
    }
}
