package co.cask.cdap.logging.appender.kafka;

import ch.qos.logback.core.spi.ContextAware;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.logging.appender.LogAppender;
import co.cask.cdap.logging.appender.LogMessage;
import co.cask.cdap.logging.serialize.LoggingEventSerializer;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import kafka.producer.KeyedMessage;

/* loaded from: input_file:co/cask/cdap/logging/appender/kafka/KafkaLogAppender.class */
public final class KafkaLogAppender extends LogAppender {
    private static final int QUEUE_SIZE = 512;
    private static final String APPENDER_NAME = "KafkaLogAppender";
    private final BlockingQueue<LogMessage> messageQueue;
    private final KafkaLogPublisher kafkaLogPublisher;

    /* loaded from: input_file:co/cask/cdap/logging/appender/kafka/KafkaLogAppender$KafkaLogPublisher.class */
    private static final class KafkaLogPublisher extends AbstractExecutionThreadService {
        private final CConfiguration cConf;
        private final BlockingQueue<LogMessage> messageQueue;
        private final String topic;
        private final LoggingEventSerializer loggingEventSerializer;
        private final ContextAware contextAware;
        private final RetryStrategy retryStrategy;
        private SimpleKafkaProducer producer;
        private volatile Thread blockingThread;

        private KafkaLogPublisher(CConfiguration cConfiguration, BlockingQueue<LogMessage> blockingQueue, ContextAware contextAware) {
            this.cConf = cConfiguration;
            this.messageQueue = blockingQueue;
            this.topic = cConfiguration.get("log.kafka.topic");
            this.loggingEventSerializer = new LoggingEventSerializer();
            this.contextAware = contextAware;
            this.retryStrategy = RetryStrategies.fromConfiguration(cConfiguration, "system.log.process.");
        }

        protected void startUp() throws Exception {
            this.producer = new SimpleKafkaProducer(this.cConf);
        }

        protected void shutDown() throws Exception {
            this.producer.stop();
        }

        protected void run() {
            ArrayList arrayList = new ArrayList(KafkaLogAppender.QUEUE_SIZE);
            int i = 0;
            long currentTimeMillis = System.currentTimeMillis();
            while (isRunning()) {
                try {
                    publishMessages(arrayList, i == 0);
                    arrayList.clear();
                    i = 0;
                } catch (InterruptedException e) {
                } catch (Exception e2) {
                    if (i == 0) {
                        currentTimeMillis = System.currentTimeMillis();
                    }
                    i++;
                    long nextRetry = this.retryStrategy.nextRetry(i, currentTimeMillis);
                    if (nextRetry < 0) {
                        arrayList.clear();
                        i = 0;
                        this.contextAware.addError("Failed to publish log message to Kafka on topic " + this.topic, e2);
                    } else {
                        this.blockingThread = Thread.currentThread();
                        try {
                            if (isRunning()) {
                                TimeUnit.MILLISECONDS.sleep(nextRetry);
                            }
                            this.blockingThread = null;
                        } catch (InterruptedException e3) {
                            this.blockingThread = null;
                        } catch (Throwable th) {
                            this.blockingThread = null;
                            throw th;
                        }
                    }
                }
            }
            while (true) {
                if (this.messageQueue.isEmpty() && arrayList.isEmpty()) {
                    return;
                }
                try {
                    publishMessages(arrayList, false);
                } catch (Exception e4) {
                    this.contextAware.addError("Failed to publish log message to Kafka on topic " + this.topic, e4);
                }
                arrayList.clear();
            }
        }

        protected void triggerShutdown() {
            Thread thread = this.blockingThread;
            if (thread != null) {
                thread.interrupt();
            }
        }

        protected Executor executor() {
            return new Executor() { // from class: co.cask.cdap.logging.appender.kafka.KafkaLogAppender.KafkaLogPublisher.1
                @Override // java.util.concurrent.Executor
                public void execute(Runnable runnable) {
                    Thread thread = new Thread(runnable, "kafka-log-publisher");
                    thread.setDaemon(true);
                    thread.start();
                }
            };
        }

        private void publishMessages(List<KeyedMessage<String, byte[]>> list, boolean z) throws InterruptedException {
            LogMessage poll;
            int i = KafkaLogAppender.QUEUE_SIZE;
            if (z) {
                this.blockingThread = Thread.currentThread();
                try {
                    if (isRunning()) {
                        list.add(createKeyedMessage(this.messageQueue.take()));
                        i--;
                    }
                    this.blockingThread = null;
                } catch (InterruptedException e) {
                    this.blockingThread = null;
                } catch (Throwable th) {
                    this.blockingThread = null;
                    throw th;
                }
            }
            while (list.size() < i && (poll = this.messageQueue.poll()) != null) {
                list.add(createKeyedMessage(poll));
            }
            this.producer.publish(list);
        }

        private KeyedMessage<String, byte[]> createKeyedMessage(LogMessage logMessage) {
            return new KeyedMessage<>(this.topic, getPartitionKey(logMessage.getLoggingContext()), this.loggingEventSerializer.toBytes(logMessage));
        }

        private String getPartitionKey(LoggingContext loggingContext) {
            String value = ((LoggingContext.SystemTag) loggingContext.getSystemTagsMap().get(".namespaceId")).getValue();
            if (NamespaceId.SYSTEM.getNamespace().equals(value)) {
                return loggingContext.getLogPartition();
            }
            switch (LogPartitionType.valueOf(this.cConf.get("log.publish.partition.key").toUpperCase())) {
                case PROGRAM:
                    return loggingContext.getLogPartition();
                case APPLICATION:
                    return value + ":" + ((LoggingContext.SystemTag) loggingContext.getSystemTagsMap().get(".applicationId")).getValue();
                default:
                    throw new IllegalArgumentException(String.format("Invalid log partition type %s. Allowed partition types are program/application", this.cConf.get("log.publish.partition.key")));
            }
        }
    }

    @Inject
    KafkaLogAppender(CConfiguration cConfiguration) {
        setName(APPENDER_NAME);
        this.messageQueue = new ArrayBlockingQueue(QUEUE_SIZE);
        this.kafkaLogPublisher = new KafkaLogPublisher(cConfiguration, this.messageQueue, this);
    }

    public void start() {
        this.kafkaLogPublisher.startAndWait();
        addInfo("Successfully initialized KafkaLogAppender.");
        super.start();
    }

    public void stop() {
        this.kafkaLogPublisher.stopAndWait();
        super.stop();
    }

    @Override // co.cask.cdap.logging.appender.LogAppender
    protected void appendEvent(LogMessage logMessage) {
        logMessage.prepareForDeferredProcessing();
        logMessage.getCallerData();
        try {
            this.messageQueue.put(logMessage);
        } catch (InterruptedException e) {
            addInfo("Interrupted when adding log message to queue: " + logMessage.getFormattedMessage());
        }
    }
}
