package org.apache.rya.streams.kafka.interactor;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.entity.KafkaQueryResultStream;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.class */
public class KafkaGetQueryResultStream<T> implements GetQueryResultStream<T> {
    private final String bootstrapServers;
    private final Class<? extends Deserializer<T>> deserializerClass;

    public KafkaGetQueryResultStream(String str, String str2, Class<? extends Deserializer<T>> cls) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        this.bootstrapServers = str + ":" + str2;
        this.deserializerClass = (Class) Objects.requireNonNull(cls);
    }

    public QueryResultStream<T> fromStart(String str, UUID uuid) throws RyaStreamsException {
        Objects.requireNonNull(uuid);
        return makeStream(str, uuid, "earliest");
    }

    public QueryResultStream<T> fromNow(String str, UUID uuid) throws RyaStreamsException {
        Objects.requireNonNull(uuid);
        return makeStream(str, uuid, "latest");
    }

    private QueryResultStream<T> makeStream(String str, UUID uuid, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", this.deserializerClass);
        properties.put("group.id", UUID.randomUUID().toString());
        properties.put("client.id", "Query-Result-Stream-" + uuid);
        properties.put("auto.offset.reset", str2);
        properties.put("enable.auto.commit", false);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList(KafkaTopics.queryResultsTopic(str, uuid)));
        return new KafkaQueryResultStream(uuid, kafkaConsumer);
    }
}
