package co.cask.cdap.logging.appender.kafka;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.logging.LoggingConfiguration;
import com.google.common.util.concurrent.Futures;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.twill.common.Threads;

/* loaded from: input_file:co/cask/cdap/logging/appender/kafka/SimpleKafkaProducer.class */
final class SimpleKafkaProducer {
    private final Producer<String, byte[]> producer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SimpleKafkaProducer(CConfiguration cConfiguration) {
        Properties properties = new Properties();
        properties.setProperty("metadata.broker.list", cConfiguration.get(LoggingConfiguration.KAFKA_SEED_BROKERS));
        properties.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
        properties.setProperty("partitioner.class", "co.cask.cdap.logging.appender.kafka.StringPartitioner");
        properties.setProperty("request.required.acks", "1");
        properties.setProperty("producer.type", cConfiguration.get(LoggingConfiguration.KAFKA_PRODUCER_TYPE, LoggingConfiguration.DEFAULT_KAFKA_PRODUCER_TYPE));
        properties.setProperty("queue.buffering.max.ms", cConfiguration.get(LoggingConfiguration.KAFKA_PRODUCER_BUFFER_MS, Long.toString(1000L)));
        properties.setProperty("log.publish.num.partitions", cConfiguration.get("log.publish.num.partitions"));
        this.producer = createProducer(new ProducerConfig(properties));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(List<KeyedMessage<String, byte[]>> list) {
        boolean interrupted = Thread.interrupted();
        try {
            this.producer.send(list);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } catch (Throwable th) {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.producer.close();
    }

    private <K, V> Producer<K, V> createProducer(ProducerConfig producerConfig) {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("create-producer"));
        try {
            Producer<K, V> producer = (Producer) Futures.getUnchecked(newSingleThreadExecutor.submit(() -> {
                return new Producer(producerConfig);
            }));
            newSingleThreadExecutor.shutdownNow();
            return producer;
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdownNow();
            throw th;
        }
    }
}
