package co.cask.cdap.logging.appender.kafka;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.logging.appender.AbstractLogPublisher;
import co.cask.cdap.logging.appender.LogAppender;
import co.cask.cdap.logging.appender.LogMessage;
import co.cask.cdap.logging.serialize.LoggingEventSerializer;
import com.google.inject.Inject;
import java.util.List;
import java.util.Locale;
import kafka.producer.KeyedMessage;

/* loaded from: input_file:co/cask/cdap/logging/appender/kafka/KafkaLogAppender.class */
/**
 * Log appender that hands log messages to an internal {@link KafkaLogPublisher}, which
 * serializes them and publishes them to a Kafka topic.
 *
 * <p>The publisher is started in {@link #start()} and stopped in {@link #stop()}.
 */
public final class KafkaLogAppender extends LogAppender {
    /** Queue size passed to the {@link AbstractLogPublisher} constructor. */
    private static final int QUEUE_SIZE = 512;
    /** Name this appender registers itself under. */
    private static final String APPENDER_NAME = "KafkaLogAppender";
    private final KafkaLogPublisher kafkaLogPublisher;

    /**
     * Publisher that turns each {@link LogMessage} into a Kafka {@link KeyedMessage} and sends
     * batches through a {@link SimpleKafkaProducer}.
     *
     * <p>Deliberately a non-static inner class: {@link #logError(String, Exception)} reports
     * through the enclosing appender.
     */
    private final class KafkaLogPublisher extends AbstractLogPublisher<KeyedMessage<String, byte[]>> {
        private final CConfiguration cConf;
        private final String topic;
        private final LoggingEventSerializer loggingEventSerializer;
        private final LogPartitionType logPartitionType;
        /** Created in {@link #doStartUp()}; {@code null} until then. */
        private SimpleKafkaProducer producer;

        /**
         * Reads the topic ({@code log.kafka.topic}) and the partition key type
         * ({@code log.publish.partition.key}) from the given configuration.
         *
         * @param cConfiguration configuration to read the publisher settings from
         */
        private KafkaLogPublisher(CConfiguration cConfiguration) {
            super(KafkaLogAppender.QUEUE_SIZE, RetryStrategies.fromConfiguration(cConfiguration, "system.log.process."));
            this.cConf = cConfiguration;
            this.topic = cConfiguration.get("log.kafka.topic");
            this.loggingEventSerializer = new LoggingEventSerializer();
            // Locale.ROOT keeps the enum lookup independent of the JVM default locale
            // (e.g. a Turkish locale would upper-case 'i' to a dotted capital and break valueOf).
            this.logPartitionType = LogPartitionType.valueOf(
                    cConfiguration.get("log.publish.partition.key").toUpperCase(Locale.ROOT));
        }

        /**
         * Creates the Kafka producer, then delegates to the superclass start-up.
         *
         * @throws Exception if creating the producer or the superclass start-up fails
         */
        @Override
        protected void doStartUp() throws Exception {
            this.producer = new SimpleKafkaProducer(this.cConf);
            super.doStartUp();
        }

        /**
         * Runs the superclass shutdown and then stops the Kafka producer.
         *
         * <p>The producer is stopped even when the superclass shutdown throws, so it is not
         * leaked; it is skipped only if it was never created.
         *
         * @throws Exception if the superclass shutdown or stopping the producer fails
         */
        @Override // co.cask.cdap.logging.appender.AbstractLogPublisher
        public void doShutdown() throws Exception {
            try {
                super.doShutdown();
            } finally {
                if (this.producer != null) {
                    this.producer.stop();
                }
            }
        }

        /**
         * Builds the Kafka message for a log message: the configured topic, the partition key
         * derived from the message's logging context, and the serialized message bytes.
         *
         * @param logMessage the log message to convert
         * @return the keyed Kafka message
         */
        @Override // co.cask.cdap.logging.appender.AbstractLogPublisher
        public KeyedMessage<String, byte[]> createMessage(LogMessage logMessage) {
            String partitionKey = this.logPartitionType.getPartitionKey(logMessage.getLoggingContext());
            byte[] payload = this.loggingEventSerializer.toBytes(logMessage);
            return new KeyedMessage<>(this.topic, partitionKey, payload);
        }

        /**
         * Publishes the given batch of messages through the Kafka producer.
         *
         * @param list the messages to publish
         */
        @Override // co.cask.cdap.logging.appender.AbstractLogPublisher
        protected void publish(List<KeyedMessage<String, byte[]>> list) {
            this.producer.publish(list);
        }

        /**
         * Reports an error through the enclosing appender's {@code addError}.
         *
         * @param str the error message
         * @param exc the exception associated with the error
         */
        @Override // co.cask.cdap.logging.appender.AbstractLogPublisher
        protected void logError(String str, Exception exc) {
            KafkaLogAppender.this.addError(str, exc);
        }
    }

    /**
     * Sets the appender name and creates (but does not start) the Kafka publisher.
     *
     * @param cConfiguration configuration used to set up the publisher
     */
    @Inject
    KafkaLogAppender(CConfiguration cConfiguration) {
        setName(APPENDER_NAME);
        this.kafkaLogPublisher = new KafkaLogPublisher(cConfiguration);
    }

    /** Starts the Kafka publisher and waits for it, then starts the appender itself. */
    @Override
    public void start() {
        this.kafkaLogPublisher.startAndWait();
        addInfo("Successfully initialized KafkaLogAppender.");
        super.start();
    }

    /** Stops the Kafka publisher and waits for it, then stops the appender itself. */
    @Override
    public void stop() {
        this.kafkaLogPublisher.stopAndWait();
        super.stop();
    }

    /**
     * Prepares the log message for deferred processing and adds it to the publisher.
     *
     * <p>If the calling thread is interrupted while adding the message, the message is dropped,
     * an info status is recorded, and the thread's interrupt status is restored so callers
     * further up the stack can still observe the interruption.
     *
     * @param logMessage the log message to publish
     */
    @Override // co.cask.cdap.logging.appender.LogAppender
    protected void appendEvent(LogMessage logMessage) {
        logMessage.prepareForDeferredProcessing();
        // Return value intentionally unused; the call is kept from the original code
        // (presumably it forces caller data to be computed on this thread — TODO confirm).
        logMessage.getCallerData();
        try {
            this.kafkaLogPublisher.addMessage(logMessage);
        } catch (InterruptedException e) {
            addInfo("Interrupted when adding log message to queue: " + logMessage.getFormattedMessage());
            // Catching InterruptedException clears the flag; re-assert it instead of swallowing.
            Thread.currentThread().interrupt();
        }
    }
}
