package co.cask.cdap.data2.audit;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.proto.audit.AuditMessage;
import co.cask.cdap.proto.audit.AuditPayload;
import co.cask.cdap.proto.audit.AuditType;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.security.spi.authentication.SecurityRequestContext;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.gson.Gson;
import com.google.inject.Inject;
import java.nio.ByteBuffer;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaClient;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/audit/KafkaAuditPublisher.class */
public class KafkaAuditPublisher implements AuditPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAuditPublisher.class);
    private static final Gson GSON = new Gson();
    private final Supplier<KafkaPublisher> publisherSupplier;
    private final String kafkaTopic;

    @Inject
    public KafkaAuditPublisher(final KafkaClient kafkaClient, CConfiguration cConfiguration) {
        this.publisherSupplier = Suppliers.memoize(new Supplier<KafkaPublisher>() { // from class: co.cask.cdap.data2.audit.KafkaAuditPublisher.1
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public KafkaPublisher m52get() {
                return kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED, Compression.SNAPPY);
            }
        });
        this.kafkaTopic = cConfiguration.get("audit.kafka.topic");
    }

    @Override // co.cask.cdap.data2.audit.AuditPublisher
    public void publish(EntityId entityId, AuditType auditType, AuditPayload auditPayload) {
        AuditMessage auditMessage = new AuditMessage(System.currentTimeMillis(), entityId, (String) Objects.firstNonNull(SecurityRequestContext.getUserId(), ""), auditType, auditPayload);
        LOG.trace("Publishing audit message {}", auditMessage);
        try {
            ByteBuffer encode = Charsets.UTF_8.encode(GSON.toJson(auditMessage));
            KafkaPublisher.Preparer prepare = ((KafkaPublisher) this.publisherSupplier.get()).prepare(this.kafkaTopic);
            prepare.add(encode, entityId);
            prepare.send().get();
        } catch (Exception e) {
            LOG.error("Got exception publishing audit message {}. Exception:", auditMessage, e);
        }
    }
}
