package co.cask.cdap.logging.appender.tms;

import co.cask.cdap.api.messaging.MessagePublisher;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.logging.appender.AbstractLogPublisher;
import co.cask.cdap.logging.appender.LogAppender;
import co.cask.cdap.logging.appender.LogMessage;
import co.cask.cdap.logging.appender.kafka.LogPartitionType;
import co.cask.cdap.logging.serialize.LoggingEventSerializer;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.context.MultiThreadMessagingContext;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/* loaded from: input_file:co/cask/cdap/logging/appender/tms/TMSLogAppender.class */
/**
 * Log appender that serializes log messages and publishes them to partitioned messaging topics.
 *
 * <p>Each message is assigned a partition by hashing the partition key of its logging context.
 * Queued messages are then grouped by partition and published to the topic named
 * {@code <topic prefix><partition>} in the system namespace.
 */
public final class TMSLogAppender extends LogAppender {
    private static final String APPENDER_NAME = "TMSLogAppender";
    private final TMSLogPublisher tmsLogPublisher;

    /**
     * Publisher that converts each {@link LogMessage} into a (partition, serialized bytes) entry
     * and publishes batches of those entries through a {@link MessagePublisher}.
     *
     * <p>This is an inner (non-static) class because errors are reported through the enclosing
     * appender's {@code addError}.
     */
    private final class TMSLogPublisher extends AbstractLogPublisher<Map.Entry<Integer, byte[]>> {
        private final String topicPrefix;
        private final int numPartitions;
        private final LoggingEventSerializer loggingEventSerializer;
        private final MessagingContext messagingContext;
        private final LogPartitionType logPartitionType;

        /**
         * @param cConfiguration   configuration supplying the topic prefix, the number of partitions,
         *                         the partition key type and the retry strategy settings
         * @param messagingService messaging service used to obtain message publishers
         * @param queueSize        value passed to the parent publisher; the enclosing appender
         *                         supplies the configured {@code log.tms.queue.size}
         */
        private TMSLogPublisher(CConfiguration cConfiguration, MessagingService messagingService, int queueSize) {
            super(queueSize, RetryStrategies.fromConfiguration(cConfiguration, "system.log.process."));
            this.topicPrefix = cConfiguration.get("log.tms.topic.prefix");
            this.numPartitions = cConfiguration.getInt("log.publish.num.partitions");
            this.loggingEventSerializer = new LoggingEventSerializer();
            // Locale.ROOT keeps the enum lookup independent of the JVM default locale
            // (e.g. Turkish upper-cases 'i' to a dotted capital I, which would break valueOf).
            this.logPartitionType = LogPartitionType.valueOf(
                cConfiguration.get("log.publish.partition.key").toUpperCase(Locale.ROOT));
            this.messagingContext = new MultiThreadMessagingContext(messagingService);
        }

        /**
         * Builds the queue entry for a log message.
         *
         * @param logMessage the log message to convert
         * @return an entry whose key is the target partition and whose value is the serialized message
         */
        @Override
        public Map.Entry<Integer, byte[]> createMessage(LogMessage logMessage) {
            int targetPartition = TMSLogAppender.partition(
                this.logPartitionType.getPartitionKey(logMessage.getLoggingContext()), this.numPartitions);
            return new AbstractMap.SimpleEntry<>(targetPartition, this.loggingEventSerializer.toBytes(logMessage));
        }

        /**
         * Groups the given entries by partition and publishes each group to its partition topic.
         *
         * @param list entries of (partition, serialized message) to publish
         * @throws TopicNotFoundException propagated from the message publisher
         * @throws IOException propagated from the message publisher
         */
        @Override
        protected void publish(List<Map.Entry<Integer, byte[]>> list) throws TopicNotFoundException, IOException {
            MessagePublisher directMessagePublisher = this.messagingContext.getDirectMessagePublisher();

            // Collect payloads per partition so that each topic receives a single publish call.
            Map<Integer, List<byte[]>> payloadsByPartition = new HashMap<>();
            for (Map.Entry<Integer, byte[]> entry : list) {
                payloadsByPartition.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).add(entry.getValue());
            }

            for (Map.Entry<Integer, List<byte[]>> partitionPayloads : payloadsByPartition.entrySet()) {
                directMessagePublisher.publish(
                    NamespaceId.SYSTEM.getNamespace(),
                    this.topicPrefix + partitionPayloads.getKey(),
                    partitionPayloads.getValue().iterator());
            }
        }

        /**
         * Reports a publishing error through the enclosing appender.
         *
         * @param str error description
         * @param exc the exception that caused the error
         */
        @Override
        protected void logError(String str, Exception exc) {
            TMSLogAppender.this.addError(str, exc);
        }
    }

    /**
     * Creates the appender and its publisher.
     *
     * @param cConfiguration   configuration; {@code log.tms.queue.size} is read here and passed
     *                         to the publisher
     * @param messagingService messaging service used for publishing
     */
    @Inject
    TMSLogAppender(CConfiguration cConfiguration, MessagingService messagingService) {
        setName(APPENDER_NAME);
        this.tmsLogPublisher = new TMSLogPublisher(cConfiguration, messagingService, cConfiguration.getInt("log.tms.queue.size"));
    }

    /** Starts the publisher, waits for it to be running, then starts the appender itself. */
    public void start() {
        this.tmsLogPublisher.startAndWait();
        addInfo("Successfully started TMSLogAppender");
        super.start();
    }

    /** Stops the publisher, waits for it to terminate, then stops the appender itself. */
    public void stop() {
        this.tmsLogPublisher.stopAndWait();
        addInfo("Successfully stopped TMSLogAppender");
        super.stop();
    }

    /**
     * Hands a log message to the publisher.
     *
     * <p>If the calling thread is interrupted while adding the message, the interruption is
     * reported as an info status and the thread's interrupt status is restored.
     *
     * @param logMessage the log message to enqueue
     */
    @Override
    protected void appendEvent(LogMessage logMessage) {
        // Both calls are made before the message is handed to the publisher; getCallerData()'s
        // return value is intentionally unused.
        logMessage.prepareForDeferredProcessing();
        logMessage.getCallerData();
        try {
            this.tmsLogPublisher.addMessage(logMessage);
        } catch (InterruptedException e) {
            addInfo("Interrupted when adding log message to queue: " + logMessage.getFormattedMessage());
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Maps a partition key to a partition number using the MD5 hash of its string form.
     *
     * @param obj the partition key; its {@code toString()} value is hashed
     * @param i   the number of partitions
     * @return {@code |hash| % i}, which is non-negative for any positive {@code i}
     */
    @VisibleForTesting
    static int partition(Object obj, int i) {
        int hash = Hashing.md5().hashString(obj.toString()).asInt();
        // Math.abs(Integer.MIN_VALUE) is still negative as an int, which would produce a negative
        // partition. Taking the absolute value as a long avoids that and leaves the result
        // unchanged for every other hash value.
        return (int) (Math.abs((long) hash) % i);
    }
}
