package org.apache.htrace.impl;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.htrace.commons.logging.Log;
import org.apache.htrace.commons.logging.LogFactory;
import org.apache.htrace.core.HTraceConfiguration;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanReceiver;
import org.apache.htrace.jetty.client.HttpClient;
import org.apache.htrace.jetty.client.api.ContentResponse;
import org.apache.htrace.jetty.client.api.Request;
import org.apache.htrace.jetty.client.util.StringContentProvider;
import org.apache.htrace.jetty.http.HttpField;
import org.apache.htrace.jetty.http.HttpHeader;
import org.apache.htrace.jetty.http.HttpMethod;

/* loaded from: input_file:org/apache/htrace/impl/HTracedRESTReceiver.class */
/**
 * A {@link SpanReceiver} that buffers spans in memory and posts them, in batches, to the
 * {@code /writeSpans} path of an htraced REST server from a background daemon thread.
 *
 * <p>Threading model: {@link #receiveSpan(Span)} may be called from any thread. The span queue
 * and the {@code shutdown} / {@code mustStartFlush} flags are guarded by {@code lock}; the
 * "PostSpans" thread waits on {@code cond} and performs the HTTP request only after releasing
 * the lock, so producers are never blocked on network I/O.
 *
 * <p>When the queue already holds {@code capacity} spans, new spans are dropped and a warning
 * is logged at most once per {@code WARN_TIMEOUT_MS}.
 */
public class HTracedRESTReceiver extends SpanReceiver {
    private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);

    /** Configuration key: HTTP connect timeout, in milliseconds. */
    public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms";
    private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;

    /** Configuration key: HTTP idle timeout, in milliseconds. */
    public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms";
    private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;

    /** Configuration key: base URL of the htraced REST server (only protocol/host/port are used). */
    public static final String HTRACED_REST_URL_KEY = "htraced.rest.url";
    private static final String HTRACED_REST_URL_DEFAULT = "http://localhost:9095/";

    /** Configuration key: maximum number of spans buffered before new spans are dropped. */
    public static final String CLIENT_REST_QUEUE_CAPACITY_KEY = "client.rest.queue.capacity";
    private static final int CLIENT_REST_QUEUE_CAPACITY_DEFAULT = 1000000;

    /** Configuration key: maximum time between two posts, in milliseconds. */
    public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms";
    private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;

    /** Configuration key: maximum number of spans sent in a single POST. */
    public static final String CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY = "htrace.client.rest.batch.size";
    private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;

    /** Minimum interval, in milliseconds, between two "queue is full" warnings. */
    private static long WARN_TIMEOUT_MS = 300000;

    /** How long {@link #close()} waits for the PostSpans thread to drain and exit, in ms. */
    private static final long CLOSE_JOIN_TIMEOUT_MS = 120000L;

    private final HttpClient httpClient;
    private final int capacity;
    private final String url;
    private final int maxToSendAtATime;
    private final PostSpans postSpans;
    private final Thread postSpansThread;

    /** Guards {@code spans}, {@code shutdown} and {@code mustStartFlush}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled when the PostSpans thread should wake up (batch ready, flush, or shutdown). */
    private final Condition cond = this.lock.newCondition();

    /** Pending spans, oldest first. Access only while holding {@code lock}. */
    private final ArrayDeque<Span> spans;

    private boolean shutdown = false;
    private boolean mustStartFlush;

    /** Timestamp (ms, derived from {@link System#nanoTime()}) of the last capacity warning. */
    private final AtomicLong lastAtCapacityWarningLog = new AtomicLong(0);

    /**
     * Background task that periodically drains up to {@code maxToSendAtATime} spans from the
     * shared queue and POSTs them to the server.
     */
    private class PostSpans implements Runnable {
        private final long periodInNs;

        /** Batch currently being sent. Touched only by the PostSpans thread. */
        private final ArrayDeque<Span> spanBuf;

        /**
         * @param periodInMs maximum time to wait between two posts, in milliseconds
         */
        private PostSpans(long periodInMs) {
            this.periodInNs = TimeUnit.NANOSECONDS.convert(periodInMs, TimeUnit.MILLISECONDS);
            this.spanBuf = new ArrayDeque<>(maxToSendAtATime);
        }

        /**
         * Main loop: wait for the period to elapse (or for a signal), move a batch out of the
         * queue under the lock, then send it with the lock released. After shutdown the loop
         * keeps sending batches until the queue is empty, then stops the HTTP client.
         */
        @Override
        public void run() {
            try {
                long waitNs = periodInNs;
                while (true) {
                    lock.lock();
                    try {
                        if (shutdown) {
                            if (spans.isEmpty()) {
                                LOG.debug("Shutting down PostSpans thread...");
                                break;
                            }
                        } else {
                            try {
                                // awaitNanos returns the remaining wait time; <= 0 means the
                                // period has fully elapsed.
                                waitNs = cond.awaitNanos(waitNs);
                                if (mustStartFlush) {
                                    waitNs = 0;
                                    mustStartFlush = false;
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Got InterruptedException");
                                waitNs = 0;
                            }
                        }
                        if (spans.size() > maxToSendAtATime || waitNs <= 0 || shutdown) {
                            loadSpanBuf();
                            waitNs = periodInNs;
                        }
                    } finally {
                        // The lock is released exactly once per iteration, on every path
                        // (normal, break, exception).
                        lock.unlock();
                    }
                    // Network I/O happens strictly outside the lock so that threads calling
                    // receiveSpan() are never blocked behind an HTTP request.
                    if (!spanBuf.isEmpty()) {
                        sendSpans();
                        spanBuf.clear();
                    }
                }
            } finally {
                if (httpClient != null) {
                    try {
                        httpClient.stop();
                    } catch (Exception e) {
                        LOG.error("Error shutting down httpClient", e);
                    }
                }
                // spans is guarded by lock everywhere else; take it here too in case this
                // thread is exiting because of an unexpected exception while producers are
                // still active.
                lock.lock();
                try {
                    spans.clear();
                } finally {
                    lock.unlock();
                }
            }
        }

        /** Moves up to {@code maxToSendAtATime} spans from the queue into {@code spanBuf}. Caller must hold the lock. */
        private void loadSpanBuf() {
            for (int loaded = 0; loaded < maxToSendAtATime; loaded++) {
                Span span = spans.pollFirst();
                if (span == null) {
                    return;
                }
                spanBuf.add(span);
            }
        }

        /**
         * POSTs the contents of {@code spanBuf} as back-to-back JSON objects (no separator).
         * Failures are logged and the batch is not retried.
         */
        private void sendSpans() {
            try {
                Request request = httpClient.newRequest(url).method(HttpMethod.POST);
                request.header(HttpHeader.CONTENT_TYPE, "application/json");
                StringBuilder body = new StringBuilder();
                for (Span span : spanBuf) {
                    body.append(span.toJson());
                }
                request.content(new StringContentProvider(body.toString()));
                ContentResponse response = request.send();
                if (response.getStatus() != 200) {
                    LOG.error("Status: " + response.getStatus());
                    LOG.error(response.getHeaders());
                    LOG.error(response.getContentAsString());
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("POSTED " + spanBuf.size() + " spans");
                }
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                LOG.error(e);
            }
        }
    }

    /**
     * Creates (but does not start) the HTTP client used to post spans.
     *
     * @param j  connect timeout in milliseconds
     * @param j2 idle timeout in milliseconds
     * @return a configured, not yet started client whose user agent is this class's simple name
     */
    HttpClient createHttpClient(long j, long j2) {
        HttpClient client = new HttpClient();
        client.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, getClass().getSimpleName()));
        client.setConnectTimeout(j);
        client.setIdleTimeout(j2);
        return client;
    }

    /**
     * Reads the configuration, starts the HTTP client and launches the "PostSpans" daemon thread.
     *
     * @param hTraceConfiguration source of the {@code *_KEY} settings declared on this class
     * @throws Exception if the configured URL is malformed or the HTTP client fails to start
     */
    public HTracedRESTReceiver(HTraceConfiguration hTraceConfiguration) throws Exception {
        int connTimeoutMs = hTraceConfiguration.getInt(
                CLIENT_CONNECT_TIMEOUT_MS_KEY, CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
        int idleTimeoutMs = hTraceConfiguration.getInt(
                CLIENT_IDLE_TIMEOUT_MS_KEY, CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
        this.httpClient = createHttpClient(connTimeoutMs, idleTimeoutMs);
        this.capacity = hTraceConfiguration.getInt(
                CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
        this.spans = new ArrayDeque<>(this.capacity);
        // Only protocol, host and port of the configured URL are kept; the path is always
        // replaced with /writeSpans.
        URL baseUrl = new URL(hTraceConfiguration.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT));
        URL writeSpansUrl =
                new URL(baseUrl.getProtocol(), baseUrl.getHost(), baseUrl.getPort(), "/writeSpans");
        this.url = writeSpansUrl.toString();
        int periodInMs = hTraceConfiguration.getInt(
                CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT);
        this.maxToSendAtATime = hTraceConfiguration.getInt(
                CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);
        this.httpClient.start();
        // PostSpans reads maxToSendAtATime in its constructor, so it must be created after
        // that field has been assigned.
        this.postSpans = new PostSpans(periodInMs);
        this.postSpansThread = new Thread(this.postSpans);
        this.postSpansThread.setDaemon(true);
        this.postSpansThread.setName("PostSpans");
        this.postSpansThread.start();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new HTracedRESTReceiver with connTimeout=" + connTimeoutMs
                    + ", idleTimeout = " + idleTimeoutMs + ", capacity=" + this.capacity
                    + ", url=" + this.url + ", periodInMs=" + periodInMs
                    + ", maxToSendAtATime=" + this.maxToSendAtATime);
        }
    }

    /**
     * Marks the receiver as shut down, wakes the PostSpans thread so it can drain the queue,
     * and waits up to {@code CLOSE_JOIN_TIMEOUT_MS} for it to exit. If the calling thread is
     * interrupted while waiting, the interrupt status is restored before returning.
     *
     * @throws IOException declared by the signature; this implementation does not throw it itself
     */
    public void close() throws IOException {
        LOG.debug("Closing HTracedRESTReceiver(" + this.url + ").");
        this.lock.lock();
        try {
            this.shutdown = true;
            this.cond.signal();
        } finally {
            this.lock.unlock();
        }
        // Join outside the lock: the PostSpans thread needs the lock to observe shutdown.
        try {
            this.postSpansThread.join(CLOSE_JOIN_TIMEOUT_MS);
            if (this.postSpansThread.isAlive()) {
                LOG.error("Timed out without closing HTracedRESTReceiver(" + this.url + ").");
            } else {
                LOG.debug("Closed HTracedRESTReceiver(" + this.url + ").");
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted while joining postSpans", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Asks the PostSpans thread to send a batch immediately instead of waiting for the period to elapse. */
    void startFlushing() {
        LOG.info("Triggering HTracedRESTReceiver flush.");
        this.lock.lock();
        try {
            this.mustStartFlush = true;
            this.cond.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Queues a span for delivery. The span is silently ignored (trace-logged) after
     * {@link #close()}, and dropped with a rate-limited warning when the queue is full.
     *
     * @param span the span to enqueue
     */
    public void receiveSpan(Span span) {
        boolean added = false;
        this.lock.lock();
        try {
            if (this.shutdown) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver is already shut down.");
                }
                return;
            }
            if (this.spans.size() < this.capacity) {
                this.spans.add(span);
                added = true;
                if (this.spans.size() >= this.maxToSendAtATime) {
                    // A full batch is available; wake the sender early.
                    this.cond.signal();
                }
            } else {
                // Queue is full: nudge the sender so it frees up space.
                this.cond.signal();
            }
        } finally {
            this.lock.unlock();
        }
        if (!added) {
            warnQueueAtCapacity();
        }
    }

    /**
     * Logs the "queue is full" warning unless one was already logged within the last
     * {@code WARN_TIMEOUT_MS}. Called without the lock held; the compare-and-set ensures only
     * one of several racing threads logs.
     */
    private void warnQueueAtCapacity() {
        long nowMs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        long lastMs = this.lastAtCapacityWarningLog.get();
        if (nowMs - lastMs > WARN_TIMEOUT_MS
                && this.lastAtCapacityWarningLog.compareAndSet(lastMs, nowMs)) {
            LOG.warn("There are too many HTrace spans to buffer!  We have already buffered " + this.capacity + " spans.  Dropping spans.");
        }
    }
}
