package org.apache.skywalking.apm.agent.core.kafka;

import java.util.List;
import java.util.Objects;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.skywalking.apm.dependencies.org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;

@OverrideImplementor(TraceSegmentServiceClient.class)
/* loaded from: input_file:org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.class */
public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, KafkaConnectionStatusListener {
    private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);
    private String topic;
    private KafkaProducer<String, Bytes> producer;
    private volatile DataCarrier<TraceSegment> carrier;

    public void prepare() {
        KafkaProducerManager kafkaProducerManager = (KafkaProducerManager) ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
        kafkaProducerManager.addListener(this);
        this.topic = kafkaProducerManager.formatTopicNameThenRegister(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT);
    }

    public void boot() {
        this.carrier = new DataCarrier<>(Config.Buffer.CHANNEL_SIZE, Config.Buffer.BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
        this.carrier.consume(this, 1);
    }

    public void onComplete() {
        TracingContext.ListenerManager.add(this);
    }

    public void shutdown() {
        TracingContext.ListenerManager.remove(this);
        this.carrier.shutdownConsumers();
    }

    public void init() {
    }

    public void consume(List<TraceSegment> list) {
        if (this.producer == null) {
            return;
        }
        list.forEach(traceSegment -> {
            SegmentObject transform = traceSegment.transform();
            this.producer.send(new ProducerRecord<>(this.topic, transform.getTraceSegmentId(), Bytes.wrap(transform.toByteArray())), (recordMetadata, exc) -> {
                if (Objects.nonNull(exc)) {
                    LOGGER.error("Failed to report TraceSegment.", exc);
                }
            });
        });
    }

    public void onError(List<TraceSegment> list, Throwable th) {
        LOGGER.error(th, "Try to send {} trace segments to collector, with unexpected exception.", new Object[]{Integer.valueOf(list.size())});
    }

    public void onExit() {
    }

    public void afterFinished(TraceSegment traceSegment) {
        if (LOGGER.isDebugEnable()) {
            LOGGER.debug("Trace segment reporting, traceId: {}", new Object[]{traceSegment.getTraceSegmentId()});
        }
        if (traceSegment.isIgnore()) {
            LOGGER.debug("Trace[TraceId={}] is ignored.", new Object[]{traceSegment.getTraceSegmentId()});
        } else {
            this.carrier.produce(traceSegment);
        }
    }

    @Override // org.apache.skywalking.apm.agent.core.kafka.KafkaConnectionStatusListener
    public void onStatusChanged(KafkaConnectionStatus kafkaConnectionStatus) {
        if (kafkaConnectionStatus == KafkaConnectionStatus.CONNECTED) {
            this.producer = ((KafkaProducerManager) ServiceManager.INSTANCE.findService(KafkaProducerManager.class)).getProducer();
        }
    }
}
