package com.pinterest.doctorkafka.util;

import com.pinterest.doctorkafka.util.OpenTsdbClient;
import com.twitter.ostrich.stats.Distribution;
import com.twitter.ostrich.stats.Stats$;
import com.twitter.ostrich.stats.StatsListener;
import com.twitter.ostrich.stats.StatsSummary;
import java.net.UnknownHostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;

/* loaded from: input_file:com/pinterest/doctorkafka/util/MetricsPusher.class */
public class MetricsPusher extends Thread {
    private final OpenTsdbMetricConverter converter;
    private final long pollMillis;
    private final String host;
    private final int port;
    private OpenTsdbClient client;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MetricsPusher.class);
    private static final int RETRY_SLEEP_MS = 100;
    private static final int MIN_SOCKET_TIME_MS = 200;
    private final OpenTsdbClient.MetricsBuffer buffer = new OpenTsdbClient.MetricsBuffer();
    private StatsListener statsListener = new StatsListener(Stats$.MODULE$);

    public MetricsPusher(String str, int i, OpenTsdbMetricConverter openTsdbMetricConverter, long j) throws UnknownHostException {
        this.host = str;
        this.port = i;
        this.converter = openTsdbMetricConverter;
        this.pollMillis = j;
        this.client = new OpenTsdbClient(str, i);
        setDaemon(true);
    }

    private void fillMetricsBuffer(StatsSummary statsSummary, int i) {
        this.buffer.reset();
        OpenTsdbClient.MetricsBuffer metricsBuffer = this.buffer;
        Iterator<Tuple2<K, V>> it = statsSummary.counters().iterator();
        while (it.hasNext()) {
            Tuple2 tuple2 = (Tuple2) it.mo4896next();
            this.converter.convertCounter((String) tuple2.mo4875_1(), i, (float) ((Long) tuple2.mo4874_2()).longValue(), metricsBuffer);
        }
        Iterator<Tuple2<K, V>> it2 = statsSummary.gauges().iterator();
        while (it2.hasNext()) {
            Tuple2 tuple22 = (Tuple2) it2.mo4896next();
            this.converter.convertGauge((String) tuple22.mo4875_1(), i, (float) ((Double) tuple22.mo4874_2()).doubleValue(), metricsBuffer);
        }
        Iterator<Tuple2<K, V>> it3 = statsSummary.metrics().iterator();
        while (it3.hasNext()) {
            Tuple2 tuple23 = (Tuple2) it3.mo4896next();
            this.converter.convertMetric((String) tuple23.mo4875_1(), i, (Distribution) tuple23.mo4874_2(), metricsBuffer);
        }
    }

    private void logOstrichStats(int i) {
        LOG.debug("Ostrich Metrics {}: \n{}", Integer.valueOf(i), this.buffer.toString());
    }

    public long sendMetrics(boolean z) throws InterruptedException, UnknownHostException {
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis + this.pollMillis;
        int i = (int) (currentTimeMillis / 1000);
        fillMetricsBuffer(this.statsListener.get(), i);
        if (LOG.isDebugEnabled()) {
            logOstrichStats(i);
        }
        while (true) {
            try {
                this.client.sendMetrics(this.buffer);
                break;
            } catch (Exception e) {
                LOG.warn("Failed to send stats to OpenTSDB, will retry up to next interval", (Throwable) e);
                if (!z) {
                    break;
                }
                this.client = new OpenTsdbClient(this.host, this.port);
                if (j - System.currentTimeMillis() < 300) {
                    LOG.error("Failed to send epoch {} to OpenTSDB, moving to next interval", Integer.valueOf(i));
                    break;
                }
                Thread.sleep(100L);
            }
        }
        return System.currentTimeMillis() - currentTimeMillis;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.statsListener.get();
            Thread.sleep(this.pollMillis);
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(Math.max(0L, this.pollMillis - sendMetrics(true)));
            }
        } catch (InterruptedException e) {
            LOG.info("OpenTsdbMetricsPusher thread interrupted, exiting");
        } catch (Exception e2) {
            LOG.error("Unexpected error in OpenTSDBMetricsPusher, exiting", (Throwable) e2);
        }
    }
}
