package co.cask.cdap.metrics.collect;

import co.cask.cdap.api.metrics.MetricValue;
import co.cask.cdap.common.io.BinaryEncoder;
import co.cask.cdap.common.io.Encoder;
import co.cask.cdap.internal.io.DatumWriter;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaClient;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metrics collection service that publishes {@link MetricValue}s to Kafka.
 *
 * <p>Each metric value is serialized with the supplied {@link DatumWriter} through a
 * {@link BinaryEncoder} and sent as one Kafka message to the configured topic. The partition
 * key of a message is the hash code of the metric's tags.
 *
 * <p>The {@link KafkaPublisher} is obtained lazily from the {@link KafkaClient} and cached once
 * available. If it cannot be obtained, the batch passed to {@link #publish(Iterator)} is skipped
 * and another attempt is made on the next call.
 *
 * <p>NOTE(review): this class performs no synchronization of its own; the shared encoder buffer
 * and the cached publisher assume {@code publish} is not invoked concurrently — confirm against
 * the superclass.
 */
@Singleton
public final class KafkaMetricsCollectionService extends AggregatedMetricsCollectionService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsCollectionService.class);

    private final KafkaClient kafkaClient;
    private final String topicPrefix;
    private final KafkaPublisher.Ack ack;
    private final DatumWriter<MetricValue> recordWriter;

    /** Reusable buffer that receives the encoded bytes of one metric value at a time. */
    private final ByteArrayOutputStream encoderOutputStream;

    /** Encoder writing into {@link #encoderOutputStream}. */
    private final Encoder encoder;

    /** Cached publisher; {@code null} until one has been obtained successfully. */
    private KafkaPublisher publisher;

    /**
     * Creates a service that publishes with {@link KafkaPublisher.Ack#FIRE_AND_FORGET}.
     *
     * @param kafkaClient client used to obtain the Kafka publisher
     * @param str value bound to {@code metrics.kafka.topic.prefix}; used as the topic to publish to
     * @param datumWriter writer used to serialize each {@link MetricValue}
     */
    @Inject
    public KafkaMetricsCollectionService(KafkaClient kafkaClient, @Named("metrics.kafka.topic.prefix") String str, DatumWriter<MetricValue> datumWriter) {
        this(kafkaClient, str, KafkaPublisher.Ack.FIRE_AND_FORGET, datumWriter);
    }

    /**
     * Creates a service with an explicit acknowledgement mode.
     *
     * @param kafkaClient client used to obtain the Kafka publisher
     * @param str topic to publish to
     * @param ack acknowledgement mode requested when obtaining the publisher
     * @param datumWriter writer used to serialize each {@link MetricValue}
     */
    public KafkaMetricsCollectionService(KafkaClient kafkaClient, String str, KafkaPublisher.Ack ack, DatumWriter<MetricValue> datumWriter) {
        this.kafkaClient = kafkaClient;
        this.topicPrefix = str;
        this.ack = ack;
        this.recordWriter = datumWriter;
        this.encoderOutputStream = new ByteArrayOutputStream(1024);
        this.encoder = new BinaryEncoder(this.encoderOutputStream);
    }

    /**
     * Makes a first attempt to obtain the Kafka publisher. The result is intentionally ignored:
     * if the publisher is not available, {@link #publish(Iterator)} tries again later.
     *
     * <p>NOTE(review): presumably a lifecycle hook inherited from the superclass — confirm.
     */
    protected void startUp() throws Exception {
        getPublisher();
    }

    /**
     * Encodes every metric value from the iterator and sends them to Kafka as a single batch.
     * If no publisher can be obtained, a warning is logged and the iterator is left unconsumed.
     *
     * @param it metric values to publish
     * @throws Exception if encoding a value or sending the batch fails
     */
    @Override // co.cask.cdap.metrics.collect.AggregatedMetricsCollectionService
    protected void publish(Iterator<MetricValue> it) throws Exception {
        KafkaPublisher kafkaPublisher = getPublisher();
        if (kafkaPublisher == null) {
            LOG.warn("Unable to get kafka publisher, will not be able to publish metrics.");
            return;
        }

        // Drop anything left in the buffer by a previous call that failed part-way through.
        this.encoderOutputStream.reset();

        KafkaPublisher.Preparer preparer = kafkaPublisher.prepare(this.topicPrefix);
        while (it.hasNext()) {
            MetricValue value = it.next();
            this.recordWriter.encode(value, this.encoder);
            // toByteArray() returns a copy, so resetting the shared buffer below is safe.
            preparer.add(ByteBuffer.wrap(this.encoderOutputStream.toByteArray()), getPartitionKey(value));
            this.encoderOutputStream.reset();
        }
        preparer.send();
    }

    /**
     * Returns the partition key for a metric value: the hash code of its tags.
     */
    private Integer getPartitionKey(MetricValue metricValue) {
        return Integer.valueOf(metricValue.getTags().hashCode());
    }

    /**
     * Returns the cached publisher, obtaining it from the Kafka client on first use.
     *
     * @return the publisher, or {@code null} if the client threw {@link IllegalStateException}
     *         while creating it
     */
    private KafkaPublisher getPublisher() {
        if (this.publisher != null) {
            return this.publisher;
        }
        try {
            this.publisher = this.kafkaClient.getPublisher(this.ack, Compression.SNAPPY);
        } catch (IllegalStateException e) {
            // Best effort: the field stays null so the next call retries. Keep the cause
            // available for diagnosis instead of discarding it; debug level avoids flooding
            // the log when publish() is invoked repeatedly while the publisher is unavailable.
            LOG.debug("Kafka publisher is not available yet; will retry on next use.", e);
        }
        return this.publisher;
    }
}
