package org.apache.rya.streams.querymanager.kafka;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractScheduledService;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import org.apache.rya.streams.querymanager.QueryChangeLogSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.class */
public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource {
    private static final Logger log = LoggerFactory.getLogger(KafkaQueryChangeLogSource.class);
    private final AbstractScheduledService.Scheduler scheduler;
    private final String kafkaBootstrapServer;
    private KafkaConsumer<String, String> listTopicsConsumer;
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<QueryChangeLogSource.SourceListener> listeners = new HashSet();
    private final HashMap<String, String> knownChangeLogs = new HashMap<>();

    public KafkaQueryChangeLogSource(String str, int i, AbstractScheduledService.Scheduler scheduler) {
        this.kafkaBootstrapServer = ((String) Objects.requireNonNull(str)) + ":" + i;
        this.scheduler = (AbstractScheduledService.Scheduler) Objects.requireNonNull(scheduler);
    }

    protected void startUp() throws Exception {
        log.info("Kafka Query Change Log Source watching " + this.kafkaBootstrapServer + " starting up...");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.kafkaBootstrapServer);
        properties.setProperty("group.id", UUID.randomUUID().toString());
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        this.listTopicsConsumer = new KafkaConsumer<>(properties);
        log.info("Kafka Query Change Log Source watching " + this.kafkaBootstrapServer + " started.");
    }

    protected void shutDown() throws Exception {
        log.info("Kafka Query Change Log Source watching " + this.kafkaBootstrapServer + " shutting down...");
        this.lock.lock();
        try {
            this.listTopicsConsumer.close();
            log.info("Kafka Query Change Log Source watching " + this.kafkaBootstrapServer + " shut down.");
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.rya.streams.querymanager.QueryChangeLogSource
    public void subscribe(QueryChangeLogSource.SourceListener sourceListener) {
        Objects.requireNonNull(sourceListener);
        this.lock.lock();
        try {
            this.listeners.add(sourceListener);
            for (Map.Entry<String, String> entry : this.knownChangeLogs.entrySet()) {
                sourceListener.notifyCreate(entry.getKey(), KafkaQueryChangeLogFactory.make(this.kafkaBootstrapServer, entry.getValue()));
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.rya.streams.querymanager.QueryChangeLogSource
    public void unsubscribe(QueryChangeLogSource.SourceListener sourceListener) {
        Objects.requireNonNull(sourceListener);
        this.lock.lock();
        try {
            this.listeners.remove(sourceListener);
        } finally {
            this.lock.unlock();
        }
    }

    protected void runOneIteration() throws Exception {
        this.lock.lock();
        try {
            HashSet hashSet = new HashSet(this.listTopicsConsumer.listTopics().keySet());
            hashSet.removeIf(str -> {
                return !KafkaTopics.getRyaInstanceFromQueryChangeLog(str).isPresent();
            });
            Set set = (Set) hashSet.stream().map(str2 -> {
                return (String) KafkaTopics.getRyaInstanceFromQueryChangeLog(str2).get();
            }).collect(Collectors.toSet());
            HashSet<String> hashSet2 = new HashSet((Collection) Sets.difference(this.knownChangeLogs.keySet(), set));
            HashSet<String> hashSet3 = new HashSet((Collection) Sets.difference(set, this.knownChangeLogs.keySet()));
            for (String str3 : hashSet2) {
                this.knownChangeLogs.remove(str3);
                Iterator<QueryChangeLogSource.SourceListener> it = this.listeners.iterator();
                while (it.hasNext()) {
                    it.next().notifyDelete(str3);
                }
            }
            for (String str4 : hashSet3) {
                String queryChangeLogTopic = KafkaTopics.queryChangeLogTopic(str4);
                this.knownChangeLogs.put(str4, queryChangeLogTopic);
                Iterator<QueryChangeLogSource.SourceListener> it2 = this.listeners.iterator();
                while (it2.hasNext()) {
                    it2.next().notifyCreate(str4, KafkaQueryChangeLogFactory.make(this.kafkaBootstrapServer, queryChangeLogTopic));
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return this.scheduler;
    }
}
