package org.apache.rya.streams.kafka.queries;

import com.google.common.collect.Lists;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import info.aduna.iteration.CloseableIteration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.rya.streams.api.queries.ChangeLogEntry;
import org.apache.rya.streams.api.queries.QueryChange;
import org.apache.rya.streams.api.queries.QueryChangeLog;

/**
 * A {@link QueryChangeLog} that stores its {@link QueryChange}s in a Kafka topic.
 * <p>
 * Changes are appended through the supplied {@link Producer} and read back through the
 * supplied {@link Consumer}. Reads are always performed against partition {@code 0} of the
 * topic. Every iteration handed out by {@link #readFromStart()} and
 * {@link #readFromPosition(long)} shares this log's single consumer, so requesting a new
 * iteration re-assigns and re-positions the consumer used by any earlier one.
 */
@DefaultAnnotation({NonNull.class})
public class KafkaQueryChangeLog implements QueryChangeLog {
    /** The partition of the topic that the change log is read from. */
    private static final int LOG_PARTITION = 0;

    private final Producer<?, QueryChange> producer;
    private final Consumer<?, QueryChange> consumer;
    private final String topic;

    /**
     * Iterates over the entries of the change log by polling a Kafka {@link Consumer} and
     * buffering each polled batch of records.
     * <p>
     * It is a static nested class because it only needs the consumer it is given, not the
     * enclosing change log.
     */
    private static final class QueryChangeLogEntryIter
            implements CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLog.QueryChangeLogException> {
        /** How long a single poll waits for records, in milliseconds. */
        private static final long POLL_TIMEOUT_MS = 3000L;

        private final Consumer<?, QueryChange> consumer;

        /** Entries from the most recent poll; {@code null} until the first poll happens. */
        private Iterator<ChangeLogEntry<QueryChange>> iterCache;

        /**
         * @param consumer the consumer to poll for change log records; it must already be
         *        assigned and positioned by the caller. (not null)
         */
        public QueryChangeLogEntryIter(Consumer<?, QueryChange> consumer) {
            this.consumer = Objects.requireNonNull(consumer);
        }

        /**
         * Polls for more records when the buffered batch is exhausted.
         *
         * @return {@code true} if an entry is buffered after that; {@code false} if the poll
         *         timed out without returning any records.
         */
        @Override
        public boolean hasNext() throws QueryChangeLog.QueryChangeLogException {
            maybePopulateCache();
            return this.iterCache.hasNext();
        }

        /**
         * @return the next buffered entry, polling for more records first if necessary.
         * @throws QueryChangeLog.QueryChangeLogException if no entry is available even after polling.
         */
        @Override
        public ChangeLogEntry<QueryChange> next() throws QueryChangeLog.QueryChangeLogException {
            maybePopulateCache();
            if (this.iterCache.hasNext()) {
                return this.iterCache.next();
            }
            throw new QueryChangeLog.QueryChangeLogException("There are no changes in the change log.");
        }

        /** Does nothing: this iteration never removes entries from the log. */
        @Override
        public void remove() throws QueryChangeLog.QueryChangeLogException {
            // Intentionally a no-op.
        }

        /** Unsubscribes the consumer; the consumer itself is left open for its owner to close. */
        @Override
        public void close() throws QueryChangeLog.QueryChangeLogException {
            this.consumer.unsubscribe();
        }

        /**
         * Replaces the buffered batch with a freshly polled one when the buffer has never been
         * filled or has been fully consumed. The buffer may still be empty afterwards if the
         * poll returned no records.
         */
        private void maybePopulateCache() {
            if (this.iterCache != null && this.iterCache.hasNext()) {
                return;
            }

            final ConsumerRecords<?, QueryChange> records = this.consumer.poll(POLL_TIMEOUT_MS);
            final ArrayList<ChangeLogEntry<QueryChange>> entries = new ArrayList<>();
            // Each entry pairs the record's offset with the change stored at that offset.
            records.forEach(record -> entries.add(new ChangeLogEntry<>(record.offset(), record.value())));
            this.iterCache = entries.iterator();
        }
    }

    /**
     * Constructs an instance of {@link KafkaQueryChangeLog}.
     *
     * @param producer the producer used to append changes to the topic. (not null)
     * @param consumer the consumer used to read changes from the topic. (not null)
     * @param str the name of the Kafka topic that holds the change log. (not null)
     */
    public KafkaQueryChangeLog(Producer<?, QueryChange> producer, Consumer<?, QueryChange> consumer, String str) {
        this.producer = Objects.requireNonNull(producer);
        this.consumer = Objects.requireNonNull(consumer);
        this.topic = Objects.requireNonNull(str);
    }

    /**
     * Appends a change to the topic and blocks until the send has completed.
     *
     * @param queryChange the change to record. (not null)
     * @throws QueryChangeLog.QueryChangeLogException if the send fails or the calling thread is
     *         interrupted while waiting for it. On interruption the thread's interrupt status
     *         is restored before this exception is thrown.
     */
    @Override
    public void write(QueryChange queryChange) throws QueryChangeLog.QueryChangeLogException {
        Objects.requireNonNull(queryChange);

        final Future<?> pendingSend = this.producer.send(new ProducerRecord<>(this.topic, queryChange));
        this.producer.flush();

        // Do not return until the outcome of the send is known.
        try {
            pendingSend.get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Catching the exception cleared the flag; restore it so callers can still observe it.
                Thread.currentThread().interrupt();
            }
            throw new QueryChangeLog.QueryChangeLogException("Could not record a new query change to the Kafka Query Change Log.", e);
        }
    }

    /**
     * @return an iteration over the change log that starts at the beginning of partition
     *         {@code 0} of the topic. It shares this log's consumer.
     */
    @Override
    public CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLog.QueryChangeLogException> readFromStart() throws QueryChangeLog.QueryChangeLogException {
        final TopicPartition partition = assignLogPartition();
        this.consumer.seekToBeginning(Lists.newArrayList(partition));
        return new QueryChangeLogEntryIter(this.consumer);
    }

    /**
     * @param j the offset within partition {@code 0} of the topic to start reading from.
     * @return an iteration over the change log that starts at the requested offset. It shares
     *         this log's consumer.
     */
    @Override
    public CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLog.QueryChangeLogException> readFromPosition(long j) throws QueryChangeLog.QueryChangeLogException {
        final TopicPartition partition = assignLogPartition();
        this.consumer.seek(partition, j);
        return new QueryChangeLogEntryIter(this.consumer);
    }

    /**
     * Closes the producer and then the consumer. The consumer is closed even when closing the
     * producer throws, so a failure in the first close does not leak the second resource.
     */
    @Override
    public void close() throws Exception {
        try {
            this.producer.close();
        } finally {
            this.consumer.close();
        }
    }

    /**
     * Assigns the consumer to the change log's partition of the topic.
     *
     * @return the partition the consumer is now assigned to, ready to be sought.
     */
    private TopicPartition assignLogPartition() {
        final TopicPartition partition = new TopicPartition(this.topic, LOG_PARTITION);
        this.consumer.assign(Lists.newArrayList(partition));
        return partition;
    }
}
