package org.apache.rya.streams.kafka.interactor;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.RunQuery;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link RunQuery} implementation that executes a registered query as a Kafka Streams program.
 *
 * <p>The query's SPARQL is looked up in a {@link QueryRepository}, turned into a processing
 * topology by a {@link TopologyBuilderFactory}, and then run with {@link KafkaStreams} against the
 * configured Kafka broker. {@link #run(UUID)} blocks the calling thread until the JVM begins
 * shutting down or the calling thread is interrupted.
 */
@DefaultAnnotation({NonNull.class})
public class KafkaRunQuery implements RunQuery {
    private static final Logger log = LoggerFactory.getLogger(KafkaRunQuery.class);

    private final String kafkaHostname;
    private final String kafkaPort;
    private final String statementsTopic;
    private final String resultsTopic;
    private final TopologyBuilderFactory topologyFactory;
    private final QueryRepository queryRepo;

    /**
     * Constructs an instance of {@link KafkaRunQuery}.
     *
     * @param kafkaHostname host of the Kafka broker; joined with {@code kafkaPort} to form the
     *        {@code bootstrap.servers} value (not null)
     * @param kafkaPort port of the Kafka broker (not null)
     * @param statementsTopic statements topic name handed to the topology factory (not null)
     * @param resultsTopic results topic name handed to the topology factory (not null)
     * @param queryRepo repository the query is looked up in (not null)
     * @param topologyFactory builds the processing topology for the query's SPARQL (not null)
     * @throws NullPointerException if any argument is null
     */
    public KafkaRunQuery(
            String kafkaHostname,
            String kafkaPort,
            String statementsTopic,
            String resultsTopic,
            QueryRepository queryRepo,
            TopologyBuilderFactory topologyFactory) {
        this.kafkaHostname = Objects.requireNonNull(kafkaHostname);
        this.kafkaPort = Objects.requireNonNull(kafkaPort);
        this.statementsTopic = Objects.requireNonNull(statementsTopic);
        this.resultsTopic = Objects.requireNonNull(resultsTopic);
        this.topologyFactory = Objects.requireNonNull(topologyFactory);
        this.queryRepo = Objects.requireNonNull(queryRepo);
    }

    /**
     * Runs the query registered under {@code queryId} and blocks until the JVM starts shutting
     * down or the calling thread is interrupted.
     *
     * <p>If the calling thread is interrupted while waiting, the streams program is closed and
     * the thread's interrupt status is restored before this method returns.
     *
     * @param queryId ID of the query to run (not null)
     * @throws RyaStreamsException if no query is registered under {@code queryId}, if a
     *         processing topology could not be built for its SPARQL, or if the Kafka Streams
     *         program could not be configured, started or closed
     * @throws NullPointerException if {@code queryId} is null
     */
    public void run(UUID queryId) throws RyaStreamsException {
        Objects.requireNonNull(queryId);

        // Fetch the query from the repository; it must already be registered.
        Optional<StreamsQuery> query = this.queryRepo.get(queryId);
        if (!query.isPresent()) {
            throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because no such query is currently registered.");
        }
        String sparql = query.get().getSparql();

        // Only failures of the topology build are reported as topology build failures.
        TopologyBuilder topology;
        try {
            topology = this.topologyFactory.build(sparql, this.statementsTopic, this.resultsTopic, new RandomUUIDFactory());
        } catch (Exception e) {
            throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because a processing topology could not be built for the SPARQL " + sparql, e);
        }

        // Failures while configuring/running the streams program still surface as
        // RyaStreamsException, but with a message that describes what actually went wrong.
        try {
            runStreams(queryId, topology);
        } catch (Exception e) {
            throw new RyaStreamsException("Could not run the Query with ID " + queryId + " because the Kafka Streams program failed while running the SPARQL " + sparql, e);
        }
    }

    /**
     * Configures and starts the Kafka Streams program for {@code topology}, waits for JVM
     * shutdown or interruption, and always closes the program before returning.
     */
    private void runStreams(UUID queryId, TopologyBuilder topology) {
        Properties streamsProps = new Properties();
        streamsProps.setProperty("bootstrap.servers", this.kafkaHostname + ":" + this.kafkaPort);
        streamsProps.setProperty("application.id", "KafkaRunQuery-" + queryId);
        streamsProps.setProperty("auto.offset.reset", "earliest");

        KafkaStreams streams = new KafkaStreams(topology, new StreamsConfig(streamsProps));

        // An unhandled exception in the streams program is logged and terminates the JVM.
        streams.setUncaughtExceptionHandler((thread, cause) -> {
            log.error("Unhandled exception while processing the Rya Streams query. Shutting down.", cause);
            System.exit(1);
        });

        // The shutdown hook releases the latch so the waiting thread can close the program.
        // NOTE(review): the hook itself does not wait for close() to finish; confirm the JVM
        // stays alive long enough for a clean close.
        CountDownLatch awaitTermination = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(awaitTermination::countDown));

        boolean interrupted = false;
        try {
            streams.start();
            awaitTermination.await();
        } catch (InterruptedException e) {
            log.warn("Interrupted while waiting for termination. Shutting down.");
            interrupted = true;
        } finally {
            // Close even when start() fails so the streams client's resources are released.
            streams.close();
            if (interrupted) {
                // Restore the flag only after close() so the close itself is not cut short.
                Thread.currentThread().interrupt();
            }
        }
    }
}
