package org.apache.skywalking.apm.agent.core.kafka;

import java.util.List;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig;
import org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.logging.v3.LogData;

@OverrideImplementor(LogReportServiceClient.class)
/* loaded from: input_file:org/apache/skywalking/apm/agent/core/kafka/KafkaLogReporterServiceClient.class */
public class KafkaLogReporterServiceClient extends LogReportServiceClient implements KafkaConnectionStatusListener {
    private String topic;
    private KafkaProducer<String, Bytes> producer;

    public void prepare() {
        KafkaProducerManager kafkaProducerManager = (KafkaProducerManager) ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
        kafkaProducerManager.addListener(this);
        this.topic = kafkaProducerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_LOGGING);
    }

    public void produce(LogData.Builder builder) {
        super.produce(builder);
    }

    public void consume(List<LogData.Builder> list) {
        if (this.producer == null || CollectionUtil.isEmpty(list)) {
            return;
        }
        for (LogData.Builder builder : list) {
            builder.setService(Config.Agent.SERVICE_NAME);
            this.producer.send(new ProducerRecord<>(this.topic, builder.getService(), Bytes.wrap(builder.build().toByteArray())));
        }
    }

    @Override // org.apache.skywalking.apm.agent.core.kafka.KafkaConnectionStatusListener
    public void onStatusChanged(KafkaConnectionStatus kafkaConnectionStatus) {
        if (kafkaConnectionStatus == KafkaConnectionStatus.CONNECTED) {
            this.producer = ((KafkaProducerManager) ServiceManager.INSTANCE.findService(KafkaProducerManager.class)).getProducer();
        }
    }
}
