package co.cask.cdap.metrics.process;

import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.metrics.store.MetricDatasetFactory;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.name.Named;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metrics/process/KafkaMetricsProcessorService.class */
public final class KafkaMetricsProcessorService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsProcessorService.class);
    private final KafkaClientService kafkaClient;
    private final MessageCallbackFactory callbackFactory;
    private final String topicPrefix;
    private final Set<Integer> partitions;
    private Cancellable unsubscribe;
    private final MetricDatasetFactory metricDatasetFactory;

    @Nullable
    private MetricsContext metricsContext;
    private volatile boolean stopping = false;
    private KafkaConsumerMetaTable metaTable;

    @Inject
    public KafkaMetricsProcessorService(KafkaClientService kafkaClientService, MetricDatasetFactory metricDatasetFactory, MessageCallbackFactory messageCallbackFactory, @Named("metrics.kafka.topic.prefix") String str, @Assisted Set<Integer> set) {
        this.kafkaClient = kafkaClientService;
        this.callbackFactory = messageCallbackFactory;
        this.topicPrefix = str;
        this.partitions = set;
        this.metricDatasetFactory = metricDatasetFactory;
    }

    public void setMetricsContext(MetricsContext metricsContext) {
        this.metricsContext = metricsContext;
    }

    protected String getServiceName() {
        return getClass().getSimpleName();
    }

    protected void run() {
        LOG.info("Starting Metrics Processing for partitions {}.", this.partitions);
        if (subscribe()) {
            LOG.info("Metrics Processing Service started for partitions {}.", this.partitions);
            while (isRunning()) {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    protected void triggerShutdown() {
        LOG.info("Shutdown is triggered.");
        this.stopping = true;
        super.triggerShutdown();
    }

    protected void shutDown() {
        LOG.info("Stopping Metrics Processing Service.");
        if (this.unsubscribe != null) {
            this.unsubscribe.cancel();
        }
        LOG.info("Metrics Processing Service stopped.");
    }

    private KafkaConsumerMetaTable getMetaTable() {
        while (true) {
            if (this.metaTable != null) {
                break;
            }
            if (this.stopping) {
                LOG.info("We are shutting down, giving up on acquiring KafkaConsumerMetaTable.");
                break;
            }
            try {
                this.metaTable = this.metricDatasetFactory.createKafkaConsumerMeta();
            } catch (Exception e) {
                LOG.warn("Cannot access kafka consumer metaTable, will retry in 1 sec.");
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        return this.metaTable;
    }

    private boolean subscribe() {
        KafkaConsumer.Preparer prepare = this.kafkaClient.getConsumer().prepare();
        String str = this.topicPrefix;
        Iterator<Integer> it = this.partitions.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            try {
                LOG.info("Retrieve offset for topic: {}, partition: {}", str, Integer.valueOf(intValue));
                KafkaConsumerMetaTable metaTable = getMetaTable();
                if (metaTable == null) {
                    LOG.info("Could not get KafkaConsumerMetaTable, seems like we are being shut down");
                    return false;
                }
                long j = metaTable.get(new TopicPartition(str, intValue));
                LOG.info("Offset for topic: {}, partition: {} is {}", new Object[]{str, Integer.valueOf(intValue), Long.valueOf(j)});
                if (j >= 0) {
                    prepare.add(str, intValue, j);
                } else {
                    prepare.addFromBeginning(str, intValue);
                }
            } catch (Exception e) {
                LOG.info("Failed to get KafkaConsumerMetaTable, shutting down");
                return false;
            }
        }
        this.unsubscribe = prepare.consume(this.callbackFactory.create(getMetaTable(), this.metricsContext));
        LOG.info("Consumer created for topic {}, partitions {}", str, this.partitions);
        return true;
    }
}
