package io.confluent.support.metrics.submitters;

import io.confluent.support.metrics.common.kafka.KafkaUtilities;
import io.confluent.support.metrics.common.kafka.ZkClientProvider;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import kafka.zk.KafkaZkClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/support/metrics/submitters/KafkaSubmitter.class */
/**
 * {@link Submitter} that publishes a metrics payload as a single record to a Kafka topic.
 *
 * <p>A new producer is created for every call to {@link #submit(byte[])} and is closed once the
 * record has been flushed. The broker list is looked up through a {@link KafkaZkClient} that is
 * obtained from the {@link ZkClientProvider} on first use and then reused.
 */
public class KafkaSubmitter implements Submitter {
    private static final Logger log = LoggerFactory.getLogger(KafkaSubmitter.class);

    /** Value of the producer's {@code acks} setting. */
    private static final int REQUIRED_NUM_ACKS = 1;

    /** Value of the producer's {@code max.block.ms} setting. */
    private static final int MAX_BLOCK_MS = 10000;

    /** Count passed to {@code KafkaUtilities.getBootstrapServers} when building the broker list. */
    private static final int BOOTSTRAP_SERVERS = 3;

    private final String topic;
    private final ZkClientProvider zkClientProvider;

    /** Obtained from {@link #zkClientProvider} the first time a producer is created. */
    private KafkaZkClient zkClient;

    /**
     * Creates a submitter that writes to the given topic.
     *
     * @param zkClientProvider source of the ZooKeeper client used to look up bootstrap servers
     * @param str name of the Kafka topic to submit to
     * @throws IllegalArgumentException if {@code zkClientProvider} is null, or if {@code str} is
     *     null or empty
     */
    public KafkaSubmitter(ZkClientProvider zkClientProvider, String str) {
        if (zkClientProvider == null) {
            throw new IllegalArgumentException("must specify zkClientProvider");
        }
        this.zkClientProvider = zkClientProvider;
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("must specify topic");
        }
        this.topic = str;
    }

    /**
     * Submits the payload to the configured topic using a freshly created producer.
     *
     * <p>A null or empty payload is logged as an error and nothing is sent.
     *
     * @param bArr metrics payload
     */
    @Override
    public void submit(byte[] bArr) {
        // Validate before creating the producer: a producer created for a payload that is then
        // rejected would never be closed.
        if (isMissing(bArr)) {
            log.error("Could not submit metrics to Kafka (metrics data missing)");
            return;
        }
        submit(bArr, createProducer());
    }

    /**
     * Sends the payload through the supplied producer, closes the producer, and waits for the
     * send result.
     *
     * <p>If the payload is null or empty an error is logged and the producer is not used at all
     * (in particular it is not closed). Otherwise the producer is always closed, even when
     * {@code send} or {@code flush} throws. Failures reported by the returned future are logged
     * rather than rethrown.
     *
     * @param bArr metrics payload
     * @param producer producer used to send the record; closed by this method once used
     */
    protected void submit(byte[] bArr, Producer<byte[], byte[]> producer) {
        if (isMissing(bArr)) {
            log.error("Could not submit metrics to Kafka (metrics data missing)");
            return;
        }

        Future<?> response;
        try {
            response = producer.send(new ProducerRecord<byte[], byte[]>(this.topic, bArr));
            producer.flush();
        } finally {
            producer.close();
        }

        if (response == null) {
            // Nothing to wait on, so this must not be reported as a success.
            log.error("Failed to submit metrics to Kafka topic {}: null response", this.topic);
            return;
        }

        try {
            response.get();
            log.info("Successfully submitted metrics to Kafka topic {}", this.topic);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Failed to submit metrics to Kafka topic {} (canceled request): {}", this.topic, e.toString());
        } catch (ExecutionException e) {
            log.error("Failed to submit metrics to Kafka topic {} (due to exception): {}", this.topic, e.toString());
        }
    }

    /** Returns true when there is no payload to send (null or zero length). */
    private static boolean isMissing(byte[] data) {
        return data == null || data.length == 0;
    }

    /**
     * Builds a byte-array producer whose bootstrap servers are looked up via
     * {@code KafkaUtilities.getBootstrapServers} using the ZooKeeper client.
     */
    private Producer<byte[], byte[]> createProducer() {
        if (this.zkClient == null) {
            this.zkClient = this.zkClientProvider.zkClient();
        }
        List<String> bootstrapServers = new KafkaUtilities().getBootstrapServers(this.zkClient, BOOTSTRAP_SERVERS);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", StringUtils.join(bootstrapServers, ","));
        properties.put("acks", Integer.toString(REQUIRED_NUM_ACKS));
        properties.put("max.block.ms", Integer.valueOf(MAX_BLOCK_MS));
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<byte[], byte[]>(properties);
    }
}
