package com.pinterest.doctorkafka.tools;

import com.google.common.collect.Lists;
import com.pinterest.doctorkafka.OperatorAction;
import com.pinterest.doctorkafka.util.OperatorUtil;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/tools/DoctorKafkaActionRetriever.class */
/**
 * Command-line tool that reads the most recent {@link OperatorAction} messages from
 * partition 0 of a Kafka topic and prints them to standard output, newest first.
 *
 * <p>Required options: {@code -zookeeper}, {@code -topic} and {@code -num_messages}.
 */
public class DoctorKafkaActionRetriever {
    private static final String ZOOKEEPER = "zookeeper";
    private static final String TOPIC = "topic";
    private static final String NUM_MESSAGES = "num_messages";
    private static final String CONSUMER_GROUP = "operator_action_commandline";
    private static final long POLL_TIMEOUT_MS = 100L;
    private static final Logger LOG = LogManager.getLogger(DoctorKafkaActionRetriever.class);
    private static final DecoderFactory avroDecoderFactory = DecoderFactory.get();
    private static final Schema operatorActionSchema = OperatorAction.getClassSchema();
    private static final Options options = new Options();

    /**
     * Registers the supported options and parses the given arguments.
     *
     * <p>Prints the usage text and exits with status 1 when the arguments cannot be parsed
     * or when any of the three required options is missing.
     *
     * @param strArr raw command-line arguments
     * @return the parsed command line, with all three options present
     */
    private static CommandLine parseCommandLine(String[] strArr) {
        options.addOption(new Option(ZOOKEEPER, true, "doctorkafka action zookeeper"))
                .addOption(new Option(TOPIC, true, "doctorkafka action topic"))
                .addOption(new Option(NUM_MESSAGES, true, "num of messages to retrieve"));
        if (strArr.length < 2) {
            printUsageAndExit();
        }
        CommandLine commandLine = null;
        try {
            commandLine = new DefaultParser().parse(options, strArr);
        } catch (ParseException e) {
            printUsageAndExit();
        }
        // All three options are needed by main(); fail with usage instead of a later
        // NullPointerException / NumberFormatException on a missing value.
        if (commandLine == null
                || !commandLine.hasOption(ZOOKEEPER)
                || !commandLine.hasOption(TOPIC)
                || !commandLine.hasOption(NUM_MESSAGES)) {
            printUsageAndExit();
        }
        return commandLine;
    }

    /**
     * Reads the {@code num_messages} option as a non-negative integer.
     *
     * <p>Prints the usage text and exits with status 1 when the value is not a valid
     * integer or is negative (a negative count would seek past the end offset).
     *
     * @param commandLine parsed command line containing the option
     * @return the requested number of messages
     */
    private static int parseNumMessages(CommandLine commandLine) {
        int numMessages = -1;
        try {
            numMessages = Integer.parseInt(commandLine.getOptionValue(NUM_MESSAGES));
        } catch (NumberFormatException e) {
            printUsageAndExit();
        }
        if (numMessages < 0) {
            printUsageAndExit();
        }
        return numMessages;
    }

    /** Prints the option help to standard output and terminates the JVM with status 1. */
    private static void printUsageAndExit() {
        new HelpFormatter().printHelp("OperatorActionRetriever", options);
        System.exit(1);
    }

    /**
     * Fetches up to {@code numMessages} of the latest records from partition 0 of the topic.
     *
     * <p>The consumer is positioned at {@code max(beginningOffset, endOffset - numMessages)}
     * and polled until a poll returns no records. The consumer is always closed, including
     * when an exception is thrown.
     *
     * @param zookeeper value of the zookeeper option, passed to the consumer property builder
     * @param topic topic to read from
     * @param numMessages maximum number of trailing messages to read
     * @return the fetched records in the order they were consumed
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static List<ConsumerRecord<byte[], byte[]>> fetchRecentRecords(
            String zookeeper, String topic, int numMessages) {
        List<ConsumerRecord<byte[], byte[]>> fetched = new ArrayList<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                OperatorUtil.createKafkaConsumerProperties(
                        zookeeper, CONSUMER_GROUP, SecurityProtocol.PLAINTEXT, (Map) null))) {
            List<TopicPartition> partitions = new ArrayList<>();
            partitions.add(new TopicPartition(topic, 0));
            consumer.assign(partitions);

            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                long begin = beginningOffsets.get(partition);
                long end = entry.getValue();
                LOG.info("{} : offsets [{}, {}], num messages : {}", partition, begin, end, end - begin);
                // Never seek before the first retained offset.
                consumer.seek(partition, Math.max(begin, end - numMessages));
            }

            // Drain the partition; an empty poll is treated as "caught up".
            for (ConsumerRecords<byte[], byte[]> batch = consumer.poll(POLL_TIMEOUT_MS);
                    !batch.isEmpty();
                    batch = consumer.poll(POLL_TIMEOUT_MS)) {
                for (ConsumerRecord<byte[], byte[]> record : batch) {
                    fetched.add(record);
                }
            }
        }
        return fetched;
    }

    /**
     * Entry point: parses the options, fetches the latest records and prints each decoded
     * {@link OperatorAction} as {@code <timestamp as Date> : <action>}, newest first.
     * Records that fail to decode are logged and skipped.
     *
     * @param strArr command-line arguments
     * @throws Exception if the Kafka consumer fails
     */
    public static void main(String[] strArr) throws Exception {
        CommandLine commandLine = parseCommandLine(strArr);
        String zookeeper = commandLine.getOptionValue(ZOOKEEPER);
        String topic = commandLine.getOptionValue(TOPIC);
        int numMessages = parseNumMessages(commandLine);

        List<ConsumerRecord<byte[], byte[]>> records = fetchRecentRecords(zookeeper, topic, numMessages);

        // Decode and print once, after all batches are collected, so that no record is
        // printed more than once.
        SpecificDatumReader<OperatorAction> reader = new SpecificDatumReader<>(operatorActionSchema);
        for (ConsumerRecord<byte[], byte[]> record : Lists.reverse(records)) {
            try {
                BinaryDecoder decoder = avroDecoderFactory.binaryDecoder(record.value(), null);
                OperatorAction operatorAction = new OperatorAction();
                reader.read(operatorAction, decoder);
                System.out.println(new Date(operatorAction.getTimestamp()).toString() + " : " + operatorAction);
            } catch (Exception e) {
                LOG.info("Fail to decode an message", e);
            }
        }
    }
}
