package net.opentsdb.tsd;

import com.google.common.util.concurrent.RateLimiter;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import joptsimple.internal.Strings;
import net.opentsdb.data.deserializers.Deserializer;
import net.opentsdb.utils.PluginLoader;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/opentsdb/tsd/KafkaRpcPluginGroup.class */
public class KafkaRpcPluginGroup implements TimerTask {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRpcPluginGroup.class);
    private final KafkaRpcPluginConfig config;
    private final String group_id;
    private final RateLimiter rate_limiter;
    private final List<KafkaRpcPluginThread> kafka_consumers;
    private final int num_threads;
    private final String topics;
    private final HashedWheelTimer timer;
    private final ExecutorService pool;
    private final TsdbConsumerType consumer_type;
    private final KafkaRpcPlugin parent;
    private final AtomicLong restarts = new AtomicLong();
    private final AtomicLong rebalance_failures = new AtomicLong();
    private Deserializer deserializer;
    private double current_rate;

    /* loaded from: input_file:net/opentsdb/tsd/KafkaRpcPluginGroup$TsdbConsumerType.class */
    public enum TsdbConsumerType {
        RAW("Raw"),
        REQUEUE_RAW("RequeueRaw"),
        ROLLUP("Rollup"),
        REQUEUE_ROLLUP("RequeueRollup"),
        UID_ABUSE("UIDAbuse");

        final String name;

        TsdbConsumerType(String str) {
            this.name = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.name;
        }
    }

    public KafkaRpcPluginGroup(KafkaRpcPlugin kafkaRpcPlugin, String str) {
        this.deserializer = null;
        this.parent = kafkaRpcPlugin;
        this.group_id = str;
        this.config = kafkaRpcPlugin.getConfig();
        this.timer = kafkaRpcPlugin.getTSDB().getTimer();
        this.pool = kafkaRpcPlugin.getKafkaPool();
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("Missing group name");
        }
        this.topics = this.config.getString(KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + str + ".topics");
        if (this.topics == null || this.topics.isEmpty()) {
            throw new IllegalArgumentException("Empty topic filter for group " + str);
        }
        String string = this.config.getString(KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + str + ".consumerType");
        if (string == null || string.isEmpty()) {
            throw new IllegalArgumentException("Missing consumer type for group " + str);
        }
        this.consumer_type = TsdbConsumerType.valueOf(string.toUpperCase());
        if (this.config.hasProperty(KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + str + ".rate")) {
            this.current_rate = this.config.getInt(KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + str + ".rate");
            if (this.current_rate > 0.0d) {
                this.rate_limiter = RateLimiter.create(this.current_rate);
            } else {
                this.rate_limiter = RateLimiter.create(70000.0d);
            }
        } else {
            this.current_rate = 70000.0d;
            this.rate_limiter = RateLimiter.create(70000.0d);
        }
        this.num_threads = this.config.hasProperty(new StringBuilder().append(KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE).append(str).append(".threads").toString()) ? this.config.getInt(KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + str + ".threads") : 2;
        this.kafka_consumers = new ArrayList(this.num_threads);
        String string2 = this.config.getString(KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + str + ".deserializer");
        if (Strings.isNullOrEmpty(string2)) {
            throw new IllegalArgumentException("Deserializer class cannot be null or empty.");
        }
        try {
            Class<?> cls = Class.forName(string2);
            if (cls != null) {
                this.deserializer = (Deserializer) cls.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            }
        } catch (ClassNotFoundException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No class [" + string2 + "] found on the class path, trying the plugin loader.");
            }
        } catch (IllegalAccessException e2) {
            LOG.warn("Found instance of [" + string2 + "] but failed to instantiate it:", e2);
        } catch (IllegalArgumentException e3) {
            LOG.warn("Found instance of [" + string2 + "] but failed to instantiate it:", e3);
        } catch (InstantiationException e4) {
            LOG.warn("Found instance of [" + string2 + "] but failed to instantiate it:", e4);
        } catch (NoSuchMethodException e5) {
            LOG.warn("Found instance of [" + string2 + "] but failed to instantiate it:", e5);
        } catch (SecurityException e6) {
            LOG.warn("Found instance of [" + string2 + "] but failed to instantiate it:", e6);
        } catch (InvocationTargetException e7) {
            LOG.warn("Found instance of [" + string2 + "] but failed to instantiate it:", e7);
        }
        if (this.deserializer == null) {
            this.deserializer = (Deserializer) PluginLoader.loadSpecificPlugin(string2, Deserializer.class);
        }
        if (this.deserializer == null) {
            throw new IllegalArgumentException("Unable to find a deserializer for class [" + string2 + "]");
        }
        for (int i = 0; i < this.num_threads; i++) {
            this.kafka_consumers.add(new KafkaRpcPluginThread(this, i, this.topics));
        }
        this.timer.newTimeout(this, this.config.threadCheckInterval(), TimeUnit.MILLISECONDS);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("groupID=").append(this.group_id).append(", type=").append(this.consumer_type).append(", currentRate=").append(this.current_rate).append(", numThreads=").append(this.num_threads).append(", topics=").append(this.topics);
        return sb.toString();
    }

    public void start() {
        if (this.current_rate > 0.0d) {
            Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
            while (it.hasNext()) {
                this.pool.execute(it.next());
            }
        }
    }

    public void shutdown() {
        Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
    }

    public String getGroupID() {
        return this.group_id;
    }

    public RateLimiter getRateLimiter() {
        return this.rate_limiter;
    }

    public KafkaRpcPlugin getParent() {
        return this.parent;
    }

    public TsdbConsumerType getConsumerType() {
        return this.consumer_type;
    }

    public void incrementRebalanceFailures() {
        this.rebalance_failures.incrementAndGet();
    }

    public long getRestarts() {
        return this.restarts.get();
    }

    public long getRebalanceFailures() {
        return this.rebalance_failures.get();
    }

    public long getMessagesReceived() {
        long j = 0;
        Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
        while (it.hasNext()) {
            j += it.next().getMessagesReceived();
        }
        return j;
    }

    public long getDatapointsReceived() {
        long j = 0;
        Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
        while (it.hasNext()) {
            j += it.next().getDatapointsReceived();
        }
        return j;
    }

    public double getCumulativeRateDelay() {
        double d = 0.0d;
        Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
        while (it.hasNext()) {
            d += it.next().getCumulativeRateDelay();
        }
        return d;
    }

    public double getKafkaWaitTime() {
        double d = 0.0d;
        Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
        while (it.hasNext()) {
            d += it.next().getKafkaWaitTime();
        }
        return d;
    }

    public long getDeserializationErrors() {
        long j = 0;
        Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
        while (it.hasNext()) {
            j += it.next().getDeserializationErrors();
        }
        return j;
    }

    public void getNamespaceCounters(Map<String, Map<String, Long>> map) {
        Iterator<KafkaRpcPluginThread> it = this.kafka_consumers.iterator();
        while (it.hasNext()) {
            for (Map.Entry<String, Map<String, AtomicLong>> entry : it.next().getNamespaceCounters().entrySet()) {
                Map<String, Long> map2 = map.get(entry.getKey());
                if (map2 == null) {
                    map2 = new HashMap<>(1);
                    map.put(entry.getKey(), map2);
                }
                for (Map.Entry<String, AtomicLong> entry2 : entry.getValue().entrySet()) {
                    Long l = map2.get(entry2.getKey());
                    if (l == null) {
                        map2.put(entry2.getKey(), Long.valueOf(entry2.getValue().get()));
                    } else {
                        map2.put(entry2.getKey(), Long.valueOf(l.longValue() + entry2.getValue().get()));
                    }
                }
            }
        }
    }

    public Map<Integer, Map<String, Double>> getPerThreadMetrics() {
        HashMap hashMap = new HashMap();
        for (KafkaRpcPluginThread kafkaRpcPluginThread : this.kafka_consumers) {
            HashMap hashMap2 = new HashMap();
            hashMap2.put("messagesReceived", Double.valueOf(kafkaRpcPluginThread.getMessagesReceived()));
            hashMap2.put("datapointsReceived", Double.valueOf(kafkaRpcPluginThread.getDatapointsReceived()));
            hashMap2.put("cumulativeRateDelay", Double.valueOf(kafkaRpcPluginThread.getCumulativeRateDelay()));
            hashMap2.put("kafkaWaitTime", Double.valueOf(kafkaRpcPluginThread.getKafkaWaitTime()));
            hashMap.put(Integer.valueOf(kafkaRpcPluginThread.threadID()), hashMap2);
        }
        return hashMap;
    }

    public double getRate() {
        if (this.current_rate == 0.0d) {
            return 0.0d;
        }
        return this.rate_limiter.getRate();
    }

    public void setRate(double d) {
        if (d < 0.0d) {
            throw new IllegalArgumentException("Rate cannot be less than zero " + d);
        }
        this.current_rate = d;
        if (d != 0.0d) {
            this.rate_limiter.setRate(d);
            return;
        }
        LOG.info("The rate has been set to zero for " + this + ". Killing threads.");
        for (KafkaRpcPluginThread kafkaRpcPluginThread : this.kafka_consumers) {
            try {
                kafkaRpcPluginThread.shutdown();
            } catch (Exception e) {
                LOG.error("Exception shutting down thread " + kafkaRpcPluginThread, e);
            }
        }
    }

    public Deserializer getDeserializer() {
        return this.deserializer;
    }

    public void run(Timeout timeout) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Running thread monitor on " + this);
        }
        try {
            if (this.current_rate > 0.0d) {
                for (KafkaRpcPluginThread kafkaRpcPluginThread : this.kafka_consumers) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Writer [" + kafkaRpcPluginThread + "] thread state: " + kafkaRpcPluginThread.isThreadRunning());
                    }
                    if (!kafkaRpcPluginThread.isThreadRunning()) {
                        LOG.warn("Writer [" + kafkaRpcPluginThread + "] was terminated, restarting");
                        this.pool.execute(kafkaRpcPluginThread);
                        this.restarts.incrementAndGet();
                    }
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Consumer group " + this + " has a rate limit of 0, not running");
            }
        } catch (Exception e) {
            LOG.error("Failure while monitoring threads for group: " + this, e);
        } catch (Throwable th) {
            LOG.error("Fatal exception in group thread: " + this, th);
            System.exit(1);
        }
        this.timer.newTimeout(this, this.config.threadCheckInterval(), TimeUnit.MILLISECONDS);
    }
}
