package org.apache.skywalking.apm.agent.core.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.loader.AgentClassLoader;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.admin.AdminClient;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.KafkaFuture;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.serialization.Serializer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.serialization.StringSerializer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;

@DefaultImplementor
/* loaded from: input_file:org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.class */
public class KafkaProducerManager implements BootService, Runnable {
    private static final ILog LOGGER = LogManager.getLogger(KafkaProducerManager.class);
    private Set<String> topics = new HashSet();
    private List<KafkaConnectionStatusListener> listeners = new ArrayList();
    private volatile KafkaProducer<String, Bytes> producer;
    private ScheduledFuture<?> bootProducerFuture;

    public void prepare() {
    }

    public void boot() {
        this.bootProducerFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("org.apache.skywalking.apm.dependencies.kafkaProducerInitThread")).scheduleAtFixedRate(new RunnableWithExceptionProtection(this, th -> {
            LOGGER.error("unexpected exception.", th);
        }), 0L, 120L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String formatTopicNameThenRegister(String str) {
        String str2 = StringUtil.isBlank(KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE) ? str : KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + str;
        this.topics.add(str2);
        return str2;
    }

    public void addListener(KafkaConnectionStatusListener kafkaConnectionStatusListener) {
        if (this.listeners.contains(kafkaConnectionStatusListener)) {
            return;
        }
        this.listeners.add(kafkaConnectionStatusListener);
    }

    public void onComplete() {
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setContextClassLoader(AgentClassLoader.getDefault());
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
        Map<String, String> map = KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG;
        properties.getClass();
        map.forEach(properties::setProperty);
        AdminClient create = AdminClient.create(properties);
        Throwable th = null;
        try {
            Set set = (Set) create.describeTopics(this.topics).values().entrySet().stream().map(entry -> {
                try {
                    ((KafkaFuture) entry.getValue()).get(KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT, TimeUnit.SECONDS);
                    return null;
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    LOGGER.error(e, "Get KAFKA topic:{} error.", new Object[]{entry.getKey()});
                    return (String) entry.getKey();
                }
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toSet());
            if (!set.isEmpty()) {
                LOGGER.warn("org.apache.skywalking.apm.dependencies.kafka topics {} is not exist, connect to kafka cluster abort", new Object[]{set});
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            try {
                this.producer = new KafkaProducer<>(properties, (Serializer) new StringSerializer(), (Serializer) new BytesSerializer());
                notifyListeners(KafkaConnectionStatus.CONNECTED);
                this.bootProducerFuture.cancel(true);
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Exception e) {
                LOGGER.error(e, "connect to kafka cluster '{}' failed", new Object[]{KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS});
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            }
        } catch (Throwable th5) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    create.close();
                }
            }
            throw th5;
        }
    }

    private void notifyListeners(KafkaConnectionStatus kafkaConnectionStatus) {
        Iterator<KafkaConnectionStatusListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onStatusChanged(kafkaConnectionStatus);
        }
    }

    public final KafkaProducer<String, Bytes> getProducer() {
        return this.producer;
    }

    public int priority() {
        return ServiceManager.INSTANCE.findService(GRPCChannelManager.class).priority() - 1;
    }

    public void shutdown() {
        this.producer.flush();
        this.producer.close();
    }
}
