package org.apache.kylin.sample;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.kylin.kafka.util.KafkaUtils;
import org.apache.kylin.loader.ParserClassLoaderState;
import org.apache.kylin.metadata.jar.JarInfo;
import org.apache.kylin.metadata.jar.JarInfoManager;
import org.apache.kylin.metadata.jar.JarTypeEnum;
import org.apache.kylin.metadata.streaming.DataParserManager;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.parser.AbstractDataParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/sample/KafkaSourceHandler.class */
public class KafkaSourceHandler implements StreamingSourceHandler {
    private static final String DEFAULT_CONSUMER_GROUP = "sample";
    private static final String DEFAULT_TOPIC = "__consumer_offsets";
    public static final String DEFAULT_PARSER = "org.apache.kylin.parser.TimedJsonStreamParser";
    private static final String COL_PATTERN = "^(?!\\d+|_)([0-9a-zA-Z_]{1,}$)";
    private static final String UUID_PATTERN = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
    private static final int SAMPLE_MSG_COUNT = 10;
    private static final int CLIENT_LIST_TOPICS_TIMEOUT = 5000;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceHandler.class);
    private static final Long CONSUMER_LIST_TOPICS_TIMEOUT = 30000L;

    @Override // org.apache.kylin.sample.StreamingSourceHandler
    public Map<String, List<String>> getTopics(KafkaConfig kafkaConfig, String str) {
        int i = 0 + 1;
        ArrayList newArrayList = Lists.newArrayList();
        Consumer<String, ByteBuffer> kafkaConsumer = KafkaUtils.getKafkaConsumer(kafkaConfig.getKafkaBootstrapServers(), DEFAULT_CONSUMER_GROUP);
        Throwable th = null;
        try {
            HashMap newHashMap = Maps.newHashMap();
            try {
                newHashMap.putAll(kafkaConsumer.listTopics(Duration.ofMillis(CONSUMER_LIST_TOPICS_TIMEOUT.longValue())));
                boolean isEmpty = StringUtils.isEmpty(str);
                Stream filter = newHashMap.keySet().stream().filter(this::isUsefulTopic).filter(str2 -> {
                    return isEmpty || StringUtils.containsIgnoreCase(str2.toLowerCase(Locale.ROOT), str);
                });
                newArrayList.getClass();
                filter.forEach((v1) -> {
                    r1.add(v1);
                });
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                Collections.sort(newArrayList);
                TreeMap newTreeMap = Maps.newTreeMap();
                newTreeMap.put("kafka-cluster-" + i, newArrayList);
                return newTreeMap;
            } catch (TimeoutException e) {
                throw new KylinException(ServerErrorCode.BROKER_TIMEOUT_MESSAGE, MsgPicker.getMsg().getBrokerTimeoutMessage());
            }
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.apache.kylin.sample.StreamingSourceHandler
    public List<String> getBrokenBrokers(KafkaConfig kafkaConfig) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap = new HashMap();
        Arrays.stream(kafkaConfig.getKafkaBootstrapServers().split(",")).forEach(str -> {
            AdminClient kafkaAdminClient = KafkaUtils.getKafkaAdminClient(str, DEFAULT_CONSUMER_GROUP);
            hashMap.put(str, kafkaAdminClient.listTopics(new ListTopicsOptions().timeoutMs(5000)));
            arrayList2.add(kafkaAdminClient);
        });
        hashMap.forEach((str2, listTopicsResult) -> {
            try {
                listTopicsResult.names().get();
            } catch (InterruptedException e) {
                log.error("The current thread is interrupted", e);
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e2) {
                arrayList.add(str2);
                log.warn("Broker [{}] cannot be connected, marked as failed", str2);
            }
        });
        arrayList2.forEach((v0) -> {
            v0.close();
        });
        return arrayList;
    }

    private boolean isUsefulTopic(String str) {
        return (Pattern.compile(UUID_PATTERN).matcher(str).matches() || StringUtils.equals(str, DEFAULT_TOPIC)) ? false : true;
    }

    @Override // org.apache.kylin.sample.StreamingSourceHandler
    public List<ByteBuffer> getMessages(KafkaConfig kafkaConfig) {
        log.info("Start to get sample messages from Kafka.");
        long kafkaPollMessageTimeout = KylinConfig.getInstanceFromEnv().getKafkaPollMessageTimeout();
        String subscribe = kafkaConfig.getSubscribe();
        String kafkaBootstrapServers = kafkaConfig.getKafkaBootstrapServers();
        long currentTimeMillis = System.currentTimeMillis();
        log.info("Trying to get messages from brokers: {}", kafkaBootstrapServers);
        ArrayList newArrayList = Lists.newArrayList();
        Consumer<String, ByteBuffer> kafkaConsumer = KafkaUtils.getKafkaConsumer(kafkaBootstrapServers, DEFAULT_CONSUMER_GROUP);
        Throwable th = null;
        try {
            List partitionsFor = kafkaConsumer.partitionsFor(subscribe);
            if (CollectionUtils.isEmpty(partitionsFor)) {
                log.warn("There are no partitions in topic: {}", subscribe);
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                return newArrayList;
            }
            List<TopicPartition> list = (List) partitionsFor.stream().map(partitionInfo -> {
                return new TopicPartition(subscribe, partitionInfo.partition());
            }).collect(Collectors.toList());
            Map beginningOffsets = kafkaConsumer.beginningOffsets(list);
            for (TopicPartition topicPartition : list) {
                if (newArrayList.size() >= 10) {
                    break;
                }
                pollMsg(topicPartition, (Long) beginningOffsets.get(topicPartition), kafkaConsumer, subscribe, Long.valueOf(kafkaPollMessageTimeout), newArrayList);
            }
            log.info("Get sample message size is: {}, cost: {}ms", Integer.valueOf(newArrayList.size()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            return newArrayList;
        } finally {
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void pollMsg(TopicPartition topicPartition, Long l, Consumer<String, ByteBuffer> consumer, String str, Long l2, List<ByteBuffer> list) {
        List singletonList = Collections.singletonList(topicPartition);
        consumer.assign(singletonList);
        consumer.seekToEnd(singletonList);
        long position = consumer.position(topicPartition);
        long longValue = position - l.longValue();
        if (longValue <= 0) {
            return;
        }
        consumer.seek(topicPartition, longValue < 10 ? l.longValue() : position - 10);
        log.info("Ready to poll messages. Topic: {}, Partition: {}, Partition beginning offset: {}, Offset: {}", new Object[]{str, Integer.valueOf(topicPartition.partition()), l, Long.valueOf(position)});
        ConsumerRecords poll = consumer.poll(Duration.ofMillis(l2.longValue()));
        if (poll.isEmpty()) {
            return;
        }
        Iterator it2 = poll.iterator();
        while (it2.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it2.next();
            if (list.size() >= 10) {
                return;
            } else {
                list.add(consumerRecord.value());
            }
        }
    }

    @Override // org.apache.kylin.sample.StreamingSourceHandler
    public Map<String, Object> parserMessage(KafkaConfig kafkaConfig, String str) {
        String parserName = kafkaConfig.getParserName();
        String project = kafkaConfig.getProject();
        String subscribe = kafkaConfig.getSubscribe();
        try {
            ParserClassLoaderState parserClassLoaderState = ParserClassLoaderState.getInstance(project);
            checkParserRegister(parserName, project, parserClassLoaderState);
            Map<String, Object> process = AbstractDataParser.getDataParser(parserName, parserClassLoaderState.getClassLoader()).process(StandardCharsets.UTF_8.encode(str));
            checkColName(process);
            return process;
        } catch (Exception e) {
            throw new KylinException(ErrorCodeServer.STREAMING_PARSE_MESSAGE_ERROR, e, parserName, subscribe);
        }
    }

    public void checkParserRegister(String str, String str2, ParserClassLoaderState parserClassLoaderState) {
        if (StringUtils.equals("org.apache.kylin.parser.TimedJsonStreamParser", str)) {
            return;
        }
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        JarInfo jarInfo = JarInfoManager.getInstance(instanceFromEnv, str2).getJarInfo(JarTypeEnum.STREAMING_CUSTOM_PARSER, DataParserManager.getInstance(instanceFromEnv, str2).getDataParserInfo(str).getJarName());
        if (parserClassLoaderState.getLoadedJars().contains(jarInfo.getJarPath())) {
            return;
        }
        parserClassLoaderState.registerJars(Sets.newHashSet(jarInfo.getJarPath()));
    }

    private static void checkColName(Map<String, Object> map) {
        Pattern compile = Pattern.compile(COL_PATTERN);
        Iterator<String> it2 = map.keySet().iterator();
        while (it2.hasNext()) {
            if (!compile.matcher(it2.next()).matches()) {
                throw new KylinException(ErrorCodeServer.CUSTOM_PARSER_CHECK_COLUMN_NAME_FAILED, new Object[0]);
            }
        }
    }
}
