package org.apache.rya.streams.kafka;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.rya.api.function.projection.RandomUUIDFactory;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
import org.apache.rya.streams.kafka.topology.TopologyFactory;
import org.eclipse.rdf4j.query.MalformedQueryException;

/**
 * A {@link KafkaStreamsFactory} that builds a {@link KafkaStreams} client for a single
 * {@link StreamsQuery}, always pointing it at the bootstrap servers supplied when this
 * factory was constructed.
 *
 * <p>Each client is given an application id of {@code "RyaStreams-Query-" + queryId} and
 * has its {@code auto.offset.reset} policy set to {@code earliest}.
 */
@DefaultAnnotation({NonNull.class})
public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory {

    /** Turns a SPARQL query plus its source/sink topics into a processing topology. */
    private final TopologyBuilderFactory topologyFactory = new TopologyFactory();

    /** Value written to the {@code bootstrap.servers} property of every client created. */
    private final String bootstrapServersConfig;

    /**
     * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}.
     *
     * @param str the value to use for the {@code bootstrap.servers} property (not null)
     * @throws NullPointerException if {@code str} is null
     */
    public SingleThreadKafkaStreamsFactory(String str) {
        bootstrapServersConfig = Objects.requireNonNull(str);
    }

    /**
     * Builds a {@link KafkaStreams} client that runs the topology generated from the
     * query's SPARQL.
     *
     * @param str the name handed to {@link KafkaTopics} to derive the statements topic and
     *        the query results topic (not null)
     * @param streamsQuery the query whose SPARQL and query id drive the topology (not null)
     * @return a newly constructed {@link KafkaStreams} client; this method does not start it
     * @throws KafkaStreamsFactory.KafkaStreamsFactoryException if the SPARQL is malformed or
     *         the topology could not be built
     * @throws NullPointerException if either argument is null
     */
    @Override
    public KafkaStreams make(String str, StreamsQuery streamsQuery)
            throws KafkaStreamsFactory.KafkaStreamsFactoryException {
        Objects.requireNonNull(str);
        Objects.requireNonNull(streamsQuery);

        final Properties streamsProps = createStreamsProperties(streamsQuery);

        try {
            // Resolve the inputs of the topology first: the query text, then the topic the
            // statements are read from, then the topic the results are written to.
            final String sparql = streamsQuery.getSparql();
            final String statementsTopic = KafkaTopics.statementsTopic(str);
            final String resultsTopic =
                    KafkaTopics.queryResultsTopic(str, streamsQuery.getQueryId());

            return new KafkaStreams(
                    topologyFactory.build(
                            sparql,
                            statementsTopic,
                            resultsTopic,
                            new RandomUUIDFactory()),
                    new StreamsConfig(streamsProps));
        } catch (MalformedQueryException | TopologyBuilderFactory.TopologyBuilderException cause) {
            final String message =
                    "Could not create a KafkaStreams processing topology for query "
                            + streamsQuery.getQueryId();
            throw new KafkaStreamsFactory.KafkaStreamsFactoryException(message, cause);
        }
    }

    /**
     * Assembles the client configuration for the given query.
     *
     * @param streamsQuery the query whose id is embedded in the application id
     * @return the properties used to create the {@link StreamsConfig}
     */
    private Properties createStreamsProperties(StreamsQuery streamsQuery) {
        final Properties streamsProps = new Properties();
        streamsProps.setProperty("bootstrap.servers", bootstrapServersConfig);
        // The application id is derived from the query id, so each query gets its own.
        streamsProps.setProperty("application.id", "RyaStreams-Query-" + streamsQuery.getQueryId());
        streamsProps.setProperty("auto.offset.reset", "earliest");
        return streamsProps;
    }
}
