package com.pinterest.doctorkafka.stats;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.pinterest.doctorkafka.util.OperatorUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/stats/KafkaStatsMain.class */
/**
 * Command-line entry point of the Kafka stats collector.
 *
 * <p>{@link #main(String[])} parses the command line, creates a {@code KafkaAvroPublisher} and a
 * {@code BrokerStatsReporter}, starts a {@link CollectorMonitor} that exits the JVM once the
 * configured uptime has elapsed, and optionally starts the Ostrich service. A shutdown hook
 * ({@link KafkaStatsCleanupThread}) stops those components when the JVM terminates.
 */
public class KafkaStatsMain {
    private static final String DEFAULT_PRIMARY_INTERFACE_NAME = "eth0";
    private static final String PRIMARY_INTERFACE_NAME = "primary_network_ifacename";
    private static final String BROKER_NAME = "broker";
    private static final String JMX_PORT = "jmxport";
    private static final String ZOOKEEPER = "zookeeper";
    private static final String METRICS_TOPIC = "topic";
    private static final String OSTRICH_PORT = "ostrichport";
    private static final String TSD_HOSTPORT = "tsdhostport";
    private static final String UPTIME_IN_SECONDS = "uptimeinseconds";
    private static final String POLLING_INTERVAL = "pollingintervalinseconds";
    private static final String KAFKA_CONFIG = "kafka_config";
    private static final String DISABLE_EC2METADATA = "disable_ec2metadata";
    private static final String STATS_PRODUCER_CONFIG = "producer_config";
    private static final Logger LOG = LogManager.getLogger(KafkaStatsMain.class);
    /** Host name reported by {@code OperatorUtil}; used as the broker name when none is given. */
    protected static final String hostName = OperatorUtil.getHostname();
    private static final Options options = new Options();
    /** Released by the shutdown hook once all cleanup steps have been attempted. */
    private static final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // Written by the main thread and read by the shutdown-hook thread, hence volatile.
    private static volatile BrokerStatsReporter brokerStatsReporter = null;
    private static volatile CollectorMonitor collectorMonitor = null;
    private static volatile KafkaAvroPublisher avroPublisher = null;

    /**
     * Periodic task that terminates the JVM (exit status 0) once a configured uptime has elapsed.
     *
     * <p>NOTE(review): the log message says "Restarting", so presumably an external supervisor
     * relaunches the process after the exit; nothing in this class restarts it.
     */
    public static class CollectorMonitor implements Runnable {
        private static final int INITIAL_DELAY = 0;
        private static final int CHECK_INTERVAL_SECONDS = 15;
        private static final Logger LOG = LogManager.getLogger(CollectorMonitor.class);
        public static ScheduledExecutorService monitorExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Monitor").build());

        /** Wall-clock time (epoch milliseconds) after which the process exits. */
        private final long restartTime;

        /**
         * @param uptimeInSeconds how long, counted from construction, the process may run before
         *     {@link #run()} exits the JVM
         */
        public CollectorMonitor(long uptimeInSeconds) {
            long now = System.currentTimeMillis();
            // toMillis saturates instead of overflowing; the addition is saturated by hand so a
            // very large uptime means "practically never" rather than wrapping into the past.
            long uptimeMillis = TimeUnit.SECONDS.toMillis(uptimeInSeconds);
            this.restartTime = uptimeMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + uptimeMillis;
        }

        /** Schedules this monitor on {@link #monitorExecutor} at a fixed 15 second rate. */
        public void start() {
            monitorExecutor.scheduleAtFixedRate(this, INITIAL_DELAY, CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS);
        }

        /**
         * Initiates shutdown of {@link #monitorExecutor} without waiting for it to terminate.
         *
         * @throws Exception declared for callers; the current implementation throws nothing checked
         */
        public void stop() throws Exception {
            monitorExecutor.shutdown();
        }

        /** Exits the JVM with status 0 once the current time is past the restart time. */
        @Override
        public void run() {
            if (System.currentTimeMillis() > this.restartTime) {
                LOG.warn("Restarting metrics collector");
                System.exit(0);
            }
        }
    }

    /**
     * Shutdown hook: stops the stats reporter and the collector monitor and closes the Avro
     * publisher. Each step is attempted independently so that a failure in one does not prevent
     * the others; the shutdown latch is counted down afterwards in every case.
     */
    static class KafkaStatsCleanupThread extends Thread {
        KafkaStatsCleanupThread() {
        }

        @Override
        public void run() {
            // Throwable is caught on purpose: this is best-effort cleanup during JVM shutdown.
            try {
                if (KafkaStatsMain.brokerStatsReporter != null) {
                    KafkaStatsMain.brokerStatsReporter.stop();
                }
            } catch (Throwable t) {
                KafkaStatsMain.LOG.error("Shutdown failure in brokerStatsReporter : ", t);
            }
            try {
                if (KafkaStatsMain.collectorMonitor != null) {
                    KafkaStatsMain.collectorMonitor.stop();
                }
            } catch (Throwable t) {
                KafkaStatsMain.LOG.error("Shutdown failure in collectorMonitor : ", t);
            }
            try {
                if (KafkaStatsMain.avroPublisher != null) {
                    KafkaStatsMain.avroPublisher.close();
                }
            } catch (Throwable t) {
                KafkaStatsMain.LOG.error("Shutdown failure in avroPublisher : ", t);
            }
            KafkaStatsMain.shutdownLatch.countDown();
        }
    }

    /**
     * Registers all supported options and parses the given arguments.
     *
     * <p>Prints usage and exits with status 1 when fewer than six argument tokens are supplied or
     * when parsing fails.
     *
     * @param args raw command-line arguments
     * @return the parsed command line
     */
    private static CommandLine parseCommandLine(String[] args) {
        Option broker = new Option(BROKER_NAME, true, "kafka broker");
        broker.setRequired(false);
        Option jmxPort = new Option(JMX_PORT, true, "kafka jmx port number");
        jmxPort.setArgName("kafka jmx port number");
        Option zookeeper = new Option(ZOOKEEPER, true, "zk url for metrics topic");
        Option topic = new Option(METRICS_TOPIC, true, "kafka topic for metric messages");
        Option tsdHostPort = new Option(TSD_HOSTPORT, true, "tsd host and port, e.g. localhost:18621");
        Option ostrichPort = new Option(OSTRICH_PORT, true, "ostrich port");
        Option uptime = new Option(UPTIME_IN_SECONDS, true, "uptime in seconds");
        Option pollingInterval = new Option(POLLING_INTERVAL, true, "polling interval in seconds");
        Option kafkaConfig = new Option(KAFKA_CONFIG, true, "kafka server properties file path");
        Option producerConfig = new Option(STATS_PRODUCER_CONFIG, true, "kafka_stats producer config");
        Option primaryInterface = new Option(PRIMARY_INTERFACE_NAME, true, "network interface used by kafka");
        Option disableEc2Metadata = new Option(DISABLE_EC2METADATA, false, "Disable collecting host information via ec2metadata");

        options.addOption(jmxPort)
                .addOption(broker)
                .addOption(zookeeper)
                .addOption(topic)
                .addOption(tsdHostPort)
                .addOption(ostrichPort)
                .addOption(uptime)
                .addOption(pollingInterval)
                .addOption(kafkaConfig)
                .addOption(producerConfig)
                .addOption(primaryInterface)
                .addOption(disableEc2Metadata);

        if (args.length < 6) {
            printUsageAndExit();
        }
        CommandLine commandLine = null;
        try {
            commandLine = new DefaultParser().parse(options, args);
        } catch (NumberFormatException | ParseException e) {
            // Say what was wrong before showing the usage text.
            LOG.error("Failed to parse command line arguments", e);
            printUsageAndExit();
        }
        return commandLine;
    }

    /** Prints the usage text for all registered options and exits the JVM with status 1. */
    private static void printUsageAndExit() {
        new HelpFormatter().printHelp("KafkaMetricsCollector", options);
        System.exit(1);
    }

    /**
     * Reads a mandatory numeric option. Prints usage and exits with status 1 when the option is
     * missing or is not a valid {@code long}.
     *
     * @param commandLine parsed command line
     * @param optionName name of the option to read
     * @return the parsed value
     */
    private static long parseRequiredLong(CommandLine commandLine, String optionName) {
        String raw = commandLine.getOptionValue(optionName);
        if (raw == null) {
            LOG.error("Missing required option -" + optionName);
            printUsageAndExit();
        }
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            LOG.error("Option -" + optionName + " expects an integer value but got: " + raw, e);
            printUsageAndExit();
            throw e; // only reached if the exit above did not terminate the JVM
        }
    }

    /**
     * Starts the stats collector.
     *
     * @param args command-line arguments; see {@link #parseCommandLine(String[])} for the options
     * @throws Exception if any component fails to start
     */
    public static void main(String[] args) throws Exception {
        // Registered first so that cleanup also runs when startup exits early.
        Runtime.getRuntime().addShutdownHook(new KafkaStatsCleanupThread());

        CommandLine commandLine = parseCommandLine(args);
        String brokerName = commandLine.getOptionValue(BROKER_NAME);
        if (brokerName == null || brokerName.isEmpty()) {
            brokerName = hostName;
        }
        String jmxPort = commandLine.getOptionValue(JMX_PORT);
        String ostrichPort = commandLine.getOptionValue(OSTRICH_PORT);
        String tsdHostPort = commandLine.getOptionValue(TSD_HOSTPORT);
        String zkUrl = commandLine.getOptionValue(ZOOKEEPER);
        String metricsTopic = commandLine.getOptionValue(METRICS_TOPIC);
        String kafkaConfigPath = commandLine.getOptionValue(KAFKA_CONFIG);
        long uptimeInSeconds = parseRequiredLong(commandLine, UPTIME_IN_SECONDS);
        long pollingIntervalInSeconds = parseRequiredLong(commandLine, POLLING_INTERVAL);
        String primaryInterface = commandLine.getOptionValue(PRIMARY_INTERFACE_NAME, DEFAULT_PRIMARY_INTERFACE_NAME);
        boolean disableEc2Metadata = commandLine.hasOption(DISABLE_EC2METADATA);
        // getOptionValue yields null when the option is absent, so no hasOption guard is needed.
        String producerConfigPath = commandLine.getOptionValue(STATS_PRODUCER_CONFIG);

        avroPublisher = new KafkaAvroPublisher(zkUrl, metricsTopic, producerConfigPath);
        brokerStatsReporter = new BrokerStatsReporter(kafkaConfigPath, brokerName, jmxPort, avroPublisher, pollingIntervalInSeconds, primaryInterface, disableEc2Metadata);
        brokerStatsReporter.start();
        collectorMonitor = new CollectorMonitor(uptimeInSeconds);
        collectorMonitor.start();

        if (ostrichPort == null) {
            if (tsdHostPort == null) {
                LOG.info("OpenTSDB and Ostrich options missing, not starting Ostrich service");
            } else {
                // Without a port the service cannot be started; parsing a null port would throw.
                LOG.warn("Option -" + TSD_HOSTPORT + " was given without -" + OSTRICH_PORT
                        + ", not starting Ostrich service");
            }
        } else {
            OperatorUtil.startOstrichService("kafkastats", tsdHostPort, Integer.parseInt(ostrichPort));
        }

        // NOTE(review): waits at most 10 seconds for the shutdown hook before main returns; the
        // timeout result is ignored. Intent is not evident from this file - confirm.
        shutdownLatch.await(10L, TimeUnit.SECONDS);
    }
}
