package org.apache.skywalking.oap.server.exporter.provider.kafka.trace;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Iterator;
import java.util.List;
import lombok.Generated;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.exporter.TraceExportService;
import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.class */
public class KafkaTraceExporter extends KafkaExportProducer implements TraceExportService, IConsumer<SegmentRecord> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaTraceExporter.class);
    private DataCarrier<SegmentRecord> exportBuffer;
    private CounterMetrics successCounter;
    private CounterMetrics errorCounter;
    private final ModuleManager moduleManager;

    public KafkaTraceExporter(ModuleManager moduleManager, ExporterSetting exporterSetting) {
        super(exporterSetting);
        this.moduleManager = moduleManager;
    }

    public void start() {
        super.getProducer();
        this.exportBuffer = new DataCarrier<>("KafkaTraceExporter", "KafkaTraceExporter", this.setting.getBufferChannelNum(), this.setting.getBufferChannelSize(), BufferStrategy.IF_POSSIBLE);
        this.exportBuffer.consume(this, 1, 200L);
        MetricsCreator service = this.moduleManager.find("telemetry").provider().getService(MetricsCreator.class);
        this.successCounter = service.createCounter("kafka_exporter_trace_success_count", "The success number of traces exported by kafka exporter.", new MetricsTag.Keys(new String[]{"protocol"}), new MetricsTag.Values(new String[]{"kafka"}));
        this.errorCounter = service.createCounter("kafka_exporter_trace_error_count", "The error number of traces exported by kafka exporter", new MetricsTag.Keys(new String[]{"protocol"}), new MetricsTag.Values(new String[]{"kafka"}));
    }

    public void export(SegmentRecord segmentRecord) {
        if (segmentRecord != null) {
            this.exportBuffer.produce(segmentRecord);
        }
    }

    public boolean isEnabled() {
        return this.setting.isEnableKafkaTrace();
    }

    public void consume(List<SegmentRecord> list) {
        for (SegmentRecord segmentRecord : list) {
            if (segmentRecord != null) {
                try {
                    SegmentObject parseFrom = SegmentObject.parseFrom(segmentRecord.getDataBinary());
                    if (!this.setting.isExportErrorStatusTraceOnly() || isError(parseFrom)) {
                        super.getProducer().send(new ProducerRecord(this.setting.getKafkaTopicTrace(), parseFrom.getTraceSegmentId(), Bytes.wrap(parseFrom.toByteArray())), (recordMetadata, exc) -> {
                            if (exc == null) {
                                this.successCounter.inc();
                            } else {
                                this.errorCounter.inc();
                                log.error("Failed to export Trace.", exc);
                            }
                        });
                    }
                } catch (InvalidProtocolBufferException e) {
                    throw new UnexpectedException("Failed to parse SegmentObject from SegmentRecord, id: " + segmentRecord.getSegmentId() + ".", e);
                }
            }
        }
    }

    private boolean isError(SegmentObject segmentObject) {
        Iterator it = segmentObject.getSpansList().iterator();
        while (it.hasNext()) {
            if (((SpanObject) it.next()).getIsError()) {
                return true;
            }
        }
        return false;
    }

    public void onError(List<SegmentRecord> list, Throwable th) {
    }
}
