package com.github.kristofa.brave.kafka;

import com.github.kristofa.brave.SpanCollectorMetricsHandler;
import com.twitter.zipkin.gen.Span;
import java.io.ByteArrayOutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

/* loaded from: input_file:com/github/kristofa/brave/kafka/SpanProcessingTask.class */
/**
 * Background task that drains {@link Span}s from a queue, serializes each one with the
 * Thrift binary protocol and hands the resulting bytes to a Kafka {@link Producer}.
 *
 * <p>The task keeps looping until {@link #stop()} has been called. The stop flag is only
 * examined after each poll attempt, so the queue is polled at least once and a stop request
 * can take up to {@value #POLL_TIMEOUT_SECONDS} seconds to be noticed when the queue is empty.
 *
 * <p>{@link #stop()} may be called from another thread (the flag is {@code volatile}); the
 * processed-span counter is only touched inside {@link #call()}.
 */
class SpanProcessingTask implements Callable<Integer> {
    private static final Logger LOGGER = Logger.getLogger(SpanProcessingTask.class.getName());

    /** Kafka topic every serialized span is published to. */
    private static final String TOPIC = "zipkin";

    /** Maximum time, in seconds, a single poll waits for a span before re-checking the stop flag. */
    private static final long POLL_TIMEOUT_SECONDS = 5L;

    /** Initial capacity, in bytes, of the reusable serialization buffer. */
    private static final int INITIAL_BUFFER_SIZE = 1024;

    private final BlockingQueue<Span> queue;
    private final Producer<byte[], byte[]> producer;
    private final SpanCollectorMetricsHandler metricsHandler;
    private volatile boolean stop = false;
    private int numProcessedSpans = 0;

    /**
     * Creates a task bound to the given queue, producer and metrics handler.
     *
     * @param blockingQueue queue the spans are taken from
     * @param producer Kafka producer the serialized spans are sent with
     * @param spanCollectorMetricsHandler handler notified when a span cannot be serialized
     */
    public SpanProcessingTask(BlockingQueue<Span> blockingQueue, Producer<byte[], byte[]> producer, SpanCollectorMetricsHandler spanCollectorMetricsHandler) {
        this.queue = blockingQueue;
        this.producer = producer;
        this.metricsHandler = spanCollectorMetricsHandler;
    }

    /** Requests that {@link #call()} leaves its loop after the current poll/send iteration. */
    public void stop() {
        this.stop = true;
    }

    /**
     * Polls the queue and forwards every span to Kafka until {@link #stop()} is called.
     *
     * <p>A span whose serialization fails with a {@link TException} is counted as dropped on the
     * metrics handler, logged at {@code WARNING} level and skipped; the loop then continues.
     *
     * @return the number of spans that were serialized and passed to {@code producer.send}
     * @throws Exception any exception other than {@link TException} raised inside the loop,
     *     including the {@link InterruptedException} thrown by the queue when the thread is
     *     interrupted while waiting; such exceptions end the task
     */
    @Override // java.util.concurrent.Callable
    public Integer call() throws Exception {
        // One buffer and one protocol instance are reused for every span; the buffer is
        // reset before each write so a record only ever contains a single span.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
        TProtocol protocol = new TBinaryProtocol.Factory().getProtocol(new TIOStreamTransport(buffer));
        do {
            Span span = this.queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (span == null) {
                // Poll timed out without a span; fall through to the stop check.
                continue;
            }
            buffer.reset();
            try {
                span.write(protocol);
                // Parameterized record: avoids the raw type and the unchecked conversion
                // that passing it to Producer<byte[], byte[]>.send would otherwise need.
                ProducerRecord<byte[], byte[]> message =
                        new ProducerRecord<byte[], byte[]>(TOPIC, buffer.toByteArray());
                this.producer.send(message);
                this.numProcessedSpans++;
            } catch (TException e) {
                this.metricsHandler.incrementDroppedSpans(1);
                LOGGER.log(Level.WARNING, "TException when writing span.", e);
            }
        } while (!this.stop);
        return Integer.valueOf(this.numProcessedSpans);
    }
}
