package gobblin.runtime.kafka;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import gobblin.configuration.State;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.MetricContext;
import gobblin.metrics.Tag;
import gobblin.runtime.metrics.RuntimeMetrics;
import gobblin.util.ConfigUtils;
import gobblin.util.ExecutorsUtils;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/kafka/HighLevelConsumer.class */
public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HighLevelConsumer.class);
    public static final String GROUP_ID_KEY = "group.id";
    private static final String DEFAULT_GROUP_ID = "KafkaJobSpecMonitor";
    private final String topic;
    private final int numThreads;
    private final Config config;
    private final ConsumerConfig consumerConfig;
    private MetricContext metricContext;
    private Counter messagesRead;
    private ConsumerConnector consumer;
    private ExecutorService executor;

    /* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/kafka/HighLevelConsumer$MonitorConsumer.class */
    public class MonitorConsumer implements Runnable {
        private final KafkaStream stream;

        @Override // java.lang.Runnable
        public void run() {
            ConsumerIterator<K, V> it = this.stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<K, V> next = it.next();
                HighLevelConsumer.this.messagesRead.inc();
                HighLevelConsumer.this.processMessage(next);
            }
        }

        @ConstructorProperties({"stream"})
        public MonitorConsumer(KafkaStream kafkaStream) {
            this.stream = kafkaStream;
        }
    }

    public HighLevelConsumer(String str, Config config, int i) {
        this.topic = str;
        this.numThreads = i;
        this.config = config;
        this.consumerConfig = createConsumerConfig(config);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    public void buildMetricsContextAndMetrics() {
        this.metricContext = Instrumented.getMetricContext(new State(ConfigUtils.configToProperties(this.config)), getClass(), getTagsForMetrics());
        createMetrics();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    public void shutdownMetrics() throws IOException {
        if (this.metricContext != null) {
            this.metricContext.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createMetrics() {
        this.messagesRead = this.metricContext.counter(RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Tag<?>> getTagsForMetrics() {
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(new Tag("topic", this.topic));
        newArrayList.add(new Tag(RuntimeMetrics.GROUP_ID, this.consumerConfig.groupId()));
        return newArrayList;
    }

    protected abstract void processMessage(MessageAndMetadata<K, V> messageAndMetadata);

    @Override // com.google.common.util.concurrent.AbstractIdleService
    protected void startUp() {
        buildMetricsContextAndMetrics();
        this.consumer = createConsumerConnector();
        List<KafkaStream<byte[], byte[]>> createStreams = createStreams();
        this.executor = Executors.newFixedThreadPool(this.numThreads);
        int i = 0;
        Iterator<KafkaStream<byte[], byte[]>> it = createStreams.iterator();
        while (it.hasNext()) {
            this.executor.execute(new MonitorConsumer(it.next()));
            i++;
        }
    }

    protected ConsumerConfig createConsumerConfig(Config config) {
        Properties configToProperties = ConfigUtils.configToProperties(config);
        if (!configToProperties.containsKey("group.id")) {
            configToProperties.setProperty("group.id", DEFAULT_GROUP_ID);
        }
        return new ConsumerConfig(configToProperties);
    }

    protected ConsumerConnector createConsumerConnector() {
        return Consumer.createJavaConsumerConnector(this.consumerConfig);
    }

    protected List<KafkaStream<byte[], byte[]>> createStreams() {
        Map<String, Integer> newHashMap = Maps.newHashMap();
        newHashMap.put(this.topic, Integer.valueOf(this.numThreads));
        return this.consumer.createMessageStreams(newHashMap).get(this.topic);
    }

    @Override // com.google.common.util.concurrent.AbstractIdleService
    public void shutDown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log), 5000L, TimeUnit.MILLISECONDS);
        }
        try {
            shutdownMetrics();
        } catch (IOException e) {
            log.warn("Failed to shutdown metrics for " + getClass().getSimpleName());
        }
    }

    public String getTopic() {
        return this.topic;
    }

    public MetricContext getMetricContext() {
        return this.metricContext;
    }
}
