package org.apache.skywalking.oap.server.analyzer.agent.kafka;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.class */
/**
 * Owns the Kafka consumer used by the fetcher and dispatches every polled record to the
 * {@link KafkaHandler} registered for the record's topic.
 *
 * <p>Lifecycle: construct (verifies/creates the configured topics and builds the consumer),
 * call {@link #register(KafkaHandler)} for each handler, then call {@link #start()} once to
 * subscribe/assign and launch the polling thread.
 */
public class KafkaFetcherHandlerRegister implements Runnable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaFetcherHandlerRegister.class);

    /** Topic name to handler lookup; built from {@link #builder} when {@link #start()} is called. */
    private ImmutableMap<String, KafkaHandler> handlerMap;
    private final KafkaConsumer<String, Bytes> consumer;
    private final KafkaFetcherConfig config;
    /** True when sharding is enabled in the config and a non-empty partition list is configured. */
    private final boolean isSharding;
    private final ImmutableMap.Builder<String, KafkaHandler> builder = ImmutableMap.builder();
    private final List<TopicPartition> topicPartitions = Lists.newArrayList();

    /**
     * Checks that the configured topics can be described, creates the ones that cannot, and
     * builds the Kafka consumer.
     *
     * @param kafkaFetcherConfig fetcher settings (bootstrap servers, group id, topic names, ...)
     * @throws ModuleStartException if the missing topics cannot be created
     */
    public KafkaFetcherHandlerRegister(KafkaFetcherConfig kafkaFetcherConfig) throws ModuleStartException {
        this.config = kafkaFetcherConfig;

        Properties properties = new Properties();
        properties.putAll(kafkaFetcherConfig.getKafkaConsumerConfig());
        properties.setProperty("group.id", kafkaFetcherConfig.getGroupId());
        properties.setProperty("bootstrap.servers", kafkaFetcherConfig.getBootstrapServers());

        // The admin client is only needed while checking/creating topics; close it afterwards
        // (also on failure) so its resources are released.
        try (AdminClient adminClient = AdminClient.create(properties)) {
            List<String> expectedTopics = Lists.newArrayList(
                kafkaFetcherConfig.getTopicNameOfManagements(),
                kafkaFetcherConfig.getTopicNameOfMetrics(),
                kafkaFetcherConfig.getTopicNameOfProfiling(),
                kafkaFetcherConfig.getTopicNameOfTracingSegments(),
                kafkaFetcherConfig.getTopicNameOfMeters()
            );

            // A topic whose description future fails is treated as missing.
            Set<String> missedTopics = adminClient.describeTopics(expectedTopics)
                                                  .values()
                                                  .entrySet()
                                                  .stream()
                                                  .map(entry -> {
                                                      try {
                                                          entry.getValue().get();
                                                          return null;
                                                      } catch (InterruptedException e) {
                                                          // Keep the interrupt visible to the caller instead of
                                                          // swallowing it.
                                                          Thread.currentThread().interrupt();
                                                          return entry.getKey();
                                                      } catch (ExecutionException e) {
                                                          return entry.getKey();
                                                      }
                                                  })
                                                  .filter(Objects::nonNull)
                                                  .collect(Collectors.toSet());

            if (!missedTopics.isEmpty()) {
                log.info("Topics{} not exist.", missedTopics);
                List<NewTopic> newTopics = missedTopics.stream()
                                                       .map(topic -> new NewTopic(
                                                           topic,
                                                           kafkaFetcherConfig.getPartitions(),
                                                           (short) kafkaFetcherConfig.getReplicationFactor()
                                                       ))
                                                       .collect(Collectors.toList());
                try {
                    adminClient.createTopics(newTopics).all().get();
                } catch (Exception e) {
                    throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
                }
            }
        }

        this.isSharding = kafkaFetcherConfig.isSharding()
            && StringUtil.isNotEmpty(kafkaFetcherConfig.getConsumePartitions());
        this.consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer());
    }

    /**
     * Registers a handler under its topic and records its topic partitions for manual assignment.
     * Must be called before {@link #start()}; the handler map is built there.
     *
     * @param kafkaHandler handler to register
     */
    public void register(KafkaHandler kafkaHandler) {
        this.builder.put(kafkaHandler.getTopic(), kafkaHandler);
        this.topicPartitions.addAll(kafkaHandler.getTopicPartitions());
    }

    /**
     * Freezes the registered handlers, attaches the consumer to its topics (manual partition
     * assignment when sharding, group subscription otherwise) and launches the polling loop on a
     * dedicated single thread named "KafkaConsumer".
     */
    public void start() {
        this.handlerMap = this.builder.build();
        if (this.isSharding) {
            this.consumer.assign(this.topicPartitions);
        } else {
            this.consumer.subscribe(this.handlerMap.keySet());
        }
        // Applies to whatever partitions the consumer reports as assigned at this moment.
        // NOTE(review): in subscribe mode the assignment is presumably still empty here - confirm
        // this is the intended behaviour.
        this.consumer.seekToEnd(this.consumer.assignment());
        Executors.newSingleThreadExecutor(new DefaultThreadFactory("KafkaConsumer")).submit(this);
    }

    /**
     * Polling loop: fetches records every 500 ms, dispatches each to its topic handler and then
     * commits offsets asynchronously.
     *
     * <p>A failure while handling a single record is logged and the rest of the batch is still
     * processed. A failure of the consumer itself (poll/commit) is logged and rethrown, which
     * ends the loop; logging is required because the task is started via
     * {@code ExecutorService.submit}, whose returned future is discarded, so the exception would
     * otherwise go unreported.
     */
    @Override
    public void run() {
        while (true) {
            try {
                ConsumerRecords<String, Bytes> records = this.consumer.poll(Duration.ofMillis(500L));
                if (records.isEmpty()) {
                    continue;
                }
                for (ConsumerRecord<String, Bytes> record : records) {
                    dispatch(record);
                }
                this.consumer.commitAsync();
            } catch (RuntimeException e) {
                log.error("Kafka consumer failed, stop consuming.", e);
                throw e;
            }
        }
    }

    /** Hands one record to the handler registered for its topic, isolating handler failures. */
    private void dispatch(ConsumerRecord<String, Bytes> record) {
        KafkaHandler handler = this.handlerMap.get(record.topic());
        if (handler == null) {
            log.warn("No Kafka handler registered for topic {}, record skipped.", record.topic());
            return;
        }
        try {
            handler.handle(record);
        } catch (Exception e) {
            log.error("Kafka handle message error, topic: " + record.topic() + ".", e);
        }
    }
}
