package net.opentsdb.tsd;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import joptsimple.internal.Strings;
import kafka.common.ConsumerRebalanceFailedException;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import net.opentsdb.core.IncomingDataPoint;
import net.opentsdb.core.TSDB;
import net.opentsdb.data.TypedIncomingData;
import net.opentsdb.data.deserializers.Deserializer;
import net.opentsdb.tsd.KafkaRpcPluginGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/opentsdb/tsd/KafkaRpcPluginThread.class */
public class KafkaRpcPluginThread extends Thread {
    static final String CONSUMER_ID = "consumer.id";
    static final String GROUP_ID = "group.id";
    static final String AGG_TAG = "_aggregate";
    static final String DEFAULT_COUNTER_ID = "null";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRpcPluginThread.class);
    private final TSDB tsdb;
    private final ConcurrentMap<String, Map<String, AtomicLong>> namespace_counters;
    private final boolean track_metric_prefix;
    private final int thread_id;
    private final KafkaRpcPluginGroup group;
    private final String consumer_id;
    private final TopicFilter topic_filter;
    private final RateLimiter rate_limiter;
    private final KafkaRpcPluginGroup.TsdbConsumerType consumer_type;
    private final long requeue_delay;
    private final Deserializer deserializer;
    private ConsumerConnector consumer;
    private final int number_consumer_streams = 1;
    private final AtomicBoolean thread_running = new AtomicBoolean();
    private final AtomicLong messagesReceived = new AtomicLong();
    private final AtomicLong datapointsReceived = new AtomicLong();
    private final AtomicLong deserializationErrors = new AtomicLong();
    private final AtomicDouble cumulativeRateDelay = new AtomicDouble();
    private final AtomicDouble kafkaWaitTime = new AtomicDouble();

    /* renamed from: net.opentsdb.tsd.KafkaRpcPluginThread$1, reason: invalid class name */
    /* loaded from: input_file:net/opentsdb/tsd/KafkaRpcPluginThread$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$net$opentsdb$tsd$KafkaRpcPluginGroup$TsdbConsumerType = new int[KafkaRpcPluginGroup.TsdbConsumerType.values().length];

        static {
            try {
                $SwitchMap$net$opentsdb$tsd$KafkaRpcPluginGroup$TsdbConsumerType[KafkaRpcPluginGroup.TsdbConsumerType.RAW.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$net$opentsdb$tsd$KafkaRpcPluginGroup$TsdbConsumerType[KafkaRpcPluginGroup.TsdbConsumerType.ROLLUP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$net$opentsdb$tsd$KafkaRpcPluginGroup$TsdbConsumerType[KafkaRpcPluginGroup.TsdbConsumerType.REQUEUE_RAW.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$net$opentsdb$tsd$KafkaRpcPluginGroup$TsdbConsumerType[KafkaRpcPluginGroup.TsdbConsumerType.REQUEUE_ROLLUP.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$net$opentsdb$tsd$KafkaRpcPluginGroup$TsdbConsumerType[KafkaRpcPluginGroup.TsdbConsumerType.UID_ABUSE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:net/opentsdb/tsd/KafkaRpcPluginThread$CounterType.class */
    public enum CounterType {
        ReadRaw("readRawCounter"),
        ReadRequeuedRaw("readRequeueRawCounter"),
        ReadAggregate("readAggregateCounter"),
        ReadRequeuedAggregate("readRequeueAggregateCounter"),
        ReadRollup("readRollupCounter"),
        ReadRequeuedRollup("readRequeueRollupCounter"),
        ReadHistogram("readHistogramCounter"),
        ReadRequeuedHistogram("readRequeueHistogramCounter"),
        StoredRaw("storedRawCounter"),
        StoredRequeuedRaw("storedRequeueRawCounter"),
        StoredAggregate("storedAggregateCounter"),
        StoredRequeuedAggregate("storedRequeueAggregateCounter"),
        StoredRollup("storedRollupCounter"),
        StoredRequeuedRollup("storedRequeueRollupCounter"),
        StoredHistogram("storedHistogramCounter"),
        StoredRequeuedHistogram("storedRequeueHistogramCounter"),
        UnknownMetric("unknownMetricCounter"),
        RequeuedRaw("requeuedRawCounter"),
        RequeuedAggregate("requeuedAggregateCounter"),
        RequeuedRollup("requeuedRollupCounter"),
        RequeuedHistogram("requeuedHistogramCounter"),
        RequeuesDelayed("requeuesDelayedCounter"),
        Exception("exceptionCounter"),
        DroppedRawRollup("droppedRawRollupCounter"),
        StorageException("storageExceptionCounter"),
        PleaseThrottle("pleaseThrottleExceptionCounter"),
        TimeoutException("timeoutExceptionCounter"),
        IllegalArgument("illegalArgumentCounter"),
        UIDCacheMiss("uidCacheMissCounter"),
        NAN("nanCounter"),
        UnknownRollup("unknownRollupCounter"),
        EmptyMessage("emptyMessageCounter"),
        StatusMessage("statusMessageCounter"),
        UnknownException("unknownExceptionCounter"),
        UIDAbuse("uidAbuseCounter");

        private final String name;

        CounterType(String str) {
            this.name = str;
        }

        @Override // java.lang.Enum
        public String toString() {
            return this.name;
        }
    }

    public KafkaRpcPluginThread(KafkaRpcPluginGroup kafkaRpcPluginGroup, int i, String str) {
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("Missing topics");
        }
        if (i < 0) {
            throw new IllegalArgumentException("Cannot have a negative thread ID: " + i);
        }
        if (kafkaRpcPluginGroup.getParent().getTSDB() == null) {
            throw new IllegalArgumentException("Missing TSDB in the group");
        }
        if (kafkaRpcPluginGroup.getRateLimiter() == null) {
            throw new IllegalArgumentException("Missing rate limiter in the group");
        }
        if (kafkaRpcPluginGroup.getGroupID() == null || kafkaRpcPluginGroup.getGroupID().isEmpty()) {
            throw new IllegalArgumentException("Missing group ID");
        }
        if (kafkaRpcPluginGroup.getParent().getHost() == null || kafkaRpcPluginGroup.getParent().getHost().isEmpty()) {
            throw new IllegalArgumentException("Missing host name");
        }
        this.namespace_counters = kafkaRpcPluginGroup.getParent().getNamespaceCounters();
        this.track_metric_prefix = kafkaRpcPluginGroup.getParent().trackMetricPrefix();
        this.thread_id = i;
        this.group = kafkaRpcPluginGroup;
        this.tsdb = kafkaRpcPluginGroup.getParent().getTSDB();
        this.rate_limiter = kafkaRpcPluginGroup.getRateLimiter();
        this.consumer_type = kafkaRpcPluginGroup.getConsumerType();
        this.thread_running.set(false);
        this.topic_filter = new Whitelist(str);
        this.consumer_id = i + "_" + kafkaRpcPluginGroup.getParent().getHost();
        if (this.consumer_type != KafkaRpcPluginGroup.TsdbConsumerType.REQUEUE_RAW) {
            this.requeue_delay = 0L;
        } else if (kafkaRpcPluginGroup.getParent().getConfig().hasProperty("KafkaRpcPlugin.requeueDelay")) {
            this.requeue_delay = kafkaRpcPluginGroup.getParent().getConfig().getLong("KafkaRpcPlugin.requeueDelay");
        } else {
            this.requeue_delay = KafkaRpcPluginConfig.DEFAULT_REQUEUE_DELAY_MS;
        }
        this.deserializer = kafkaRpcPluginGroup.getDeserializer();
    }

    @Override // java.lang.Thread
    public String toString() {
        return this.group.getGroupID() + "_" + this.consumer_id;
    }

    ConsumerConnector buildConsumerConnector() {
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(buildConsumerProperties()));
    }

    Properties buildConsumerProperties() {
        Properties properties = new Properties();
        for (Map.Entry entry : this.group.getParent().getConfig().getMap().entrySet()) {
            String str = (String) entry.getKey();
            if (str.startsWith(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX) && !str.contains(this.group.getGroupID())) {
                properties.put(str.substring(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX.length()), entry.getValue());
            }
        }
        String str2 = KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + this.group.getGroupID() + ".";
        for (Map.Entry entry2 : this.group.getParent().getConfig().getMap().entrySet()) {
            String str3 = (String) entry2.getKey();
            if (str3.startsWith(str2)) {
                properties.put(str3.substring(str2.length()), entry2.getValue());
            }
        }
        properties.put(GROUP_ID, this.group.getGroupID());
        properties.put(CONSUMER_ID, this.consumer_id);
        LOG.info("Initializing consumer config with consumer id " + this.consumer_id + " and props " + properties);
        return properties;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:11:0x009d. Please report as an issue. */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        MessageAndMetadata next;
        long currentTimeMillis;
        this.thread_running.set(true);
        Thread.currentThread().setName(this.group.getGroupID() + "_" + this.consumer_id);
        try {
            this.consumer = buildConsumerConnector();
            ConsumerIterator it = ((KafkaStream) this.consumer.createMessageStreamsByFilter(this.topic_filter, 1).get(0)).iterator();
            int i = 0;
            long nanoTime = System.nanoTime();
            while (it.hasNext()) {
                try {
                    next = it.next();
                    long nanoTime2 = System.nanoTime() - nanoTime;
                    if (nanoTime > 0) {
                        this.kafkaWaitTime.addAndGet(nanoTime2 / 1000000.0d);
                    }
                    this.messagesReceived.incrementAndGet();
                    currentTimeMillis = System.currentTimeMillis();
                } catch (RuntimeException e) {
                    LOG.error("Exception in kafkaReader or Tsdb Writer ", e);
                    incrementNamespaceCounter(CounterType.Exception, (String) null);
                    i++;
                    if (i >= 5) {
                        LOG.error("Too many errors, Killing the consumer thread " + this);
                        throw e;
                    }
                }
                switch (AnonymousClass1.$SwitchMap$net$opentsdb$tsd$KafkaRpcPluginGroup$TsdbConsumerType[this.consumer_type.ordinal()]) {
                    case 1:
                    case KafkaRpcPluginConfig.DEFAULT_CONSUMER_THREADS /* 2 */:
                        List<TypedIncomingData> deserialize = this.deserializer.deserialize(this, (byte[]) next.message());
                        if (deserialize == null) {
                            this.deserializationErrors.incrementAndGet();
                        } else {
                            this.cumulativeRateDelay.addAndGet(this.rate_limiter.acquire());
                            this.datapointsReceived.addAndGet(deserialize.size());
                            Iterator<TypedIncomingData> it2 = deserialize.iterator();
                            while (it2.hasNext()) {
                                it2.next().processData(this, currentTimeMillis);
                            }
                            i = 0;
                            nanoTime = System.nanoTime();
                        }
                    case 3:
                    case 4:
                    case 5:
                        List<TypedIncomingData> deserialize2 = this.deserializer.deserialize(this, (byte[]) next.message());
                        if (deserialize2 == null) {
                            this.deserializationErrors.incrementAndGet();
                        } else {
                            this.datapointsReceived.addAndGet(deserialize2.size());
                            for (TypedIncomingData typedIncomingData : deserialize2) {
                                long currentTimeMillis2 = System.currentTimeMillis() - typedIncomingData.getRequeueTS();
                                if (currentTimeMillis2 < this.requeue_delay) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Sleeping for " + (this.requeue_delay - currentTimeMillis2) + " ms due to requeue delay");
                                    }
                                    Thread.sleep(this.requeue_delay - currentTimeMillis2);
                                }
                                this.cumulativeRateDelay.addAndGet(this.rate_limiter.acquire());
                                typedIncomingData.processData(this, currentTimeMillis);
                            }
                            i = 0;
                            nanoTime = System.nanoTime();
                        }
                    default:
                        throw new IllegalStateException("Unknown consumer type: " + this.consumer_type + " for " + this);
                        break;
                }
            }
            LOG.warn("Consumer thread [" + this + "] has run out of messages");
        } catch (ConsumerRebalanceFailedException e2) {
            LOG.error("Failed to read Kafka partition from the consumer thread " + this, e2);
            this.group.incrementRebalanceFailures();
        } catch (Exception e3) {
            LOG.error("Unexpected exception in Kafka thread: " + this, e3);
        } catch (Throwable th) {
            LOG.error("Fatal exception in Kafka thread: " + this, th);
            System.exit(1);
        }
        shutdown();
        if (this.group.getRate() > 0.0d) {
            LOG.error("Thread [" + this + "] has exited the consumer loop!");
        } else {
            LOG.info("Thread [" + this + "] has stopped consuming as the rate limit is zero");
        }
        this.thread_running.set(false);
    }

    public void shutdown() {
        LOG.info("Shutting down thread [" + this + "]");
        try {
            if (this.consumer != null) {
                this.consumer.shutdown();
                this.consumer = null;
                LOG.info("Shutdown the kafka consumer on thread [" + this + "]");
            }
        } catch (Exception e) {
            LOG.error("Failed to shutdown the kafka consumer on thread [" + this + "]");
            if (this.group.getParent().getConfig().getBoolean("KafkaRpcPlugin.killOnShutdownError")) {
                LOG.error("Shutting down the entire process because of thread [" + this + "]");
                System.exit(1);
            }
        } catch (Throwable th) {
            LOG.error("Fatal exception in Kafka thread: " + this, th);
            System.exit(1);
        }
    }

    public void incrementNamespaceCounter(CounterType counterType, String str) {
        incrementNamespaceCounter(counterType.toString(), str);
    }

    public void incrementNamespaceCounter(String str, String str2) {
        Map<String, AtomicLong> map = this.namespace_counters.get(str);
        if (map == null) {
            map = new ConcurrentHashMap();
            Map<String, AtomicLong> putIfAbsent = this.namespace_counters.putIfAbsent(str, map);
            if (putIfAbsent != null) {
                map = putIfAbsent;
            }
        }
        String prefix = this.track_metric_prefix ? getPrefix(str2) : DEFAULT_COUNTER_ID;
        AtomicLong atomicLong = map.get(prefix);
        if (atomicLong == null) {
            atomicLong = new AtomicLong();
            AtomicLong put = map.put(prefix, atomicLong);
            if (put != null) {
                atomicLong = put;
            }
        }
        atomicLong.incrementAndGet();
    }

    public Map<String, Map<String, AtomicLong>> getNamespaceCounters() {
        return this.namespace_counters;
    }

    public long getMessagesReceived() {
        return this.messagesReceived.get();
    }

    public long getDatapointsReceived() {
        return this.datapointsReceived.get();
    }

    public long getDeserializationErrors() {
        return this.deserializationErrors.get();
    }

    public double getCumulativeRateDelay() {
        return this.cumulativeRateDelay.get();
    }

    public double getKafkaWaitTime() {
        return this.kafkaWaitTime.get();
    }

    public boolean isThreadRunning() {
        return this.thread_running.get();
    }

    public TSDB getTSDB() {
        return this.tsdb;
    }

    @VisibleForTesting
    ConsumerConnector consumer() {
        return this.consumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public int threadID() {
        return this.thread_id;
    }

    @VisibleForTesting
    long requeueDelay() {
        return this.requeue_delay;
    }

    String getDebugString(IncomingDataPoint incomingDataPoint) {
        StringBuilder append = new StringBuilder().append(incomingDataPoint.getMetric()).append(" ");
        for (Map.Entry entry : incomingDataPoint.getTags().entrySet()) {
            append.append((String) entry.getKey()).append("=").append((String) entry.getValue()).append(" ");
        }
        return append.toString();
    }

    String getPrefix(String str) {
        int indexOf;
        return (!Strings.isNullOrEmpty(str) && (indexOf = str.indexOf(".")) >= 1) ? str.substring(0, indexOf) : DEFAULT_COUNTER_ID;
    }
}
