package org.apache.gobblin.metrics.kafka;

import com.google.common.base.Optional;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gobblin.KafkaCommonUtil;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/metrics/kafka/KafkaProducerPusher.class */
/**
 * A {@link Pusher} of {@code byte[]} messages that sends each message to a single Kafka topic
 * through a {@link KafkaProducer}.
 *
 * <p>The {@link Future} returned by every send is kept in an internal queue. Once the queue holds
 * at least {@code numFuturesToBuffer} entries, {@link #pushMessages(List)} waits for the oldest
 * pending sends to complete. {@link #close()} waits (with a timeout) for the remaining sends and
 * then closes the producer.
 */
public class KafkaProducerPusher implements Pusher<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerPusher.class);
    private static final long DEFAULT_MAX_NUM_FUTURES_TO_BUFFER = 1000;
    private static final String MAX_NUM_FUTURES_TO_BUFFER_KEY = "numFuturesToBuffer";
    /** Maximum time, in seconds, that {@link #close()} waits for pending sends to complete. */
    private static final long CLOSE_FLUSH_TIMEOUT_SECONDS = 15L;

    private final String topic;
    private final KafkaProducer<String, byte[]> producer;
    private final Closer closer;
    /** Futures of sends that have been issued but not yet awaited by {@link #flush(long)}. */
    private final Queue<Future<RecordMetadata>> futures;
    /** Queue size at which {@link #pushMessages(List)} starts waiting for pending sends. */
    private final long numFuturesToBuffer;

    /**
     * Creates a pusher for the given brokers and topic.
     *
     * <p>Defaults set here are {@code acks=all}, {@code retries=3}, a {@link StringSerializer} for
     * keys and a {@link ByteArraySerializer} for values. When {@code kafkaConfig} is present, all of
     * its entries are copied into the producer properties after the defaults (so they take
     * precedence), and its {@code numFuturesToBuffer} entry, if any, overrides the default
     * buffering threshold of 1000.
     *
     * @param brokers value used for the producer's {@code bootstrap.servers} property
     * @param topic topic every message is sent to
     * @param kafkaConfig optional extra producer configuration
     */
    public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
        this.futures = new LinkedBlockingDeque<>();
        // The closer must exist before createProducer() runs, because that method registers with it.
        this.closer = Closer.create();
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        properties.put("acks", "all");
        properties.put("retries", 3);

        long futuresToBuffer = DEFAULT_MAX_NUM_FUTURES_TO_BUFFER;
        if (kafkaConfig.isPresent()) {
            Config config = kafkaConfig.get();
            properties.putAll(ConfigUtils.configToProperties(config));
            futuresToBuffer =
                ConfigUtils.getLong(config, MAX_NUM_FUTURES_TO_BUFFER_KEY, DEFAULT_MAX_NUM_FUTURES_TO_BUFFER);
        }
        this.numFuturesToBuffer = futuresToBuffer;

        this.producer = createProducer(properties);
    }

    /**
     * Creates a pusher for the given brokers and topic using only the default producer settings.
     *
     * @param brokers value used for the producer's {@code bootstrap.servers} property
     * @param topic topic every message is sent to
     */
    public KafkaProducerPusher(String brokers, String topic) {
        this(brokers, topic, Optional.absent());
    }

    /**
     * Sends every message in {@code list} to the configured topic without waiting for completion.
     *
     * <p>Send failures are logged by the send callback. After all messages have been handed to the
     * producer, if the number of pending futures has reached the buffering threshold, this method
     * waits for up to {@code list.size()} of the oldest pending sends.
     *
     * @param list messages to send
     */
    public void pushMessages(List<byte[]> list) {
        for (byte[] message : list) {
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(this.topic, message);
            this.futures.offer(this.producer.send(record, (recordMetadata, exc) -> {
                if (exc != null) {
                    log.error("Failed to send message to topic {} due to exception: ", this.topic, exc);
                }
            }));
        }
        if (this.futures.size() >= this.numFuturesToBuffer) {
            flush(list.size());
        }
    }

    /**
     * Waits for up to {@code numRecordsToFlush} of the oldest pending sends to complete.
     *
     * <p>Failures of individual sends are logged and do not stop the loop. If the calling thread is
     * interrupted while waiting, its interrupt flag is restored and the method returns early.
     *
     * @param numRecordsToFlush maximum number of pending futures to remove from the queue and await
     */
    private void flush(long numRecordsToFlush) {
        log.debug("Flushing records from producer buffer");
        long numRecordsFlushed = 0;
        // Check the limit BEFORE polling: a future removed from the queue must always be awaited,
        // otherwise it is silently dropped and its outcome is never observed here.
        while (numRecordsFlushed < numRecordsToFlush) {
            Future<RecordMetadata> future = this.futures.poll();
            if (future == null) {
                break;
            }
            numRecordsFlushed++;
            try {
                future.get();
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers and stop blocking on further futures.
                Thread.currentThread().interrupt();
                log.error("Exception encountered when flushing record", e);
                break;
            } catch (Exception e) {
                log.error("Exception encountered when flushing record", e);
            }
        }
        log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
    }

    /**
     * Attempts to wait for all pending sends, bounded by a 15 second timeout, and then closes every
     * resource registered with the internal {@link Closer} (including the producer).
     *
     * <p>A timeout or any other exception raised by the flush attempt is logged and does not
     * prevent the resources from being closed.
     *
     * @throws IOException if closing the registered resources fails
     */
    public void close() throws IOException {
        log.info("Flushing records before close");
        try {
            KafkaCommonUtil.runWithTimeout(
                () -> flush(Long.MAX_VALUE), CLOSE_FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            log.warn("Flush records before close was interrupted! Reached {} seconds timeout!",
                CLOSE_FLUSH_TIMEOUT_SECONDS);
        } catch (Exception e) {
            log.error("Exception encountered when flushing record before close", e);
        } finally {
            this.closer.close();
        }
    }

    /**
     * Creates the Kafka producer and registers it with the internal {@link Closer} so that it is
     * closed by {@link #close()}.
     *
     * <p>This method is invoked from the constructor, so an override must not rely on subclass
     * state that is initialized later.
     *
     * @param properties producer configuration
     * @return the newly created producer
     */
    protected KafkaProducer<String, byte[]> createProducer(Properties properties) {
        return this.closer.register(new KafkaProducer<String, byte[]>(properties));
    }
}
