package org.apache.airavata.common.logging.kafka;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.StackTraceElementProxy;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.google.gson.Gson;
import java.time.Instant;
import java.util.Arrays;
import java.util.Properties;
import org.apache.airavata.common.logging.Exception;
import org.apache.airavata.common.logging.LogEntry;
import org.apache.airavata.common.logging.ServerId;
import org.apache.airavata.common.utils.AwsMetadata;
import org.apache.airavata.common.utils.BuildConstant;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/common/logging/kafka/KafkaAppender.class */
public class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaAppender.class);
    private final Producer<String, String> producer;
    private final String kafkaTopic;
    private ServerId serverId;

    public KafkaAppender(String str, String str2) {
        this.serverId = null;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10000);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put("producer.type", "async");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaTopic = createKafkaTopic(str2);
        logger.info("Starting kafka producer: bootstrap-server:{}, topic : {}", str, this.kafkaTopic);
        this.producer = new KafkaProducer(properties);
        if (!ServerSettings.isRunningOnAws()) {
            this.serverId = new ServerId(ServerSettings.getIp(), ServerSettings.getIp(), BuildConstant.VERSION, ServerSettings.getServerRoles());
        } else {
            AwsMetadata awsMetadata = new AwsMetadata();
            this.serverId = new ServerId(awsMetadata.getId(), awsMetadata.getHostname(), BuildConstant.VERSION, ServerSettings.getServerRoles());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // ch.qos.logback.core.UnsynchronizedAppenderBase
    public void append(ILoggingEvent iLoggingEvent) {
        LogEntry logEntry;
        iLoggingEvent.prepareForDeferredProcessing();
        if (iLoggingEvent.getLevel().equals(Level.ALL) || iLoggingEvent.getLevel().equals(Level.OFF)) {
            return;
        }
        IThrowableProxy throwableProxy = iLoggingEvent.getThrowableProxy();
        if (throwableProxy != null) {
            logEntry = new LogEntry(this.serverId, iLoggingEvent.getFormattedMessage(), Instant.ofEpochMilli(iLoggingEvent.getTimeStamp()).toString(), iLoggingEvent.getLevel().toString(), iLoggingEvent.getLoggerName(), iLoggingEvent.getMDCPropertyMap(), iLoggingEvent.getThreadName() != null ? iLoggingEvent.getThreadName() : null, new Exception(throwableProxy.getMessage(), toStringArray(throwableProxy.getStackTraceElementProxyArray()), throwableProxy.getClassName()));
        } else {
            logEntry = new LogEntry(this.serverId, iLoggingEvent.getFormattedMessage(), Instant.ofEpochMilli(iLoggingEvent.getTimeStamp()).toString(), iLoggingEvent.getLevel().toString(), iLoggingEvent.getLoggerName(), iLoggingEvent.getMDCPropertyMap(), iLoggingEvent.getThreadName() != null ? iLoggingEvent.getThreadName() : null);
        }
        this.producer.send(new ProducerRecord<>(this.kafkaTopic, new Gson().toJson(logEntry)));
    }

    private String[] toStringArray(StackTraceElementProxy[] stackTraceElementProxyArr) {
        return (String[]) Arrays.stream(stackTraceElementProxyArr).map((v0) -> {
            return v0.getSTEAsString();
        }).toArray(i -> {
            return new String[i];
        });
    }

    private String createKafkaTopic(String str) {
        return ServerSettings.getServerRoles().length >= 4 ? String.format("%s_all_logs", str) : String.format("%s_%s_logs", str, ServerSettings.getServerRoles()[0]);
    }

    public void close() {
        this.producer.close();
    }
}
