package org.apache.htrace.impl;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanReceiver;

/**
 * A {@link SpanReceiver} that forwards spans to a Flume agent over RPC.
 *
 * <p>Spans handed to {@link #receiveSpan(Span)} are placed on a bounded in-memory queue.
 * A fixed pool of daemon worker threads drains that queue in batches of at most
 * {@code htrace.flume.batchsize} spans, converts each span to a Flume event (JSON body plus
 * a few headers) and ships the batch with a single {@code appendBatch} call.
 *
 * <p>Delivery is best effort: spans are dropped (and an error is logged) when the queue is
 * full or when sending keeps failing.
 */
public class FlumeSpanReceiver extends SpanReceiver {
    private static final Log LOG = LogFactory.getLog(FlumeSpanReceiver.class);
    public static final String NUM_THREADS_KEY = "htrace.flume.num-threads";
    public static final int DEFAULT_NUM_THREADS = 1;
    public static final String FLUME_HOSTNAME_KEY = "htrace.flume.hostname";
    public static final String DEFAULT_FLUME_HOSTNAME = "localhost";
    public static final String FLUME_PORT_KEY = "htrace.flume.port";
    public static final String FLUME_BATCHSIZE_KEY = "htrace.flume.batchsize";
    public static final int DEFAULT_FLUME_BATCHSIZE = 100;

    /** Seconds {@link #close()} waits for the workers to flush the queue. */
    private static final int SHUTDOWN_TIMEOUT = 30;
    /** Consecutive send failures after which a failed batch is dropped instead of re-queued. */
    private static final int MAX_ERRORS = 10;
    /** Capacity of the in-memory span queue. */
    private static final int QUEUE_CAPACITY = 1000;
    /** Pause after a failed send before the worker tries again. */
    private static final long RETRY_DELAY_MS = 500L;
    /** Charset used for the event body; resolved once instead of once per span. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private ExecutorService service;
    private int maxSpanBatchSize;
    private String flumeHostName;
    private int flumePort;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final BlockingQueue<Span> queue = new ArrayBlockingQueue<Span>(QUEUE_CAPACITY);
    private final ThreadFactory tf = new SimpleThreadFactory();

    /** Creates daemon threads named {@code flumeSpanReceiver-N}. */
    private static class SimpleThreadFactory implements ThreadFactory {
        final AtomicLong count = new AtomicLong(0L);

        private SimpleThreadFactory() {
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            String name = String.format("flumeSpanReceiver-%d", count.getAndIncrement());
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        }
    }

    /**
     * Worker loop: pulls spans off the shared queue, batches them and sends them to Flume.
     * Each worker owns its own RPC client, which is only touched from the worker's thread.
     */
    public class WriteSpanRunnable implements Runnable {
        private RpcClient flumeClient;

        private WriteSpanRunnable() {
            this.flumeClient = null;
        }

        /**
         * Runs until the receiver has been closed and the queue is empty, then closes the
         * RPC client.
         */
        @Override // java.lang.Runnable
        public void run() {
            List<Span> batch = new ArrayList<Span>(maxSpanBatchSize);
            long errorCount = 0;
            while (running.get() || !queue.isEmpty()) {
                try {
                    // Block only briefly so a change of the running flag is noticed promptly.
                    Span first = queue.poll(1L, TimeUnit.SECONDS);
                    if (first != null) {
                        batch.add(first);
                        queue.drainTo(batch, maxSpanBatchSize - 1);
                    }
                } catch (InterruptedException ignored) {
                    // Shutdown is driven by the running flag, not by interruption. The
                    // interrupt status is deliberately not restored: doing so would make
                    // every subsequent poll() throw immediately and turn this loop into a
                    // busy spin.
                }
                startClient();
                if (batch.isEmpty()) {
                    continue;
                }
                if (sendBatch(batch)) {
                    errorCount = 0;
                } else {
                    errorCount++;
                    if (errorCount < MAX_ERRORS) {
                        requeue(batch);
                    } else {
                        LOG.error("Drop " + batch.size() + " span(s) because writing to Flume failed "
                                + errorCount + " times in a row.");
                    }
                    closeClient();
                    try {
                        // Back off a little before retrying.
                        Thread.sleep(RETRY_DELAY_MS);
                    } catch (InterruptedException ignored) {
                        // See above: interruption is not the shutdown signal for this loop.
                    }
                }
                // Always start the next round with an empty batch. Spans of a failed batch
                // have either been re-queued or dropped at this point; keeping them here as
                // well would send them twice.
                batch.clear();
            }
            closeClient();
        }

        /**
         * Converts the batch to Flume events and sends it in one RPC call.
         *
         * @param batch the spans to send; not modified
         * @return {@code true} if the batch was handed to Flume, {@code false} if no client is
         *         available or sending failed
         */
        private boolean sendBatch(List<Span> batch) {
            if (flumeClient == null) {
                // startClient() could not connect and has already logged the reason.
                return false;
            }
            try {
                List<Event> events = new ArrayList<Event>(batch.size());
                for (Span span : batch) {
                    Map<String, String> headers = new HashMap<String, String>();
                    // NOTE(review): the header carries span.toString(), not a dedicated id
                    // accessor - confirm this is what consumers of "SpanId" expect.
                    headers.put("SpanId", span.toString());
                    headers.put("TracerId", span.getTracerId());
                    headers.put("Description", span.getDescription());
                    events.add(EventBuilder.withBody(span.toJson(), UTF_8, headers));
                }
                flumeClient.appendBatch(events);
                return true;
            } catch (Exception e) {
                // Broad catch on purpose: this is the worker's top-level boundary and any
                // failure (delivery or runtime) must not kill the thread.
                LOG.warn("Failed to send " + batch.size() + " span(s) to Flume.", e);
                return false;
            }
        }

        /**
         * Puts the spans of a failed batch back on the queue so they are retried. Spans that
         * do not fit because the queue is full are dropped and counted in the error log.
         */
        private void requeue(List<Span> batch) {
            int dropped = 0;
            for (Span span : batch) {
                if (!queue.offer(span)) {
                    dropped++;
                }
            }
            if (dropped > 0) {
                LOG.error("Drop " + dropped + " span(s) because writing to Flume failed"
                        + " and the queue is full.");
            }
        }

        /** Closes the RPC client if there is one; the field is cleared even if closing fails. */
        private void closeClient() {
            if (flumeClient == null) {
                return;
            }
            try {
                flumeClient.close();
            } catch (FlumeException e) {
                LOG.warn("Error while trying to close Flume Rpc Client.", e);
            } finally {
                flumeClient = null;
            }
        }

        /**
         * Ensures an active RPC client exists. An inactive client is closed and replaced.
         * If the connection cannot be created the failure is logged and the client stays
         * {@code null}.
         */
        private void startClient() {
            if (flumeClient != null && !flumeClient.isActive()) {
                // Go through closeClient() so a failing close() cannot escape and
                // terminate the worker thread.
                closeClient();
            }
            if (flumeClient == null) {
                try {
                    flumeClient = RpcClientFactory.getDefaultInstance(flumeHostName, flumePort, maxSpanBatchSize);
                } catch (FlumeException e) {
                    LOG.warn("Failed to create Flume RPC Client. " + e.getMessage());
                }
            }
        }
    }

    /**
     * Creates the receiver and starts its worker threads.
     *
     * @param hTraceConfiguration configuration providing the {@code htrace.flume.*} settings
     * @throws IllegalArgumentException if {@code htrace.flume.port} is missing or zero
     */
    public FlumeSpanReceiver(HTraceConfiguration hTraceConfiguration) {
        configure(hTraceConfiguration);
    }

    /** Reads the configuration and (re)starts the worker pool with one task per thread. */
    private void configure(HTraceConfiguration hTraceConfiguration) {
        int numThreads = hTraceConfiguration.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
        this.flumeHostName = hTraceConfiguration.get(FLUME_HOSTNAME_KEY, DEFAULT_FLUME_HOSTNAME);
        this.flumePort = hTraceConfiguration.getInt(FLUME_PORT_KEY, 0);
        if (this.flumePort == 0) {
            throw new IllegalArgumentException(FLUME_PORT_KEY + " is required in configuration.");
        }
        this.maxSpanBatchSize = hTraceConfiguration.getInt(FLUME_BATCHSIZE_KEY, DEFAULT_FLUME_BATCHSIZE);
        if (this.service != null) {
            this.service.shutdownNow();
            this.service = null;
        }
        this.service = Executors.newFixedThreadPool(numThreads, this.tf);
        for (int i = 0; i < numThreads; i++) {
            this.service.submit(new WriteSpanRunnable());
        }
    }

    /**
     * Stops accepting spans and waits up to {@value #SHUTDOWN_TIMEOUT} seconds for the workers
     * to send what is still queued. If the calling thread is interrupted while waiting, its
     * interrupt status is restored before returning.
     *
     * @throws IOException declared for interface compatibility; not thrown by this code
     */
    public void close() throws IOException {
        this.running.set(false);
        this.service.shutdown();
        try {
            if (!this.service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                LOG.error("Was not able to process all remaining spans upon closing in: " + SHUTDOWN_TIMEOUT
                        + " " + TimeUnit.SECONDS + ". Left Spans could be dropped.");
            }
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted when terminating executor.", e);
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues a span for delivery. The span is silently ignored once the receiver has been
     * closed, and dropped with an error log entry if the queue is full. Never blocks.
     *
     * @param span the span to deliver
     */
    public void receiveSpan(Span span) {
        if (!this.running.get()) {
            return;
        }
        if (!this.queue.offer(span)) {
            LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue. Blocking Queue was full.");
        }
    }
}
