package org.apache.gobblin.metrics.kafka;

import com.google.common.base.Optional;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.class */
public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
    private static final Logger log = LoggerFactory.getLogger(KafkaKeyValueProducerPusher.class);
    private final String topic;
    private final KafkaProducer<K, V> producer;
    private final Closer closer;

    public KafkaKeyValueProducerPusher(String str, String str2, Optional<Config> optional) {
        this.closer = Closer.create();
        this.topic = str2;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        properties.put("acks", "all");
        properties.put("retries", 3);
        if (optional.isPresent()) {
            properties.putAll(ConfigUtils.configToProperties((Config) optional.get()));
        }
        this.producer = createProducer(properties);
    }

    public KafkaKeyValueProducerPusher(String str, String str2) {
        this(str, str2, Optional.absent());
    }

    public void pushMessages(List<Pair<K, V>> list) {
        for (Pair<K, V> pair : list) {
            this.producer.send(new ProducerRecord(this.topic, pair.getKey(), pair.getValue()), (recordMetadata, exc) -> {
                if (exc != null) {
                    log.error("Failed to send message to topic {} due to exception: ", this.topic, exc);
                }
            });
        }
    }

    public void close() throws IOException {
        this.closer.close();
    }

    protected KafkaProducer<K, V> createProducer(Properties properties) {
        return this.closer.register(new KafkaProducer(properties));
    }
}
