package org.apache.skywalking.oap.server.exporter.provider.kafka.log;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import lombok.Generated;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.logging.v3.LogTags;
import org.apache.skywalking.apm.network.logging.v3.TextLog;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
import org.apache.skywalking.apm.network.logging.v3.YAMLLog;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.exporter.LogExportService;
import org.apache.skywalking.oap.server.core.query.type.ContentType;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
import org.apache.skywalking.oap.server.exporter.provider.kafka.KafkaExportProducer;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.class */
public class KafkaLogExporter extends KafkaExportProducer implements LogExportService, IConsumer<LogRecord> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaLogExporter.class);
    private DataCarrier<LogRecord> exportBuffer;
    private CounterMetrics successCounter;
    private CounterMetrics errorCounter;
    private final ModuleManager moduleManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.skywalking.oap.server.exporter.provider.kafka.log.KafkaLogExporter$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$skywalking$oap$server$core$query$type$ContentType = new int[ContentType.values().length];

        static {
            try {
                $SwitchMap$org$apache$skywalking$oap$server$core$query$type$ContentType[ContentType.JSON.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$skywalking$oap$server$core$query$type$ContentType[ContentType.YAML.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$skywalking$oap$server$core$query$type$ContentType[ContentType.TEXT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$skywalking$oap$server$core$query$type$ContentType[ContentType.NONE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public KafkaLogExporter(ModuleManager moduleManager, ExporterSetting exporterSetting) {
        super(exporterSetting);
        this.moduleManager = moduleManager;
    }

    public void start() {
        super.getProducer();
        this.exportBuffer = new DataCarrier<>("KafkaLogExporter", "KafkaLogExporter", this.setting.getBufferChannelNum(), this.setting.getBufferChannelSize(), BufferStrategy.IF_POSSIBLE);
        this.exportBuffer.consume(this, 1, 200L);
        MetricsCreator service = this.moduleManager.find("telemetry").provider().getService(MetricsCreator.class);
        this.successCounter = service.createCounter("kafka_exporter_log_success_count", "The success number of log exported by kafka exporter.", new MetricsTag.Keys(new String[]{"protocol"}), new MetricsTag.Values(new String[]{"kafka"}));
        this.errorCounter = service.createCounter("kafka_exporter_log_error_count", "The error number of log exported by kafka exporter", new MetricsTag.Keys(new String[]{"protocol"}), new MetricsTag.Values(new String[]{"kafka"}));
    }

    public void export(LogRecord logRecord) {
        if (logRecord != null) {
            this.exportBuffer.produce(logRecord);
        }
    }

    public boolean isEnabled() {
        return this.setting.isEnableKafkaLog();
    }

    public void consume(List<LogRecord> list) {
        for (LogRecord logRecord : list) {
            if (logRecord != null) {
                try {
                    super.getProducer().send(new ProducerRecord(this.setting.getKafkaTopicLog(), logRecord.id().build(), Bytes.wrap(transLogData(logRecord).toByteArray())), (recordMetadata, exc) -> {
                        if (exc == null) {
                            this.successCounter.inc();
                        } else {
                            this.errorCounter.inc();
                            log.error("Failed to export Log.", exc);
                        }
                    });
                } catch (InvalidProtocolBufferException e) {
                    throw new UnexpectedException("Failed to parse Log tags from LogRecord, id: " + logRecord.id() + ".", e);
                }
            }
        }
    }

    public void onError(List<LogRecord> list, Throwable th) {
    }

    private LogData transLogData(LogRecord logRecord) throws InvalidProtocolBufferException {
        LogData.Builder newBuilder = LogData.newBuilder();
        LogDataBody.Builder newBuilder2 = LogDataBody.newBuilder();
        switch (AnonymousClass1.$SwitchMap$org$apache$skywalking$oap$server$core$query$type$ContentType[ContentType.instanceOf(logRecord.getContentType()).ordinal()]) {
            case 1:
                newBuilder2.setType(ContentType.JSON.name());
                newBuilder2.setJson(JSONLog.newBuilder().setJson(logRecord.getContent().getText()));
                break;
            case 2:
                newBuilder2.setType(ContentType.YAML.name());
                newBuilder2.setYaml(YAMLLog.newBuilder().setYaml(logRecord.getContent().getText()));
                break;
            case ExportMetricValue.ENTITYID_FIELD_NUMBER /* 3 */:
                newBuilder2.setType(ContentType.TEXT.name());
                newBuilder2.setText(TextLog.newBuilder().setText(logRecord.getContent().getText()));
                break;
            case ExportMetricValue.TIMEBUCKET_FIELD_NUMBER /* 4 */:
                newBuilder2.setType(ContentType.NONE.name());
                break;
            default:
                throw new UnexpectedException("Failed to parse Log ContentType value: " + logRecord.getContentType() + " from LogRecord, id: " + logRecord.id() + ".");
        }
        newBuilder.setBody(newBuilder2);
        newBuilder.setTimestamp(logRecord.getTimestamp());
        newBuilder.setService(IDManager.ServiceID.analysisId(logRecord.getServiceId()).getName());
        if (StringUtil.isNotEmpty(logRecord.getServiceInstanceId())) {
            newBuilder.setServiceInstance(IDManager.ServiceInstanceID.analysisId(logRecord.getServiceInstanceId()).getName());
        }
        if (StringUtil.isNotEmpty(logRecord.getEndpointId())) {
            newBuilder.setEndpoint(IDManager.EndpointID.analysisId(logRecord.getEndpointId()).getEndpointName());
        }
        TraceContext.Builder newBuilder3 = TraceContext.newBuilder();
        if (StringUtil.isNotEmpty(logRecord.getTraceSegmentId())) {
            newBuilder3.setTraceSegmentId(logRecord.getTraceSegmentId());
            newBuilder3.setSpanId(logRecord.getSpanId());
        }
        if (StringUtil.isNotEmpty(logRecord.getTraceId())) {
            newBuilder3.setTraceId(logRecord.getTraceId());
        }
        newBuilder.setTraceContext(newBuilder3);
        if (logRecord.getTagsRawData() != null) {
            newBuilder.setTags(LogTags.parseFrom(logRecord.getTagsRawData()));
        }
        return newBuilder.build();
    }
}
