package com.addthis.basis.kafka;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/addthis/basis/kafka/KafkaConsumerService.class */
public class KafkaConsumerService<K, V> implements Closeable {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final MetricRegistry metrics;
    private final ConsumerConfig consumerConfig;
    private final Map<String, Integer> topics;
    private final Decoder<K> keyDecoder;
    private final Decoder<V> valueDecoder;
    private final MessageHandler<K, V> messageHandler;
    private ConsumerConnector consumer;
    private ExecutorService streamExecutorService;

    /* loaded from: input_file:com/addthis/basis/kafka/KafkaConsumerService$Builder.class */
    public static class Builder<K, V> {
        private Map<String, Integer> topics = new HashMap();
        private Map<String, String> overrides = new HashMap();
        private String zookeeper;
        private String groupID;
        private Decoder<K> keyDecoder;
        private Decoder<V> valueDecoder;
        private MessageHandler<K, V> messageHandler;
        private MetricRegistry metrics;

        public Builder<K, V> groupID(String str) {
            this.groupID = str;
            return this;
        }

        public Builder<K, V> metrics(MetricRegistry metricRegistry) {
            this.metrics = metricRegistry;
            return this;
        }

        public Builder<K, V> zookeeper(String str) {
            this.zookeeper = str;
            return this;
        }

        public Builder<K, V> addTopic(String str, int i) {
            this.topics.put(str, Integer.valueOf(i));
            return this;
        }

        public Builder<K, V> topics(Map<String, Integer> map) {
            this.topics = map;
            return this;
        }

        public Builder<K, V> addOverride(String str, String str2) {
            this.overrides.put(str, str2);
            return this;
        }

        public Builder<K, V> overrides(Map<String, String> map) {
            this.overrides = map;
            return this;
        }

        public Builder<K, V> keyDecoder(Decoder<K> decoder) {
            this.keyDecoder = decoder;
            return this;
        }

        public Builder<K, V> valueDecoder(Decoder<V> decoder) {
            this.valueDecoder = decoder;
            return this;
        }

        public Builder<K, V> handler(MessageHandler<K, V> messageHandler) {
            this.messageHandler = messageHandler;
            return this;
        }

        public KafkaConsumerService<K, V> build() {
            Preconditions.checkNotNull(this.groupID, "groupID cannot be null");
            Preconditions.checkNotNull(this.zookeeper, "zookeeper cannot be null");
            Preconditions.checkNotNull(this.messageHandler, "handler cannot be null");
            Preconditions.checkNotNull(this.keyDecoder, "keyDecoder cannot be null");
            Preconditions.checkNotNull(this.valueDecoder, "valueDecoder cannot be null");
            Preconditions.checkArgument(this.topics != null && this.topics.size() > 0, "topics must have at least one topic");
            this.topics.forEach((str, num) -> {
                Preconditions.checkArgument(num.intValue() > 0, String.format("stream count for topic %s must be greater than 0", str));
            });
            if (this.overrides == null) {
                this.overrides = new HashMap();
            }
            this.overrides.put("group.id", this.groupID);
            return new KafkaConsumerService<>(this.metrics, KafkaConsumerService.newConsumerConfig(this.zookeeper, this.overrides), this.topics, this.keyDecoder, this.valueDecoder, this.messageHandler);
        }
    }

    KafkaConsumerService(MetricRegistry metricRegistry, ConsumerConfig consumerConfig, Map<String, Integer> map, Decoder<K> decoder, Decoder<V> decoder2, MessageHandler<K, V> messageHandler) {
        MDC.put("group_id", consumerConfig.groupId());
        this.metrics = metricRegistry;
        this.consumerConfig = consumerConfig;
        this.topics = map;
        this.keyDecoder = decoder;
        this.valueDecoder = decoder2;
        this.messageHandler = messageHandler;
    }

    public void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            this.logger.info("starting kafka consumer");
            this.streamExecutorService = new InstrumentedExecutorService(Executors.newFixedThreadPool(this.topics.values().stream().mapToInt(num -> {
                return num.intValue();
            }).sum(), new ThreadFactoryBuilder().setNameFormat("KafkaStreamConsumer-%d").setDaemon(true).build()), this.metrics, MetricRegistry.name(KafkaConsumerService.class, new String[]{"streams"}));
            this.consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);
            this.consumer.createMessageStreams(this.topics, this.keyDecoder, this.valueDecoder).forEach((str, list) -> {
                String name = MetricRegistry.name(KafkaConsumerService.class, new String[]{str});
                Timer timer = this.metrics.timer(MetricRegistry.name(name, new String[]{"message_timer"}));
                Meter meter = this.metrics.meter(MetricRegistry.name(name, new String[]{"error_meter"}));
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    KafkaStream kafkaStream = (KafkaStream) it.next();
                    this.streamExecutorService.execute(() -> {
                        ConsumerIterator it2 = kafkaStream.iterator();
                        while (it2.hasNext()) {
                            MessageAndMetadata messageAndMetadata = (MessageAndMetadata) it2.next();
                            try {
                                Timer.Context time = timer.time();
                                Throwable th = null;
                                try {
                                    try {
                                        this.messageHandler.handle(messageAndMetadata.topic(), messageAndMetadata.key(), messageAndMetadata.message());
                                        if (time != null) {
                                            if (0 != 0) {
                                                try {
                                                    time.close();
                                                } catch (Throwable th2) {
                                                    th.addSuppressed(th2);
                                                }
                                            } else {
                                                time.close();
                                            }
                                        }
                                    } catch (Throwable th3) {
                                        th = th3;
                                        throw th3;
                                        break;
                                    }
                                } finally {
                                }
                            } catch (Exception e) {
                                meter.mark();
                                this.logger.error("unchecked stream handler exception", e);
                            }
                        }
                    });
                }
            });
            this.logger.info("started kafka consumer");
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isRunning.compareAndSet(true, false)) {
            this.logger.info("shutting down kafka consumer");
            if (this.consumer != null) {
                this.consumer.shutdown();
            }
            MoreExecutors.shutdownAndAwaitTermination(this.streamExecutorService, 5L, TimeUnit.SECONDS);
            this.messageHandler.close();
            this.logger.info("shut down kafka consumer");
        }
    }

    public static ConsumerConfig newConsumerConfig(String str, Map<String, String> map) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", str);
        properties.put("num.consumer.fetchers", "1");
        properties.putAll(map);
        return new ConsumerConfig(properties);
    }

    public static <K, V> Builder<K, V> newBuilder() {
        return new Builder<>();
    }
}
