package com.github.kristofa.brave.kafka;

import com.github.kristofa.brave.SpanCollector;
import com.github.kristofa.brave.SpanCollectorMetricsHandler;
import com.twitter.zipkin.gen.Span;
import java.io.Closeable;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

/* loaded from: input_file:com/github/kristofa/brave/kafka/KafkaSpanCollector.class */
public class KafkaSpanCollector implements SpanCollector, Closeable {
    private static final Logger LOGGER = Logger.getLogger(KafkaSpanCollector.class.getName());
    private static final Properties DEFAULT_PROPERTIES = new Properties();
    private final Producer<byte[], byte[]> producer;
    private final ExecutorService executorService;
    private final SpanProcessingTask spanProcessingTask;
    private final Future<Integer> future;
    private final BlockingQueue<Span> queue;
    private final SpanCollectorMetricsHandler metricsHandler;

    private static Properties defaultPropertiesWith(String str) {
        Properties properties = new Properties();
        for (String str2 : DEFAULT_PROPERTIES.stringPropertyNames()) {
            properties.setProperty(str2, DEFAULT_PROPERTIES.getProperty(str2));
        }
        properties.setProperty("bootstrap.servers", str);
        return properties;
    }

    public KafkaSpanCollector(String str, SpanCollectorMetricsHandler spanCollectorMetricsHandler) {
        this(defaultPropertiesWith(str), spanCollectorMetricsHandler);
    }

    public KafkaSpanCollector(Properties properties, SpanCollectorMetricsHandler spanCollectorMetricsHandler) {
        this.producer = new KafkaProducer(properties);
        this.metricsHandler = spanCollectorMetricsHandler;
        this.executorService = Executors.newSingleThreadExecutor();
        this.queue = new ArrayBlockingQueue(1000);
        this.spanProcessingTask = new SpanProcessingTask(this.queue, this.producer, spanCollectorMetricsHandler);
        this.future = this.executorService.submit(this.spanProcessingTask);
    }

    public void collect(Span span) {
        this.metricsHandler.incrementAcceptedSpans(1);
        if (this.queue.offer(span)) {
            return;
        }
        this.metricsHandler.incrementDroppedSpans(1);
        LOGGER.log(Level.WARNING, "Queue rejected span!");
    }

    public void addDefaultAnnotation(String str, String str2) {
        throw new UnsupportedOperationException();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.spanProcessingTask.stop();
        try {
            LOGGER.info("SpanProcessingTask processed " + this.future.get(6000L, TimeUnit.MILLISECONDS) + " spans.");
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Exception when waiting for SpanProcessTask to finish.", (Throwable) e);
        }
        this.executorService.shutdown();
        this.producer.close();
        this.metricsHandler.incrementDroppedSpans(this.queue.size());
        LOGGER.info("KafkaSpanCollector closed.");
    }

    static {
        DEFAULT_PROPERTIES.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        DEFAULT_PROPERTIES.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    }
}
