package org.apache.kylin.shaded.htrace.org.apache.htrace.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kylin.shaded.htrace.com.twitter.zipkin.gen.LogEntry;
import org.apache.kylin.shaded.htrace.com.twitter.zipkin.gen.Scribe;
import org.apache.kylin.shaded.htrace.org.apache.commons.codec.binary.Base64;
import org.apache.kylin.shaded.htrace.org.apache.commons.logging.Log;
import org.apache.kylin.shaded.htrace.org.apache.commons.logging.LogFactory;
import org.apache.kylin.shaded.htrace.org.apache.htrace.HTraceConfiguration;
import org.apache.kylin.shaded.htrace.org.apache.htrace.Span;
import org.apache.kylin.shaded.htrace.org.apache.htrace.SpanReceiver;
import org.apache.kylin.shaded.htrace.org.apache.htrace.zipkin.HTraceToZipkinConverter;
import org.apache.kylin.shaded.htrace.org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.kylin.shaded.htrace.org.apache.thrift.protocol.TProtocol;
import org.apache.kylin.shaded.htrace.org.apache.thrift.protocol.TProtocolFactory;
import org.apache.kylin.shaded.htrace.org.apache.thrift.transport.TFramedTransport;
import org.apache.kylin.shaded.htrace.org.apache.thrift.transport.TIOStreamTransport;
import org.apache.kylin.shaded.htrace.org.apache.thrift.transport.TSocket;
import org.apache.kylin.shaded.htrace.org.apache.thrift.transport.TTransportException;

/* loaded from: input_file:WEB-INF/lib/kylin-external-htrace-2.3.0.jar:org/apache/kylin/shaded/htrace/org/apache/htrace/impl/ZipkinSpanReceiver.class */
public class ZipkinSpanReceiver implements SpanReceiver {
    private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class);
    private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
    private static final int DEFAULT_COLLECTOR_PORT = 9410;
    private static final String CATEGORY = "zipkin";
    private static final boolean DEFAULT_IN_CLIENT_MODE = false;
    private static final int SHUTDOWN_TIMEOUT = 30;
    private static final int MAX_SPAN_BATCH_SIZE = 100;
    private static final int MAX_ERRORS = 10;
    private HTraceToZipkinConverter converter;
    private ExecutorService service;
    private HTraceConfiguration conf;
    private String collectorHostname;
    private int collectorPort;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final BlockingQueue<Span> queue = new ArrayBlockingQueue(1000);
    private final TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
    private final ThreadFactory tf = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("zipkinSpanReceiver-%d").build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kylin-external-htrace-2.3.0.jar:org/apache/kylin/shaded/htrace/org/apache/htrace/impl/ZipkinSpanReceiver$WriteSpanRunnable.class */
    public class WriteSpanRunnable implements Runnable {
        private Scribe.Client scribeClient = null;
        private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        private final TProtocol streamProtocol;

        public WriteSpanRunnable() {
            this.streamProtocol = ZipkinSpanReceiver.this.protocolFactory.getProtocol(new TIOStreamTransport(this.baos));
        }

        @Override // java.lang.Runnable
        public void run() {
            ArrayList arrayList = new ArrayList(100);
            long j = 0;
            while (true) {
                if (!ZipkinSpanReceiver.this.running.get() && ZipkinSpanReceiver.this.queue.size() <= 0) {
                    closeClient();
                    return;
                }
                try {
                    Span span = (Span) ZipkinSpanReceiver.this.queue.poll(1L, TimeUnit.SECONDS);
                    if (span != null) {
                        arrayList.add(span);
                        ZipkinSpanReceiver.this.queue.drainTo(arrayList, 99);
                    }
                } catch (InterruptedException e) {
                }
                if (!arrayList.isEmpty()) {
                    if (this.scribeClient == null) {
                        startClient();
                    }
                    ArrayList arrayList2 = new ArrayList(arrayList.size());
                    try {
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            org.apache.kylin.shaded.htrace.com.twitter.zipkin.gen.Span convert = ZipkinSpanReceiver.this.converter.convert((Span) it.next());
                            this.baos.reset();
                            convert.write(this.streamProtocol);
                            arrayList2.add(new LogEntry(ZipkinSpanReceiver.CATEGORY, Base64.encodeBase64String(this.baos.toByteArray())));
                        }
                        this.scribeClient.Log(arrayList2);
                        arrayList.clear();
                        j = 0;
                    } catch (Exception e2) {
                        ZipkinSpanReceiver.LOG.error("Error when writing to the zipkin collector: " + ZipkinSpanReceiver.this.collectorHostname + ":" + ZipkinSpanReceiver.this.collectorPort, e2);
                        j++;
                        if (j < 10) {
                            try {
                                ZipkinSpanReceiver.this.queue.addAll(arrayList);
                            } catch (IllegalStateException e3) {
                                ZipkinSpanReceiver.LOG.error("Drop " + arrayList.size() + " span(s) because queue is full");
                            }
                        }
                        closeClient();
                        try {
                            Thread.sleep(500L);
                        } catch (InterruptedException e4) {
                        }
                    }
                }
            }
        }

        private void closeClient() {
            if (this.scribeClient != null) {
                this.scribeClient.getInputProtocol().getTransport().close();
                this.scribeClient = null;
            }
        }

        private void startClient() {
            if (this.scribeClient == null) {
                TFramedTransport tFramedTransport = new TFramedTransport(new TSocket(ZipkinSpanReceiver.this.collectorHostname, ZipkinSpanReceiver.this.collectorPort));
                try {
                    tFramedTransport.open();
                } catch (TTransportException e) {
                    e.printStackTrace();
                }
                this.scribeClient = new Scribe.Client(ZipkinSpanReceiver.this.protocolFactory.getProtocol(tFramedTransport));
            }
        }
    }

    public ZipkinSpanReceiver(HTraceConfiguration hTraceConfiguration) {
        configure(hTraceConfiguration);
    }

    private void configure(HTraceConfiguration hTraceConfiguration) {
        this.conf = hTraceConfiguration;
        this.collectorHostname = hTraceConfiguration.get("zipkin.collector-hostname", "localhost");
        this.collectorPort = hTraceConfiguration.getInt("zipkin.collector-port", DEFAULT_COLLECTOR_PORT);
        initConverter();
        int i = hTraceConfiguration.getInt("zipkin.num-threads", 1);
        if (this.service != null) {
            this.service.shutdownNow();
            this.service = null;
        }
        this.service = Executors.newFixedThreadPool(i, this.tf);
        for (int i2 = 0; i2 < i; i2++) {
            this.service.submit(new WriteSpanRunnable());
        }
    }

    private void initConverter() {
        InetAddress inetAddress = null;
        try {
            inetAddress = InetAddress.getByName(this.conf.get("zipkin.traced-service-hostname", InetAddress.getLocalHost().getHostAddress()));
        } catch (UnknownHostException e) {
            LOG.error("Couldn't get the localHost address", e);
        }
        this.converter = new HTraceToZipkinConverter(ByteBuffer.wrap(inetAddress != null ? inetAddress.getAddress() : "localhost".getBytes()).getInt(), (short) this.conf.getInt("zipkin.traced-service-port", -1));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.running.set(false);
        this.service.shutdown();
        try {
            if (!this.service.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOG.error("Was not able to process all remaining spans to write upon closing in: 30 " + TimeUnit.SECONDS + ". There could be un-sent spans still left.  They have been dropped.");
            }
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted when terminating executor.", e);
        }
    }

    @Override // org.apache.kylin.shaded.htrace.org.apache.htrace.SpanReceiver
    public void receiveSpan(Span span) {
        if (this.running.get()) {
            try {
                this.queue.add(span);
            } catch (IllegalStateException e) {
                LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue.  Blocking Queue was full.");
            }
        }
    }
}
