package co.cask.cdap.logging.appender.kafka;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.logging.LoggingConfiguration;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/appender/kafka/SimpleKafkaProducer.class */
public final class SimpleKafkaProducer {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaProducer.class);
    private final String kafkaTopic;
    private final Producer<String, byte[]> producer;

    public SimpleKafkaProducer(CConfiguration cConfiguration) {
        Properties properties = new Properties();
        properties.setProperty("metadata.broker.list", cConfiguration.get(LoggingConfiguration.KAFKA_SEED_BROKERS));
        properties.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
        properties.setProperty("partitioner.class", "co.cask.cdap.logging.appender.kafka.StringPartitioner");
        properties.setProperty("request.required.acks", "1");
        properties.setProperty("producer.type", cConfiguration.get(LoggingConfiguration.KAFKA_PRODUCER_TYPE, LoggingConfiguration.DEFAULT_KAFKA_PRODUCER_TYPE));
        properties.setProperty("queue.buffering.max.ms", cConfiguration.get(LoggingConfiguration.KAFKA_PROCUDER_BUFFER_MS, Long.toString(1000L)));
        properties.setProperty(LoggingConfiguration.NUM_PARTITIONS, cConfiguration.get(LoggingConfiguration.NUM_PARTITIONS, LoggingConfiguration.DEFAULT_NUM_PARTITIONS));
        ProducerConfig producerConfig = new ProducerConfig(properties);
        this.kafkaTopic = KafkaTopic.getTopic();
        this.producer = new Producer<>(producerConfig);
    }

    public void publish(String str, byte[] bArr) {
        try {
            this.producer.send(new KeyedMessage(this.kafkaTopic, str, bArr));
        } catch (Throwable th) {
            LOG.error("Exception when trying to publish log message to kafka with key {} and topic {}", new Object[]{str, this.kafkaTopic, th});
        }
    }

    public void stop() {
        this.producer.close();
    }
}
