package co.cask.cdap.logging.save;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.proto.Id;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.name.Named;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Cancellable;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/save/LogSaver.class */
public final class LogSaver extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(LogSaver.class);
    private static final int TIMEOUT_SECONDS = 10;
    private final String topic;
    private final KafkaClientService kafkaClient;
    private final Set<Integer> partitions;
    private final Map<Integer, Cancellable> kafkaCancelMap;
    private final Map<Integer, CountDownLatch> kafkaCancelCallbackLatchMap;
    private final Set<KafkaLogProcessorFactory> messageProcessorFactories;
    private final SetMultimap<Integer, KafkaLogProcessor> partitionProcessorsMap;
    private final MetricsContext metricsContext;

    @Inject
    LogSaver(KafkaClientService kafkaClientService, CConfiguration cConfiguration, @Named("log.saver.message.processor.factories") Set<KafkaLogProcessorFactory> set, @Assisted Set<Integer> set2, MetricsCollectionService metricsCollectionService) throws Exception {
        LOG.info("Initializing LogSaver...");
        this.topic = cConfiguration.get("log.kafka.topic");
        this.partitions = set2;
        LOG.info(String.format("Kafka topic: %s, partitions: %s", this.topic, this.partitions));
        this.kafkaClient = kafkaClientService;
        this.kafkaCancelMap = new HashMap();
        this.kafkaCancelCallbackLatchMap = new HashMap();
        this.messageProcessorFactories = set;
        this.partitionProcessorsMap = HashMultimap.create();
        this.metricsContext = metricsCollectionService.getContext(ImmutableMap.of("ns", Id.Namespace.SYSTEM.getId(), "cmp", "log.saver"));
    }

    protected void startUp() throws Exception {
        LOG.info("Starting LogSaver...");
        createProcessors(this.partitions);
        waitForDatasetAvailability();
        scheduleTasks(this.partitions);
        LOG.info("Started LogSaver.");
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping LogSaver...");
        unscheduleTasks();
        LOG.info("Stopped LogSaver.");
    }

    @VisibleForTesting
    void scheduleTasks(Set<Integer> set) throws Exception {
        subscribe(set);
    }

    @VisibleForTesting
    void unscheduleTasks() {
        Set<Cancellable> stopKafkaMessageCallbacks = stopKafkaMessageCallbacks();
        for (KafkaLogProcessor kafkaLogProcessor : this.partitionProcessorsMap.values()) {
            try {
                kafkaLogProcessor.stop();
            } catch (Throwable th) {
                LOG.error("Error stopping processor {}", kafkaLogProcessor.getClass().getSimpleName());
            }
        }
        Iterator<Cancellable> it = stopKafkaMessageCallbacks.iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
        this.partitionProcessorsMap.clear();
    }

    private void createProcessors(Set<Integer> set) throws Exception {
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            HashSet hashSet = new HashSet();
            Iterator<KafkaLogProcessorFactory> it2 = this.messageProcessorFactories.iterator();
            while (it2.hasNext()) {
                hashSet.add(it2.next().create());
            }
            this.partitionProcessorsMap.putAll(Integer.valueOf(intValue), hashSet);
        }
    }

    private Set<Cancellable> stopKafkaMessageCallbacks() {
        HashSet hashSet = new HashSet();
        for (Map.Entry<Integer, Cancellable> entry : this.kafkaCancelMap.entrySet()) {
            if (entry.getValue() != null) {
                LOG.info("Cancelling kafka callback for partition {}", entry.getKey());
                this.kafkaCancelCallbackLatchMap.get(entry.getKey()).countDown();
                hashSet.add(entry.getValue());
            }
        }
        this.kafkaCancelMap.clear();
        this.kafkaCancelCallbackLatchMap.clear();
        return hashSet;
    }

    private void subscribe(Set<Integer> set) throws Exception {
        LOG.info("Prepare to subscribe for partitions: {}", set);
        HashMap newHashMap = Maps.newHashMap();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Set set2 = this.partitionProcessorsMap.get(Integer.valueOf(intValue));
            Iterator it2 = set2.iterator();
            while (it2.hasNext()) {
                ((KafkaLogProcessor) it2.next()).init(intValue);
            }
            KafkaConsumer.Preparer prepare = this.kafkaClient.getConsumer().prepare();
            long lowestCheckpointOffset = getLowestCheckpointOffset(intValue);
            newHashMap.put(Integer.valueOf(intValue), Long.valueOf(lowestCheckpointOffset));
            if (lowestCheckpointOffset >= 0) {
                prepare.add(this.topic, intValue, lowestCheckpointOffset);
            } else {
                prepare.addFromBeginning(this.topic, intValue);
            }
            this.kafkaCancelCallbackLatchMap.put(Integer.valueOf(intValue), new CountDownLatch(1));
            this.kafkaCancelMap.put(Integer.valueOf(intValue), prepare.consume(new KafkaMessageCallback(intValue, this.kafkaCancelCallbackLatchMap.get(Integer.valueOf(intValue)), set2, this.metricsContext)));
        }
        LOG.info("Consumer created for topic {}, partitions {}", this.topic, newHashMap);
    }

    private long getLowestCheckpointOffset(int i) {
        long j = -1;
        for (KafkaLogProcessor kafkaLogProcessor : this.partitionProcessorsMap.get(Integer.valueOf(i))) {
            Checkpoint checkpoint = kafkaLogProcessor.getCheckpoint();
            LOG.trace("Got checkpoint {} for partition {} and processor {}", new Object[]{checkpoint, Integer.valueOf(i), kafkaLogProcessor.getClass().getName()});
            if (checkpoint.getNextOffset() != -1) {
                j = (j == -1 || checkpoint.getNextOffset() < j) ? checkpoint.getNextOffset() : j;
            }
        }
        LOG.debug("Lowest checkpoint for partition {} is {}", Integer.valueOf(i), Long.valueOf(j));
        return j;
    }

    private void waitForDatasetAvailability() throws InterruptedException {
        boolean z = false;
        while (!z) {
            try {
                Iterator it = this.partitionProcessorsMap.values().iterator();
                while (it.hasNext()) {
                    ((KafkaLogProcessor) it.next()).getCheckpoint();
                }
                z = true;
            } catch (Exception e) {
                LOG.warn(String.format("Cannot discover dataset service. Retry after %d seconds timeout.", Integer.valueOf(TIMEOUT_SECONDS)));
                TimeUnit.SECONDS.sleep(10L);
            }
        }
    }
}
