package org.apache.kylin.source.kafka.diagnose;

import com.google.common.collect.Lists;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.message.MessageAndOffset;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.StreamingParser;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.util.KafkaRequester;
import org.apache.kylin.source.kafka.util.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-source-kafka-1.5.2.1.jar:org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.class */
public class KafkaInputAnalyzer extends AbstractApplication {
    /** Command-line option "streaming"; built in the static initializer as required, with one argument ("Name of the streaming"). */
    private static final Option OPTION_STREAMING;
    /** Command-line option "task"; built in the static initializer as required, with one argument. execute() recognizes the values "disorder" and "delay". */
    private static final Option OPTION_TASK;
    /** Command-line option "tsColName"; built in the static initializer as required, with one argument ("field name of the ts"). */
    private static final Option OPTION_TSCOLNAME;
    /** Class-level logger, assigned at the end of the static initializer. */
    private static final Logger logger;
    /** Message parser created in execute() and handed to every KafkaMessagePuller started by consume(). */
    private StreamingParser parser;
    /** Kafka configuration looked up by streaming name in execute(); consumeAll() iterates its cluster configs. */
    private KafkaConfig kafkaConfig;
    /** Option set returned by getOptions(); the constructor registers the three options above on it. */
    private Options options = new Options();

    /* loaded from: input_file:WEB-INF/lib/kylin-source-kafka-1.5.2.1.jar:org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer$KafkaMessagePuller.class */
    /**
     * Runnable that repeatedly fetches messages of one topic partition from its lead broker,
     * parses each message and publishes the ones accepted by the parser's filter on a bounded queue.
     *
     * <p>The loop only ends when an exception is thrown or the thread is interrupted.
     */
    public class KafkaMessagePuller implements Runnable {
        private final String topic;
        private final int partitionId;
        private final KafkaClusterConfig streamingConfig;
        /** Hand-off queue to the consumer; bounded to 10000 entries so a slow consumer throttles the puller. */
        private final LinkedBlockingQueue<StreamingMessage> streamQueue = new LinkedBlockingQueue<>(10000);
        private final StreamingParser streamingParser;
        private final Broker leadBroker;
        /** Offset used for the next fetch request; advanced by one for every message received. */
        private long offset;
        protected final Logger logger;

        /**
         * @param i                  cluster id, used only to build the logger name
         * @param str                topic to fetch from
         * @param i2                 partition id to fetch from
         * @param j                  offset of the first fetch request
         * @param broker             lead broker of the partition
         * @param kafkaClusterConfig cluster configuration passed to every fetch request
         * @param streamingParser    parser and filter applied to each fetched message
         */
        public KafkaMessagePuller(int i, String str, int i2, long j, Broker broker, KafkaClusterConfig kafkaClusterConfig, StreamingParser streamingParser) {
            this.topic = str;
            this.partitionId = i2;
            this.streamingConfig = kafkaClusterConfig;
            this.offset = j;
            this.logger = LoggerFactory.getLogger(str + "_cluster_" + i + "_" + i2);
            this.streamingParser = streamingParser;
            this.leadBroker = broker;
        }

        /**
         * @return the queue on which parsed and filtered messages are published
         */
        public BlockingQueue<StreamingMessage> getStreamQueue() {
            return this.streamQueue;
        }

        /**
         * Fetch loop. Sleeps 30 seconds after a fetch error or after a round that returned no
         * message, and otherwise fetches again immediately.
         */
        @Override // java.lang.Runnable
        public void run() {
            int consumedCount = 0;
            int fetchRound = 0;
            try {
                while (true) {
                    final int consumedBeforeRound = consumedCount;
                    fetchRound++;
                    this.logger.info("fetching topic {} partition id {} offset {} leader {}", new Object[]{this.topic, String.valueOf(this.partitionId), String.valueOf(this.offset), this.leadBroker.toString()});
                    final FetchResponse fetchResponse = KafkaRequester.fetchResponse(this.topic, this.partitionId, this.offset, this.leadBroker, this.streamingConfig);
                    final short errorCode = fetchResponse.errorCode(this.topic, this.partitionId);
                    if (errorCode != 0) {
                        this.logger.warn("fetch response offset:" + this.offset + " errorCode:" + errorCode);
                        Thread.sleep(30000L);
                        continue;
                    }
                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(this.topic, this.partitionId)) {
                        // NOTE(review): the next fetch offset is advanced by one per message instead of being
                        // taken from the message itself - confirm this is valid for compressed message sets.
                        this.offset++;
                        consumedCount++;
                        final StreamingMessage parsed = this.streamingParser.parse(messageAndOffset);
                        if (this.streamingParser.filter(parsed)) {
                            // put() blocks while the queue is full. add() would throw IllegalStateException
                            // on a full queue and thereby terminate this puller as soon as the consumer lags.
                            this.streamQueue.put(parsed);
                        }
                    }
                    this.logger.info("Number of messages consumed: " + consumedCount + " offset is: " + this.offset + " total fetch round: " + fetchRound);
                    if (consumedCount == consumedBeforeRound) {
                        // Nothing new in this round; back off before polling the broker again.
                        Thread.sleep(30000L);
                    }
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread, then stop pulling.
                Thread.currentThread().interrupt();
                this.logger.error("consumer has encountered an error", e);
            } catch (Exception e) {
                this.logger.error("consumer has encountered an error", e);
            }
        }
    }

    public KafkaInputAnalyzer() {
        this.options.addOption(OPTION_STREAMING);
        this.options.addOption(OPTION_TASK);
        this.options.addOption(OPTION_TSCOLNAME);
    }

    /**
     * Starts one {@link KafkaMessagePuller} per partition of a cluster's topic, each on its own
     * daemon thread, and collects the pullers' output queues.
     *
     * @param i                  cluster id, used for logging and for naming the puller's logger
     * @param kafkaClusterConfig configuration of the cluster to read from
     * @param i2                 number of partitions; partitions 0 .. i2-1 are consumed
     * @param j                  time argument handed to KafkaRequester.getLastOffset to pick the starting
     *                           offset (callers in this class pass OffsetRequest.LatestTime() or EarliestTime())
     * @return one queue per partition, in partition order
     */
    private List<BlockingQueue<StreamingMessage>> consume(int i, KafkaClusterConfig kafkaClusterConfig, int i2, long j) {
        final List<BlockingQueue<StreamingMessage>> partitionQueues = Lists.newArrayList();
        final String topic = kafkaClusterConfig.getTopic();
        for (int partitionId = 0; partitionId < i2; partitionId++) {
            final Broker leadBroker = KafkaUtils.getLeadBroker(kafkaClusterConfig, partitionId);
            final long startOffset = KafkaRequester.getLastOffset(topic, partitionId, j, leadBroker, kafkaClusterConfig);
            logger.info("starting offset:" + startOffset + " cluster id:" + i + " partitionId:" + partitionId);
            final KafkaMessagePuller puller = new KafkaMessagePuller(i, topic, partitionId, startOffset, leadBroker, kafkaClusterConfig, this.parser);
            final ExecutorService pullerExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory());
            pullerExecutor.submit(puller);
            // The executor is used for this single task only. shutdown() still runs the already
            // submitted puller but lets the worker thread end once the puller returns.
            pullerExecutor.shutdown();
            partitionQueues.add(puller.getStreamQueue());
        }
        return partitionQueues;
    }

    /**
     * Starts consumption on every cluster of the configured Kafka config and gathers all
     * per-partition queues into one list, cluster by cluster.
     *
     * @param j time argument forwarded unchanged to {@code consume} for choosing the starting offset
     * @return the queues of all partitions of all clusters
     */
    private List<BlockingQueue<StreamingMessage>> consumeAll(long j) {
        final List<BlockingQueue<StreamingMessage>> allQueues = Lists.newLinkedList();
        int clusterId = 0;
        for (KafkaClusterConfig clusterConfig : this.kafkaConfig.getKafkaClusterConfigs()) {
            final int partitionCount = KafkaRequester.getKafkaTopicMeta(clusterConfig).getPartitionIds().size();
            final List<BlockingQueue<StreamingMessage>> clusterQueues = consume(clusterId, clusterConfig, partitionCount, j);
            allQueues.addAll(clusterQueues);
            logger.info("Cluster {} with {} partitions", clusterId, clusterQueues.size());
            clusterId++;
        }
        return allQueues;
    }

    /**
     * "delay" task: consumes every partition starting from the latest offset and feeds the difference
     * between the current time and each message's timestamp into one histogram per partition plus an
     * overall histogram. The status of all histograms is printed every five minutes.
     *
     * <p>Does not return normally unless there is nothing to consume; it ends when the calling
     * thread is interrupted.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting between reports
     */
    private void analyzeLatency() throws InterruptedException {
        // Bucket boundaries handed to TimeHistogram (presumably seconds - confirm against TimeHistogram).
        final long[] buckets = {1, 5, 60, 300, 1800};
        final List<BlockingQueue<StreamingMessage>> queues = consumeAll(OffsetRequest.LatestTime());
        if (queues.isEmpty()) {
            // A fixed thread pool of size 0 cannot be created; there is nothing to measure anyway.
            logger.warn("No partition found to consume from, latency analysis skipped");
            return;
        }
        final List<TimeHistogram> partitionHistograms = new ArrayList<>(queues.size());
        final TimeHistogram overallHistogram = new TimeHistogram(buckets, "overall");
        final ExecutorService workers = Executors.newFixedThreadPool(queues.size(), new DaemonThreadFactory());
        for (int i = 0; i < queues.size(); i++) {
            final int queueIndex = i;
            // Each worker captures its own queue and histogram, so it never reads the
            // (non thread-safe) list while this thread is still appending to it.
            final BlockingQueue<StreamingMessage> queue = queues.get(i);
            final TimeHistogram partitionHistogram = new TimeHistogram(buckets, "" + i);
            partitionHistograms.add(partitionHistogram);
            workers.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        while (true) {
                            final long timestamp = queue.take().getTimestamp();
                            partitionHistogram.processMillis(System.currentTimeMillis() - timestamp);
                            overallHistogram.processMillis(System.currentTimeMillis() - timestamp);
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag and stop, like the disorder workers do, instead of looping on.
                        Thread.currentThread().interrupt();
                        logger.warn("Latency worker {} interrupted, exiting", Integer.valueOf(queueIndex));
                    }
                }
            });
        }
        // Only this thread formats dates, so a single formatter instance can be reused.
        final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        while (true) {
            System.out.println("Printing status at : " + timeFormat.format(Calendar.getInstance().getTime()));
            for (TimeHistogram partitionHistogram : partitionHistograms) {
                partitionHistogram.printStatus();
            }
            overallHistogram.printStatus();
            Thread.sleep(300000L);
        }
    }

    /**
     * "disorder" task: consumes every partition starting from the earliest offset and tracks, per
     * partition, how far a message's timestamp falls behind the largest timestamp seen so far and
     * how many offsets lie between the two messages. Progress is printed every 10000 messages.
     *
     * <p>A worker finishes when its queue delivers no message for 60 seconds; the method returns
     * once all workers have finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the workers
     */
    private void analyzeDisorder() throws InterruptedException {
        final List<BlockingQueue<StreamingMessage>> queues = consumeAll(OffsetRequest.EarliestTime());
        if (queues.isEmpty()) {
            // A fixed thread pool of size 0 cannot be created; there is nothing to analyze anyway.
            logger.warn("No partition found to consume from, disorder analysis skipped");
            return;
        }
        final ExecutorService workers = Executors.newFixedThreadPool(queues.size(), new DaemonThreadFactory());
        final CountDownLatch finished = new CountDownLatch(queues.size());
        for (int i = 0; i < queues.size(); i++) {
            final int threadIndex = i;
            final BlockingQueue<StreamingMessage> queue = queues.get(i);
            workers.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    // All statistics are private to this worker, so plain locals are sufficient.
                    long maxTimestamp = 0L;
                    long offsetOfMaxTimestamp = 0L;
                    long maxDisorderTime = 0L;
                    long maxDisorderOffset = 0L;
                    long processed = 0L;
                    try {
                        while (true) {
                            final StreamingMessage message = queue.poll(60L, TimeUnit.SECONDS);
                            if (message == null) {
                                System.out.println(String.format("Thread %d is exiting", threadIndex));
                                System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d", threadIndex, processed, maxDisorderTime, maxDisorderOffset));
                                return;
                            }
                            final long timestamp = message.getTimestamp();
                            final long offset = message.getOffset();
                            if (timestamp < maxTimestamp) {
                                // Out-of-order message: measure its distance to the newest message seen so far.
                                maxDisorderTime = Math.max(maxTimestamp - timestamp, maxDisorderTime);
                                maxDisorderOffset = Math.max(offset - offsetOfMaxTimestamp, maxDisorderOffset);
                            } else {
                                maxTimestamp = timestamp;
                                offsetOfMaxTimestamp = offset;
                            }
                            processed++;
                            if (processed % 10000 == 1) {
                                System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d", threadIndex, processed, maxDisorderTime, maxDisorderOffset));
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.warn("Disorder worker {} interrupted, exiting", Integer.valueOf(threadIndex));
                    } finally {
                        // Always release the latch; otherwise an interrupted or failed worker
                        // would leave the waiting thread blocked forever.
                        finished.countDown();
                    }
                }
            });
        }
        try {
            finished.await();
        } finally {
            workers.shutdown();
        }
    }

    /**
     * @return the option set holding the streaming, task and tsColName options registered by the constructor
     */
    @Override
    protected Options getOptions() {
        return options;
    }

    /**
     * Reads the three option values, loads the Kafka config of the named streaming, builds the
     * timestamp-aware JSON parser and runs the requested task: "disorder" or "delay"
     * (case-insensitive). Any other task value prints the usage text.
     *
     * @param optionsHelper accessor for the parsed command-line option values
     * @throws Exception whatever the selected analysis or the configuration lookup throws
     */
    @Override
    protected void execute(OptionsHelper optionsHelper) throws Exception {
        final String streamingName = optionsHelper.getOptionValue(OPTION_STREAMING);
        final String task = optionsHelper.getOptionValue(OPTION_TASK);
        final String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);

        final KafkaConfigManager configManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
        this.kafkaConfig = configManager.getKafkaConfig(streamingName);
        this.parser = new TimedJsonStreamParser(Lists.newArrayList(), "formatTs=true;tsColName=" + tsColName);

        if ("disorder".equalsIgnoreCase(task)) {
            analyzeDisorder();
            return;
        }
        if ("delay".equalsIgnoreCase(task)) {
            analyzeLatency();
            return;
        }
        // Unknown task name: show the caller what is expected.
        optionsHelper.printUsage(getClass().getName(), this.options);
    }

    /**
     * Command-line entry point: creates an analyzer and passes the raw arguments to its
     * {@code execute(String[])} overload (not defined in this class; presumably inherited
     * from AbstractApplication).
     *
     * @param strArr raw command-line arguments
     */
    public static void main(String[] strArr) {
        final KafkaInputAnalyzer analyzer = new KafkaInputAnalyzer();
        analyzer.execute(strArr);
    }

    // Builds the three required single-argument command-line options and the class logger.
    // OptionBuilder accumulates its settings in static state until create() is called, so each
    // option is configured and created in one chain before the next one is started.
    static {
        OPTION_STREAMING = OptionBuilder.withArgName("streaming")
                .hasArg()
                .isRequired(true)
                .withDescription("Name of the streaming")
                .create("streaming");
        OPTION_TASK = OptionBuilder.withArgName("task")
                .hasArg()
                .isRequired(true)
                .withDescription("get delay or get disorder degree")
                .create("task");
        OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName")
                .hasArg()
                .isRequired(true)
                .withDescription("field name of the ts")
                .create("tsColName");
        logger = LoggerFactory.getLogger(KafkaInputAnalyzer.class);
    }
}
