package org.apache.rya.streams.kafka;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.streams.api.RyaStreamsClient;
import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.class */
public final class KafkaRyaStreamsClientFactory {
    private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);

    public static RyaStreamsClient make(String str, String str2, int i) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        final InMemoryQueryRepository inMemoryQueryRepository = new InMemoryQueryRepository(new KafkaQueryChangeLog(makeProducer(str2, i, StringSerializer.class, QueryChangeSerializer.class), fromStartConsumer(str2, i, StringDeserializer.class, QueryChangeDeserializer.class), KafkaTopics.queryChangeLogTopic(str)));
        return new RyaStreamsClient(new DefaultAddQuery(inMemoryQueryRepository), new DefaultGetQuery(inMemoryQueryRepository), new DefaultDeleteQuery(inMemoryQueryRepository), new KafkaGetQueryResultStream(str2, "" + i, VisibilityStatementDeserializer.class), new KafkaGetQueryResultStream(str2, "" + i, VisibilityBindingSetDeserializer.class), new DefaultListQueries(inMemoryQueryRepository), new DefaultStartQuery(inMemoryQueryRepository), new DefaultStopQuery(inMemoryQueryRepository)) { // from class: org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory.1
            public void close() {
                try {
                    inMemoryQueryRepository.close();
                } catch (Exception e) {
                    KafkaRyaStreamsClientFactory.log.warn("Couldn't close a QueryRepository.", e);
                }
            }
        };
    }

    private static <K, V> Producer<K, V> makeProducer(String str, int i, Class<? extends Serializer<K>> cls, Class<? extends Serializer<V>> cls2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(cls);
        Objects.requireNonNull(cls2);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", str + ":" + i);
        properties.setProperty("key.serializer", cls.getName());
        properties.setProperty("value.serializer", cls2.getName());
        return new KafkaProducer(properties);
    }

    private static <K, V> Consumer<K, V> fromStartConsumer(String str, int i, Class<? extends Deserializer<K>> cls, Class<? extends Deserializer<V>> cls2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(cls);
        Objects.requireNonNull(cls2);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", str + ":" + i);
        properties.setProperty("group.id", UUID.randomUUID().toString());
        properties.setProperty("client.id", UUID.randomUUID().toString());
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.deserializer", cls.getName());
        properties.setProperty("value.deserializer", cls2.getName());
        return new KafkaConsumer(properties);
    }
}
