package org.apache.htrace.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.HBaseHTraceConfiguration;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.SpanReceiverBuilder;
import org.apache.htrace.TimelineAnnotation;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.htrace.commons.logging.Log;
import org.apache.htrace.commons.logging.LogFactory;
import org.apache.htrace.protobuf.generated.SpanProtos;

/* loaded from: input_file:org/apache/htrace/impl/HBaseSpanReceiver.class */
/**
 * {@link SpanReceiver} that buffers received spans in a bounded in-memory queue and
 * writes them to an HBase table from one or more background daemon threads.
 *
 * <p>Each span is serialized with the {@code SpanProtos} protobuf messages and stored in the
 * row keyed by its trace id: the serialized span is used as the column qualifier in the span
 * column family. Spans without parents additionally get two cells in the index column family
 * ({@link #INDEX_TIME_QUAL} holding the start time and {@link #INDEX_SPAN_QUAL} holding the
 * serialized span).
 *
 * <p>Spans are dropped (and an error is logged) when the queue is full, or when writing to
 * HBase has failed {@code MAX_ERRORS} or more times in a row.
 */
public class HBaseSpanReceiver implements SpanReceiver {
    public static final String COLLECTOR_QUORUM_KEY = "htrace.hbase.collector-quorum";
    public static final String DEFAULT_COLLECTOR_QUORUM = "127.0.0.1";
    public static final String ZOOKEEPER_CLIENT_PORT_KEY = "htrace.hbase.zookeeper.property.clientPort";
    public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
    public static final String ZOOKEEPER_ZNODE_PARENT_KEY = "htrace.hbase.zookeeper.znode.parent";
    public static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase";
    public static final String NUM_THREADS_KEY = "htrace.hbase.num-threads";
    public static final int DEFAULT_NUM_THREADS = 1;
    public static final String MAX_SPAN_BATCH_SIZE_KEY = "htrace.hbase.batch.size";
    public static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
    public static final String TABLE_KEY = "htrace.hbase.table";
    public static final String DEFAULT_TABLE = "htrace";
    public static final String COLUMNFAMILY_KEY = "htrace.hbase.columnfamily";
    public static final String DEFAULT_COLUMNFAMILY = "s";
    public static final String INDEXFAMILY_KEY = "htrace.hbase.indexfamily";
    public static final String DEFAULT_INDEXFAMILY = "i";

    /** Qualifier of the index cell that stores the serialized span. */
    public static final byte[] INDEX_SPAN_QUAL = Bytes.toBytes("s");
    /** Qualifier of the index cell that stores the span start time. */
    public static final byte[] INDEX_TIME_QUAL = Bytes.toBytes("t");

    private static final Log LOG = LogFactory.getLog(HBaseSpanReceiver.class);

    /** Seconds {@link #close()} waits for the workers to drain the queue. */
    private static final int SHUTDOWN_TIMEOUT = 30;
    /** Number of consecutive write failures after which failed batches are dropped. */
    private static final int MAX_ERRORS = 10;
    /** Maximum number of spans buffered between receiveSpan() and the workers. */
    private static final int QUEUE_CAPACITY = 1000;

    private final ExecutorService service;
    private final HTraceConfiguration conf;
    private final byte[] table;
    private final byte[] cf;
    private final byte[] icf;
    private final int maxSpanBatchSize;
    private final ProcessId processId;

    /** Cleared by {@link #close()}; workers exit once it is false and the queue is empty. */
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Creates daemon threads named {@code hbaseSpanReceiver-N}. */
    private final ThreadFactory tf = new ThreadFactory() {
        private final AtomicLong receiverIdx = new AtomicLong(0);

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName(String.format("hbaseSpanReceiver-%d", this.receiverIdx.getAndIncrement()));
            return thread;
        }
    };

    private final BlockingQueue<Span> queue = new ArrayBlockingQueue<Span>(QUEUE_CAPACITY);
    private final Configuration hconf = HBaseConfiguration.create();

    /**
     * Worker that repeatedly takes a batch of spans from the queue and writes it to HBase.
     * Each worker owns its own connection and table handle.
     */
    private class WriteSpanRunnable implements Runnable {
        private Connection hconnection;
        private Table htable;

        public WriteSpanRunnable() {
        }

        /**
         * Polls the queue until the receiver has been closed and the queue is empty.
         * An empty poll triggers a flush of pending writes; a non-empty poll is written
         * as one batch. The batch list is emptied at the end of every iteration, whether
         * the write succeeded, was re-queued, or was dropped.
         */
        @Override
        public void run() {
            SpanProtos.Span.Builder spanBuilder = SpanProtos.Span.newBuilder();
            SpanProtos.TimelineAnnotation.Builder annotationBuilder =
                    SpanProtos.TimelineAnnotation.newBuilder();
            ArrayList<Span> batch = new ArrayList<Span>(maxSpanBatchSize);
            long errorCount = 0;

            while (running.get() || !queue.isEmpty()) {
                try {
                    // Block for at most a second so a change of the running flag is noticed.
                    Span first = queue.poll(1L, TimeUnit.SECONDS);
                    if (first != null) {
                        batch.add(first);
                        queue.drainTo(batch, maxSpanBatchSize - 1);
                    }
                } catch (InterruptedException ignored) {
                    // Intentionally not re-interrupting: a set interrupt flag would make every
                    // following poll() fail immediately and turn this loop into a busy spin.
                    // Termination is driven by the running flag instead.
                }

                startClient();
                if (batch.isEmpty()) {
                    flushPendingWrites();
                    continue;
                }

                try {
                    writeSpans(batch, spanBuilder, annotationBuilder);
                    errorCount = 0;
                } catch (Exception e) {
                    errorCount++;
                    LOG.warn("Failed to write " + batch.size() + " span(s) to HBase.", e);
                    if (errorCount < MAX_ERRORS) {
                        requeue(batch);
                    } else {
                        // Too many consecutive failures: give up on this batch.
                        LOG.error("Drop " + batch.size() + " span(s) because writing to HBase failed.");
                    }
                    closeClient();
                    try {
                        // Give HBase a little time to recover before the next attempt.
                        Thread.sleep(500L);
                    } catch (InterruptedException ignored) {
                        // See the note on the poll() above.
                    }
                } finally {
                    // The spans were written, re-queued or dropped. Keeping them here would
                    // write them again together with their re-queued copies.
                    batch.clear();
                }
            }
            closeClient();
        }

        /** Flushes buffered writes; does nothing when no table handle is available. */
        private void flushPendingWrites() {
            if (this.htable == null) {
                // startClient() failed; it is retried on the next loop iteration.
                return;
            }
            try {
                this.htable.flushCommits();
            } catch (IOException e) {
                LOG.error("failed to flush writes to HBase.", e);
                closeClient();
            }
        }

        /**
         * Serializes every span of the batch and puts it into the HBase table.
         *
         * @throws IOException if no table handle is available or a put fails
         */
        private void writeSpans(ArrayList<Span> spans,
                                SpanProtos.Span.Builder spanBuilder,
                                SpanProtos.TimelineAnnotation.Builder annotationBuilder)
                throws IOException {
            if (this.htable == null) {
                throw new IOException("HBase table is not available.");
            }
            for (Span span : spans) {
                spanBuilder.clear()
                        .setTraceId(span.getTraceId())
                        .setStart(span.getStartTimeMillis())
                        .setStop(span.getStopTimeMillis())
                        .setSpanId(span.getSpanId())
                        .setProcessId(span.getProcessId())
                        .setDescription(span.getDescription());

                int parentCount = span.getParents().length;
                boolean isRoot = parentCount == 0;
                if (isRoot) {
                    spanBuilder.setParentId(0L);
                } else {
                    spanBuilder.setParentId(span.getParents()[0]);
                    if (parentCount > 1) {
                        LOG.error("error: HBaseSpanReceiver does not support spans with multiple parents.  Ignoring multiple parents for " + span);
                    }
                }

                for (TimelineAnnotation annotation : span.getTimelineAnnotations()) {
                    spanBuilder.addTimeline(annotationBuilder.clear()
                            .setTime(annotation.getTime())
                            .setMessage(annotation.getMessage())
                            .build());
                }

                // Serialize once; the bytes serve as qualifier and, for root spans, as index value.
                byte[] serializedSpan = spanBuilder.build().toByteArray();
                Put put = new Put(Bytes.toBytes(span.getTraceId()));
                put.add(cf, serializedSpan, null);
                if (isRoot) {
                    put.add(icf, INDEX_TIME_QUAL, Bytes.toBytes(span.getStartTimeMillis()));
                    put.add(icf, INDEX_SPAN_QUAL, serializedSpan);
                }
                this.htable.put(put);
            }
        }

        /** Puts the spans back on the queue for a later retry, logging those that do not fit. */
        private void requeue(ArrayList<Span> spans) {
            int dropped = 0;
            for (Span span : spans) {
                if (!queue.offer(span)) {
                    dropped++;
                }
            }
            if (dropped > 0) {
                LOG.error("Drop " + dropped + " span(s) because writing to HBase failed.");
            }
        }

        /**
         * Closes the table and the connection independently of each other and always
         * forgets both references, so that {@link #startClient()} reconnects afterwards.
         */
        private void closeClient() {
            try {
                if (this.htable != null) {
                    this.htable.close();
                }
            } catch (IOException e) {
                LOG.warn("Failed to close HBase connection. " + e.getMessage(), e);
            } finally {
                this.htable = null;
            }
            try {
                if (this.hconnection != null) {
                    this.hconnection.close();
                }
            } catch (IOException e) {
                LOG.warn("Failed to close HBase connection. " + e.getMessage(), e);
            } finally {
                this.hconnection = null;
            }
        }

        /** Opens the connection and table handle if no table handle is currently held. */
        private void startClient() {
            if (this.htable != null) {
                return;
            }
            try {
                this.hconnection = ConnectionFactory.createConnection(hconf);
                this.htable = this.hconnection.getTable(TableName.valueOf(table));
            } catch (IOException e) {
                LOG.warn("Failed to create HBase connection. " + e.getMessage(), e);
                // Do not leak a connection that was opened before getTable() failed.
                closeClient();
            }
        }
    }

    /**
     * Reads the table, column family, batch size, ZooKeeper and thread-count settings from
     * the given configuration (falling back to the {@code DEFAULT_*} constants) and starts
     * the writer threads.
     *
     * @param hTraceConfiguration configuration to read the {@code htrace.hbase.*} keys from
     */
    public HBaseSpanReceiver(HTraceConfiguration hTraceConfiguration) {
        this.conf = hTraceConfiguration;
        this.table = Bytes.toBytes(hTraceConfiguration.get(TABLE_KEY, DEFAULT_TABLE));
        this.cf = Bytes.toBytes(hTraceConfiguration.get(COLUMNFAMILY_KEY, DEFAULT_COLUMNFAMILY));
        this.icf = Bytes.toBytes(hTraceConfiguration.get(INDEXFAMILY_KEY, DEFAULT_INDEXFAMILY));
        this.maxSpanBatchSize =
                hTraceConfiguration.getInt(MAX_SPAN_BATCH_SIZE_KEY, DEFAULT_MAX_SPAN_BATCH_SIZE);

        this.hconf.set(HConstants.ZOOKEEPER_QUORUM,
                hTraceConfiguration.get(COLLECTOR_QUORUM_KEY, DEFAULT_COLLECTOR_QUORUM));
        this.hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT,
                hTraceConfiguration.get(ZOOKEEPER_ZNODE_PARENT_KEY, DEFAULT_ZOOKEEPER_ZNODE_PARENT));
        this.hconf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT,
                hTraceConfiguration.getInt(ZOOKEEPER_CLIENT_PORT_KEY, DEFAULT_ZOOKEEPER_CLIENT_PORT));

        // Finish initializing all state before any worker thread is started.
        this.processId = new ProcessId(hTraceConfiguration);

        int numThreads = hTraceConfiguration.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
        this.service = Executors.newFixedThreadPool(numThreads, this.tf);
        for (int i = 0; i < numThreads; i++) {
            this.service.submit(new WriteSpanRunnable());
        }
    }

    /**
     * Stops accepting spans and waits up to {@code SHUTDOWN_TIMEOUT} seconds for the
     * workers to write the spans that are still queued. If the calling thread is
     * interrupted while waiting, its interrupt status is restored.
     */
    @Override
    public void close() throws IOException {
        this.running.set(false);
        this.service.shutdown();
        try {
            if (!this.service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                LOG.error("Was not able to process all remaining spans upon closing in: "
                        + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS
                        + ". Left Spans could be dropped.");
            }
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted when terminating executor.", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues the span for writing. Spans received after {@link #close()} are ignored;
     * spans that do not fit into the queue are dropped and an error is logged.
     *
     * @param span the span to store; its process id is filled in when empty
     */
    @Override
    public void receiveSpan(Span span) {
        if (!this.running.get()) {
            return;
        }
        if (span.getProcessId().isEmpty()) {
            span.setProcessId(this.processId.get());
        }
        if (!this.queue.offer(span)) {
            LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue. Blocking Queue was full.");
        }
    }

    /**
     * Manual smoke test: registers an HBaseSpanReceiver, emits a small tree of nested
     * spans with two timeline annotations, closes the receiver and prints the trace id.
     */
    public static void main(String[] strArr) throws Exception {
        SpanReceiver receiver =
                new SpanReceiverBuilder(new HBaseHTraceConfiguration(HBaseConfiguration.create()))
                        .spanReceiverClass(HBaseSpanReceiver.class.getName())
                        .build();
        Trace.addReceiver(receiver);

        TraceScope parent = Trace.startSpan("HBaseSpanReceiver.main.parent", Sampler.ALWAYS);
        Thread.sleep(10L);
        long traceId = parent.getSpan().getTraceId();
        TraceScope child1 = Trace.startSpan("HBaseSpanReceiver.main.child.1");
        Thread.sleep(10L);
        TraceScope child2 = Trace.startSpan("HBaseSpanReceiver.main.child.2", parent.getSpan());
        Thread.sleep(10L);
        TraceScope grandchild = Trace.startSpan("HBaseSpanReceiver.main.grandchild");
        Trace.addTimelineAnnotation("annotation 1.");
        Thread.sleep(10L);
        Trace.addTimelineAnnotation("annotation 2.");
        grandchild.close();
        Thread.sleep(10L);
        child2.close();
        Thread.sleep(10L);
        child1.close();
        parent.close();
        receiver.close();
        System.out.println("trace id: " + traceId);
    }
}
