package org.apache.kylin.source.kafka;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.streaming.IStreamingInput;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.util.KafkaRequester;
import org.apache.kylin.source.kafka.util.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-source-kafka-1.5.2.1.jar:org/apache/kylin/source/kafka/KafkaStreamingInput.class */
public class KafkaStreamingInput implements IStreamingInput {
    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);

    /* loaded from: input_file:WEB-INF/lib/kylin-source-kafka-1.5.2.1.jar:org/apache/kylin/source/kafka/KafkaStreamingInput$StreamingMessageProducer.class */
    private static class StreamingMessageProducer implements Callable<List<StreamingMessage>> {
        private final KafkaClusterConfig kafkaClusterConfig;
        private final int partitionId;
        private final StreamingParser streamingParser;
        private final Pair<Long, Long> timeRange;
        private final long margin;
        private List<Broker> replicaBrokers;

        StreamingMessageProducer(KafkaClusterConfig kafkaClusterConfig, int i, Pair<Long, Long> pair, long j, StreamingParser streamingParser) {
            this.kafkaClusterConfig = kafkaClusterConfig;
            this.partitionId = i;
            this.streamingParser = streamingParser;
            this.margin = j;
            this.timeRange = pair;
            this.replicaBrokers = kafkaClusterConfig.getBrokers();
        }

        private Broker getLeadBroker() {
            PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(this.kafkaClusterConfig.getTopic(), this.partitionId, this.replicaBrokers, this.kafkaClusterConfig);
            if (partitionMetadata == null) {
                return null;
            }
            if (partitionMetadata.errorCode() != 0) {
                KafkaStreamingInput.logger.warn("PartitionMetadata errorCode: " + ((int) partitionMetadata.errorCode()));
            }
            this.replicaBrokers = partitionMetadata.replicas();
            return partitionMetadata.leader();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public List<StreamingMessage> call() throws Exception {
            LinkedList newLinkedList = Lists.newLinkedList();
            try {
                long findClosestOffsetWithDataTimestamp = KafkaUtils.findClosestOffsetWithDataTimestamp(this.kafkaClusterConfig, this.partitionId, this.timeRange.getFirst().longValue() - this.margin, this.streamingParser);
                int i = 0;
                int i2 = 0;
                Broker broker = null;
                String topic = this.kafkaClusterConfig.getTopic();
                while (true) {
                    boolean z = false;
                    int i3 = i2;
                    i++;
                    if (broker == null) {
                        broker = getLeadBroker();
                    }
                    if (broker == null) {
                        KafkaStreamingInput.logger.warn("cannot find lead broker, wait 5s");
                        Thread.sleep(5000L);
                    } else {
                        KafkaStreamingInput.logger.info("fetching topic {} partition id {} offset {} leader {}", (Object[]) new String[]{topic, String.valueOf(this.partitionId), String.valueOf(findClosestOffsetWithDataTimestamp), broker.toString()});
                        FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, this.partitionId, findClosestOffsetWithDataTimestamp, broker, this.kafkaClusterConfig);
                        if (fetchResponse.errorCode(topic, this.partitionId) != 0) {
                            KafkaStreamingInput.logger.warn("fetch response offset:" + findClosestOffsetWithDataTimestamp + " errorCode:" + ((int) fetchResponse.errorCode(topic, this.partitionId)));
                            Thread.sleep(30000L);
                        } else {
                            Iterator it2 = fetchResponse.messageSet(topic, this.partitionId).iterator();
                            while (true) {
                                if (!it2.hasNext()) {
                                    break;
                                }
                                findClosestOffsetWithDataTimestamp++;
                                i2++;
                                StreamingMessage parse = this.streamingParser.parse((MessageAndOffset) it2.next());
                                if (this.streamingParser.filter(parse)) {
                                    long timestamp = parse.getTimestamp();
                                    if (timestamp >= this.timeRange.getFirst().longValue() && timestamp < this.timeRange.getSecond().longValue()) {
                                        newLinkedList.add(parse);
                                    } else if (timestamp >= this.timeRange.getSecond().longValue() + this.margin) {
                                        KafkaStreamingInput.logger.info("thread:" + Thread.currentThread() + " message timestamp:" + timestamp + " is out of time range:" + this.timeRange + " margin:" + this.margin);
                                        z = true;
                                        break;
                                    }
                                }
                            }
                            KafkaStreamingInput.logger.info("Number of messages consumed: " + i2 + " offset is: " + findClosestOffsetWithDataTimestamp + " total fetch round: " + i);
                            if (z) {
                                break;
                            }
                            if (i2 == i3) {
                                KafkaStreamingInput.logger.info("no message consumed this round, wait 30s");
                                Thread.sleep(30000L);
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                KafkaStreamingInput.logger.warn("this thread should not be interrupted, just stop fetching", (Throwable) e);
            }
            return newLinkedList;
        }
    }

    @Override // org.apache.kylin.engine.streaming.IStreamingInput
    public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String str, int i, long j, long j2) {
        if (realizationType != RealizationType.CUBE) {
            throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType);
        }
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        String factTable = CubeManager.getInstance(instanceFromEnv).getCube(str).getFactTable();
        StreamingConfig config = StreamingManager.getInstance(instanceFromEnv).getConfig(factTable);
        if (config == null) {
            throw new IllegalArgumentException("Table " + factTable + " is not a streaming table.");
        }
        if (!StreamingConfig.STREAMING_TYPE_KAFKA.equals(config.getType())) {
            throw new IllegalArgumentException("kafka is the only supported streaming type.");
        }
        logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", factTable, Integer.valueOf(i), Long.valueOf(j), Long.valueOf(j2)));
        try {
            KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(instanceFromEnv).getKafkaConfig(factTable);
            StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig, realizationType, str);
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            ArrayList newArrayList = Lists.newArrayList();
            for (KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
                int size = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
                for (int i2 = 0; i2 < size; i2++) {
                    newArrayList.add(newCachedThreadPool.submit(new StreamingMessageProducer(kafkaClusterConfig, i2, Pair.newPair(Long.valueOf(j), Long.valueOf(j2)), kafkaConfig.getMargin(), streamingParser)));
                }
            }
            LinkedList newLinkedList = Lists.newLinkedList();
            Iterator it2 = newArrayList.iterator();
            while (it2.hasNext()) {
                try {
                    try {
                        newLinkedList.addAll((Collection) ((Future) it2.next()).get());
                    } catch (InterruptedException e) {
                        logger.warn("this thread should not be interrupted, just ignore", (Throwable) e);
                    }
                } catch (ExecutionException e2) {
                    throw new RuntimeException("error when get StreamingMessages", e2.getCause());
                }
            }
            Pair newPair = Pair.newPair(Long.valueOf(j), Long.valueOf(j2));
            logger.info("finish to get streaming batch, total message count:" + newLinkedList.size());
            return new StreamingBatch(newLinkedList, newPair);
        } catch (ReflectiveOperationException e3) {
            throw new RuntimeException("failed to create instance of StreamingParser", e3);
        }
    }
}
