package com.pinterest.doctorkafka.tools;

import com.pinterest.doctorkafka.BrokerStats;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
import com.pinterest.doctorkafka.util.KafkaUtils;
import com.pinterest.doctorkafka.util.OperatorUtil;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/tools/BrokerStatsFilter.class */
/**
 * Command-line tool that scans the most recent 24 hours of the broker stats
 * topic and prints, ordered by timestamp, the records reported by one broker.
 *
 * <p>Required options: {@code -brokerstatszk <zkUrl>},
 * {@code -brokerstatstopic <topic>} and {@code -broker <name>}.
 * {@code -config} is accepted but not read by this tool.
 */
public class BrokerStatsFilter {
    private static final String CONFIG = "config";
    private static final String BROKERSTATS_ZOOKEEPER = "brokerstatszk";
    private static final String BROKERSTATS_TOPIC = "brokerstatstopic";
    private static final String BROKERNAME = "broker";
    private static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";
    /** How far back from "now" the scan starts: 24 hours. */
    private static final long LOOKBACK_MILLIS = 24L * 60 * 60 * 1000;
    /** Maximum time a single poll blocks waiting for records. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;
    private static final Logger LOG = LogManager.getLogger(BrokerStatsFilter.class);
    private static final Options options = buildOptions();

    /** Builds the option set once; the three options read by {@link #main} are required. */
    private static Options buildOptions() {
        Options result = new Options();
        result.addOption(new Option(CONFIG, true, "operator config"));
        result.addOption(requiredOption(BROKERSTATS_ZOOKEEPER, "zookeeper for brokerstats topic"));
        result.addOption(requiredOption(BROKERSTATS_TOPIC, "topic for brokerstats"));
        result.addOption(requiredOption(BROKERNAME, "broker name"));
        return result;
    }

    /** Creates an option that takes one argument and must be present on the command line. */
    private static Option requiredOption(String name, String description) {
        Option option = new Option(name, true, description);
        option.setRequired(true);
        return option;
    }

    /**
     * Parses the command line, printing usage and terminating the JVM with
     * exit code 1 when it is malformed or a required option is missing.
     *
     * @param strArr raw command-line arguments
     * @return the parsed command line; every required option has a value
     */
    private static CommandLine parseCommandLine(String[] strArr) {
        try {
            return new DefaultParser().parse(options, strArr);
        } catch (ParseException e) {
            LOG.error("Invalid command line: {}", e.getMessage());
            printUsageAndExit();
            // Only reachable if System.exit returns, which it does not do normally.
            throw new IllegalStateException("System.exit returned", e);
        }
    }

    /** Prints the usage banner for this tool and exits with status 1. */
    private static void printUsageAndExit() {
        new HelpFormatter().printHelp("BrokerStatsFilter", options);
        System.exit(1);
    }

    /**
     * Reads one partition from offset {@code j} up to (excluding) {@code j2}
     * and collects the broker stats whose broker name is in {@code set}.
     *
     * <p>Any {@link Exception} raised while reading is logged and the records
     * collected so far are returned; the consumer is always closed.
     *
     * <p>NOTE(review): the loop ends only once the consumer position reaches
     * {@code j2}; if polls keep returning nothing it does not terminate.
     *
     * @param str ZooKeeper URL used to look up the Kafka brokers
     * @param topicPartition partition to read
     * @param j first offset to read
     * @param j2 offset at which reading stops
     * @param set broker names to keep
     * @return the matching records in the order they were read; never {@code null}
     */
    public static List<BrokerStats> processOnePartition(String str, TopicPartition topicPartition, long j, long j2, Set<String> set) {
        KafkaConsumer<byte[], byte[]> kafkaConsumer = null;
        List<BrokerStats> matches = new ArrayList<>();
        try {
            String brokers = KafkaUtils.getBrokers(str, SecurityProtocol.PLAINTEXT);
            LOG.info("ZkUrl: {}, Brokers: {}", str, brokers);

            Properties properties = new Properties();
            properties.put("bootstrap.servers", brokers);
            properties.put("enable.auto.commit", "false");
            properties.put("group.id", "kafka_operator" + topicPartition);
            properties.put("key.deserializer", BYTE_ARRAY_DESERIALIZER);
            properties.put("value.deserializer", BYTE_ARRAY_DESERIALIZER);
            properties.put("max.poll.records", 2000);
            properties.put("max.partition.fetch.bytes", 4194304);
            kafkaConsumer = new KafkaConsumer<>(properties);

            Set<TopicPartition> assignment = new HashSet<>();
            assignment.add(topicPartition);
            kafkaConsumer.assign(assignment);
            kafkaConsumer.seek(topicPartition, j);

            while (kafkaConsumer.position(topicPartition) < j2) {
                for (ConsumerRecord<byte[], byte[]> record : kafkaConsumer.poll(POLL_TIMEOUT_MILLIS)) {
                    BrokerStats brokerStats = OperatorUtil.deserializeBrokerStats(record);
                    // Skip records that failed to deserialize or carry no broker name.
                    if (brokerStats == null || brokerStats.getName() == null) {
                        continue;
                    }
                    if (set.contains(brokerStats.getName())) {
                        matches.add(brokerStats);
                    }
                }
            }
        } catch (Exception e) {
            // Best effort: report the failure and return what was read so far.
            LOG.error("Exception in processing brokerstats", e);
        } finally {
            if (kafkaConsumer != null) {
                kafkaConsumer.close();
            }
        }
        return matches;
    }

    /**
     * Entry point: resolves the offset range covering the last 24 hours for
     * every partition of the broker stats topic, filters each partition for
     * the requested broker, and prints the matches sorted by timestamp.
     *
     * @param strArr command-line arguments, see the class documentation
     * @throws Exception if the offset lookup fails
     */
    // The consumer handed out by KafkaUtils is used as a raw type, as in the
    // original code, so its calls are unchecked.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] strArr) throws Exception {
        CommandLine commandLine = parseCommandLine(strArr);
        String zkUrl = commandLine.getOptionValue(BROKERSTATS_ZOOKEEPER);
        String statsTopic = commandLine.getOptionValue(BROKERSTATS_TOPIC);
        Set<String> brokerNames = new HashSet<>();
        brokerNames.add(commandLine.getOptionValue(BROKERNAME));

        Map<TopicPartition, Long> startOffsets;
        Map<TopicPartition, Long> endOffsets;
        KafkaConsumer kafkaConsumer = KafkaUtils.getKafkaConsumer(zkUrl, BYTE_ARRAY_DESERIALIZER, BYTE_ARRAY_DESERIALIZER, 1, SecurityProtocol.PLAINTEXT, (Map) null);
        try {
            startOffsets = ReplicaStatsManager.getProcessingStartOffsets(kafkaConsumer, statsTopic, System.currentTimeMillis() - LOOKBACK_MILLIS);
            kafkaConsumer.unsubscribe();
            kafkaConsumer.assign(startOffsets.keySet());
            endOffsets = kafkaConsumer.endOffsets(startOffsets.keySet());
        } finally {
            // Release the shared consumer even when the offset lookup fails.
            KafkaUtils.closeConsumer(zkUrl);
        }

        // Sorted by timestamp; a later record with an equal timestamp replaces the earlier one.
        Map<Long, BrokerStats> statsByTimestamp = new TreeMap<>();
        for (Map.Entry<TopicPartition, Long> range : startOffsets.entrySet()) {
            TopicPartition topicPartition = range.getKey();
            Long startOffset = range.getValue();
            Long endOffset = endOffsets.get(topicPartition);
            if (startOffset == null || endOffset == null) {
                // Without both bounds there is nothing well-defined to scan.
                LOG.warn("Skipping {}: start offset {}, end offset {}", topicPartition, startOffset, endOffset);
                continue;
            }
            LOG.info("Start processing {}", topicPartition);
            List<BrokerStats> partitionStats = processOnePartition(zkUrl, topicPartition, startOffset, endOffset, brokerNames);
            for (BrokerStats brokerStats : partitionStats) {
                statsByTimestamp.put(brokerStats.getTimestamp(), brokerStats);
            }
            LOG.info("Finished processing {}, retrieved {} records", topicPartition, partitionStats.size());
        }

        for (Map.Entry<Long, BrokerStats> entry : statsByTimestamp.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }
}
