package org.apache.skywalking.oap.server.analyzer.agent.kafka;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/analyzer/agent/kafka/KafkaFetcherHandlerRegister.class */
public class KafkaFetcherHandlerRegister {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaFetcherHandlerRegister.class);
    private ImmutableMap<String, KafkaHandler> handlerMap;
    private final KafkaFetcherConfig config;
    private final ThreadPoolExecutor executor;
    private final boolean enableKafkaMessageAutoCommit;
    private ImmutableMap.Builder<String, KafkaHandler> builder = ImmutableMap.builder();
    private final List<KafkaConsumer<String, Bytes>> consumers = Lists.newArrayList();
    private final Properties properties = new Properties();

    public KafkaFetcherHandlerRegister(KafkaFetcherConfig kafkaFetcherConfig) {
        this.config = kafkaFetcherConfig;
        this.properties.setProperty("group.id", kafkaFetcherConfig.getGroupId());
        this.properties.setProperty("bootstrap.servers", kafkaFetcherConfig.getBootstrapServers());
        this.properties.putAll(kafkaFetcherConfig.getKafkaConsumerConfig());
        int kafkaHandlerThreadPoolSize = kafkaFetcherConfig.getKafkaHandlerThreadPoolSize() > 0 ? kafkaFetcherConfig.getKafkaHandlerThreadPoolSize() : Runtime.getRuntime().availableProcessors() * 2;
        int kafkaHandlerThreadPoolQueueSize = kafkaFetcherConfig.getKafkaHandlerThreadPoolQueueSize() > 0 ? kafkaFetcherConfig.getKafkaHandlerThreadPoolQueueSize() : 10000;
        this.enableKafkaMessageAutoCommit = ((Boolean) this.properties.getOrDefault("enable.auto.commit", true)).booleanValue();
        for (int i = 0; i < kafkaFetcherConfig.getConsumers(); i++) {
            this.consumers.add(new KafkaConsumer<>(this.properties, new StringDeserializer(), new BytesDeserializer()));
        }
        this.executor = new ThreadPoolExecutor(kafkaHandlerThreadPoolSize, kafkaHandlerThreadPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(kafkaHandlerThreadPoolQueueSize), new CustomThreadFactory("KafkaConsumer"), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void register(KafkaHandler kafkaHandler) {
        this.builder.put(kafkaHandler.getTopic(), kafkaHandler);
    }

    public void start() throws ModuleStartException {
        this.handlerMap = this.builder.build();
        this.builder = null;
        createTopicIfNeeded(this.handlerMap.keySet(), this.properties);
        for (KafkaConsumer<String, Bytes> kafkaConsumer : this.consumers) {
            kafkaConsumer.subscribe(this.handlerMap.keySet());
            kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
            this.executor.submit(() -> {
                runTask(kafkaConsumer);
            });
        }
    }

    private void runTask(KafkaConsumer<String, Bytes> kafkaConsumer) {
        while (true) {
            try {
                ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(500L));
                if (!poll.isEmpty()) {
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        this.executor.submit(() -> {
                            ((KafkaHandler) Objects.requireNonNull(this.handlerMap.get(consumerRecord.topic()))).handle(consumerRecord);
                        });
                    }
                    if (!this.enableKafkaMessageAutoCommit) {
                        kafkaConsumer.commitAsync();
                    }
                }
            } catch (Exception e) {
                log.error("Kafka handle message error.", e);
            }
        }
    }

    private void createTopicIfNeeded(Collection<String> collection, Properties properties) throws ModuleStartException {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.remove("group.id");
        AdminClient create = AdminClient.create(properties2);
        Set set = (Set) create.describeTopics(collection).values().entrySet().stream().map(entry -> {
            try {
                ((KafkaFuture) entry.getValue()).get();
                return null;
            } catch (InterruptedException | ExecutionException e) {
                return (String) entry.getKey();
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toSet());
        if (set.isEmpty()) {
            return;
        }
        log.info("Topics " + set + " not exist.");
        try {
            create.createTopics((List) set.stream().map(str -> {
                return new NewTopic(str, this.config.getPartitions(), (short) this.config.getReplicationFactor());
            }).collect(Collectors.toList())).all().get();
        } catch (Exception e) {
            throw new ModuleStartException("Failed to create Kafka Topics" + set + ".", e);
        }
    }
}
