package org.apache.rya.streams.querymanager.kafka;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.QueryExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link QueryExecutor} that runs each query as a {@link KafkaStreams} job
 * inside the current JVM.
 * <p>
 * The executor keeps three in-memory indexes: query ID to Rya instance name,
 * Rya instance name to query IDs, and query ID to the running
 * {@link KafkaStreams} job. {@link #startQuery}, {@link #stopQuery},
 * {@link #stopAll} and {@link #getRunningQueryIds} serialise access to those
 * indexes through {@link #lock}. Every one of those methods requires the
 * service to be in the {@code RUNNING} state and throws
 * {@link IllegalStateException} otherwise.
 */
@DefaultAnnotation({NonNull.class})
public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor {
    private static final Logger log = LoggerFactory.getLogger(LocalQueryExecutor.class);

    /**
     * Guards the query bookkeeping maps. It is static, so it is shared by
     * every instance of this class in the JVM. It is reentrant, which is what
     * allows {@link #stopAll} to call {@link #stopQuery} while holding it.
     */
    public static ReentrantLock lock = new ReentrantLock();

    /** Maps a query ID to the name of the Rya instance it was started for. */
    private final Map<UUID, String> ryaInstanceById = new HashMap<>();

    /** Maps a Rya instance name to the IDs of the queries started for it. */
    private final Multimap<String, UUID> idByRyaInstance = HashMultimap.create();

    /** Maps a query ID to the Kafka Streams job that is executing it. */
    private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();

    private final CreateKafkaTopic createKafkaTopic;
    private final KafkaStreamsFactory streamsFactory;

    /**
     * Constructs an instance of {@link LocalQueryExecutor}.
     *
     * @param createKafkaTopic used to create the topics a query needs before it is started. (not null)
     * @param kafkaStreamsFactory builds the {@link KafkaStreams} job for a query. (not null)
     * @throws NullPointerException if either argument is {@code null}.
     */
    public LocalQueryExecutor(CreateKafkaTopic createKafkaTopic, KafkaStreamsFactory kafkaStreamsFactory) {
        this.createKafkaTopic = Objects.requireNonNull(createKafkaTopic);
        this.streamsFactory = Objects.requireNonNull(kafkaStreamsFactory);
    }

    /** Logs that the executor is starting; no other work is performed. */
    @Override
    protected void startUp() throws Exception {
        log.info("Local Query Executor starting up.");
    }

    /**
     * Closes every Kafka Streams job that is still registered with this executor.
     */
    @Override
    protected void shutDown() throws Exception {
        log.info("Local Query Executor shutting down. Stopping all jobs...");
        // NOTE(review): this iteration does not take the lock, unlike the query methods.
        for (final KafkaStreams job : this.byQueryId.values()) {
            job.close();
        }
        log.info("Local Query Executor shut down.");
    }

    /**
     * Creates the topics the query needs, builds and starts its Kafka Streams
     * job, and records the job in the bookkeeping maps.
     *
     * @param str the name of the Rya instance the query runs against. (not null)
     * @param streamsQuery the query to start. (not null)
     * @throws QueryExecutor.QueryExecutorException if the streams factory could not build the job.
     * @throws IllegalStateException if the service is not {@code RUNNING}.
     */
    @Override // org.apache.rya.streams.querymanager.QueryExecutor
    public void startQuery(String str, StreamsQuery streamsQuery) throws QueryExecutor.QueryExecutorException {
        Objects.requireNonNull(str);
        Objects.requireNonNull(streamsQuery);
        Preconditions.checkState(state() == Service.State.RUNNING, "The service must be RUNNING to execute this method.");

        lock.lock();
        try {
            final UUID queryId = streamsQuery.getQueryId();

            // The statements topic and the query's results topic must exist before the job starts.
            final Set<String> topics = Sets.newHashSet(
                    KafkaTopics.statementsTopic(str),
                    KafkaTopics.queryResultsTopic(str, queryId));
            // presumably 1 partition and a replication factor of 1 — confirm against CreateKafkaTopic
            this.createKafkaTopic.createTopics(topics, 1, 1, Optional.empty());

            final KafkaStreams streams = this.streamsFactory.make(str, streamsQuery);
            streams.start();

            // Only register the job once it has been started successfully.
            this.ryaInstanceById.put(queryId, str);
            this.idByRyaInstance.put(str, queryId);
            this.byQueryId.put(queryId, streams);
        } catch (final KafkaStreamsFactory.KafkaStreamsFactoryException e) {
            throw new QueryExecutor.QueryExecutorException("Could not start query " + streamsQuery.getQueryId(), e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Closes the Kafka Streams job for a query and removes it from the
     * bookkeeping maps. Does nothing if the query ID is not registered.
     *
     * @param uuid the ID of the query to stop. (not null)
     * @throws IllegalStateException if the service is not {@code RUNNING}.
     */
    @Override // org.apache.rya.streams.querymanager.QueryExecutor
    public void stopQuery(UUID uuid) throws QueryExecutor.QueryExecutorException {
        Objects.requireNonNull(uuid);
        Preconditions.checkState(state() == Service.State.RUNNING, "The service must be RUNNING to execute this method.");

        lock.lock();
        try {
            if (this.byQueryId.containsKey(uuid)) {
                // Close first so the bookkeeping is left untouched if close() fails.
                this.byQueryId.get(uuid).close();

                final String ryaInstance = this.ryaInstanceById.remove(uuid);
                this.idByRyaInstance.remove(ryaInstance, uuid);
                this.byQueryId.remove(uuid);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stops every query that was started for the given Rya instance.
     *
     * @param str the name of the Rya instance whose queries are stopped. (not null)
     * @throws IllegalStateException if the service is not {@code RUNNING}.
     */
    @Override // org.apache.rya.streams.querymanager.QueryExecutor
    public void stopAll(String str) throws QueryExecutor.QueryExecutorException {
        Objects.requireNonNull(str);
        Preconditions.checkState(state() == Service.State.RUNNING, "The service must be RUNNING to execute this method.");

        lock.lock();
        try {
            if (this.idByRyaInstance.containsKey(str)) {
                // Iterate over a copy: stopQuery() removes entries from idByRyaInstance as it goes.
                final Set<UUID> queryIds = new HashSet<>(this.idByRyaInstance.get(str));
                for (final UUID queryId : queryIds) {
                    stopQuery(queryId);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a snapshot of the IDs of the queries currently registered with
     * this executor. The returned set is a copy; changing it does not affect
     * the executor.
     *
     * @return a new set holding the registered query IDs.
     * @throws IllegalStateException if the service is not {@code RUNNING}.
     */
    @Override // org.apache.rya.streams.querymanager.QueryExecutor
    public Set<UUID> getRunningQueryIds() throws QueryExecutor.QueryExecutorException {
        // Check the state BEFORE taking the lock: a failed check must not leave the shared lock held.
        Preconditions.checkState(state() == Service.State.RUNNING, "The service must be RUNNING to execute this method.");

        lock.lock();
        try {
            return new HashSet<>(this.byQueryId.keySet());
        } finally {
            lock.unlock();
        }
    }
}
