package co.cask.cdap.logging.appender.kafka;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.logging.appender.LogAppender;
import co.cask.cdap.logging.appender.LogMessage;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/appender/kafka/KafkaLogAppender.class */
public final class KafkaLogAppender extends LogAppender {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogAppender.class);
    public static final String APPENDER_NAME = "KafkaLogAppender";
    private final SimpleKafkaProducer producer;
    private final LoggingEventSerializer loggingEventSerializer;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    @Inject
    public KafkaLogAppender(CConfiguration cConfiguration) {
        setName(APPENDER_NAME);
        addInfo("Initializing KafkaLogAppender...");
        this.producer = new SimpleKafkaProducer(cConfiguration);
        try {
            this.loggingEventSerializer = new LoggingEventSerializer();
            addInfo("Successfully initialized KafkaLogAppender.");
        } catch (IOException e) {
            addError("Error initializing KafkaLogAppender.", e);
            throw Throwables.propagate(e);
        }
    }

    @Override // co.cask.cdap.logging.appender.LogAppender
    protected void append(LogMessage logMessage) {
        try {
            this.producer.publish(logMessage.getLoggingContext().getLogPartition(), this.loggingEventSerializer.toBytes(logMessage.getLoggingEvent(), logMessage.getLoggingContext()));
        } catch (Throwable th) {
            LOG.error("Got exception while serializing log event {}.", logMessage.getLoggingEvent(), th);
        }
    }

    public void stop() {
        if (this.stopped.compareAndSet(false, true)) {
            super.stop();
            this.producer.stop();
        }
    }
}
