package org.apache.rya.streams.api.queries;

import com.google.common.util.concurrent.AbstractScheduledService;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.api.queries.QueryRepository;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/api/queries/InMemoryQueryRepository.class */
public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository {
    private static final Logger log = LoggerFactory.getLogger(InMemoryQueryRepository.class);
    private final QueryChangeLog changeLog;
    private final AbstractScheduledService.Scheduler scheduler;
    private final ReentrantLock lock = new ReentrantLock(true);
    private Optional<Long> cachePosition = Optional.empty();
    private final Map<UUID, StreamsQuery> queriesCache = new HashMap();
    private final List<QueryChangeLogListener> listeners = new ArrayList();

    public InMemoryQueryRepository(QueryChangeLog queryChangeLog, AbstractScheduledService.Scheduler scheduler) {
        this.changeLog = (QueryChangeLog) Objects.requireNonNull(queryChangeLog);
        this.scheduler = (AbstractScheduledService.Scheduler) Objects.requireNonNull(scheduler);
    }

    @Override // org.apache.rya.streams.api.queries.QueryRepository
    public StreamsQuery add(String str, boolean z, boolean z2) throws QueryRepository.QueryRepositoryException, IllegalStateException {
        Objects.requireNonNull(str);
        this.lock.lock();
        try {
            try {
                checkState();
                UUID randomUUID = UUID.randomUUID();
                this.changeLog.write(QueryChange.create(randomUUID, str, z, z2));
                updateCache();
                StreamsQuery streamsQuery = this.queriesCache.get(randomUUID);
                this.lock.unlock();
                return streamsQuery;
            } catch (QueryChangeLog.QueryChangeLogException e) {
                throw new QueryRepository.QueryRepositoryException("Could not create a Rya Streams query for the SPARQL string: " + str, e);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // org.apache.rya.streams.api.queries.QueryRepository
    public Optional<StreamsQuery> get(UUID uuid) throws QueryRepository.QueryRepositoryException, IllegalStateException {
        Objects.requireNonNull(uuid);
        this.lock.lock();
        try {
            checkState();
            updateCache();
            return Optional.ofNullable(this.queriesCache.get(uuid));
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.rya.streams.api.queries.QueryRepository
    public void updateIsActive(UUID uuid, boolean z) throws QueryRepository.QueryRepositoryException, IllegalStateException {
        Objects.requireNonNull(uuid);
        this.lock.lock();
        try {
            try {
                checkState();
                updateCache();
                if (!this.queriesCache.containsKey(uuid)) {
                    throw new QueryRepository.QueryRepositoryException("No query exists for ID " + uuid + ".");
                }
                this.changeLog.write(QueryChange.update(uuid, z));
                this.lock.unlock();
            } catch (QueryChangeLog.QueryChangeLogException e) {
                throw new QueryRepository.QueryRepositoryException("Could not update the Rya Streams query for with ID: " + uuid, e);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // org.apache.rya.streams.api.queries.QueryRepository
    public void delete(UUID uuid) throws QueryRepository.QueryRepositoryException, IllegalStateException {
        Objects.requireNonNull(uuid);
        this.lock.lock();
        try {
            try {
                checkState();
                this.changeLog.write(QueryChange.delete(uuid));
                this.lock.unlock();
            } catch (QueryChangeLog.QueryChangeLogException e) {
                throw new QueryRepository.QueryRepositoryException("Could not delete a Rya Streams query for the Query ID: " + uuid, e);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // org.apache.rya.streams.api.queries.QueryRepository
    public Set<StreamsQuery> list() throws QueryRepository.QueryRepositoryException, IllegalStateException {
        this.lock.lock();
        try {
            checkState();
            updateCache();
            return (Set) this.queriesCache.values().stream().collect(Collectors.toSet());
        } finally {
            this.lock.unlock();
        }
    }

    protected void shutDown() throws Exception {
        this.lock.lock();
        try {
            this.changeLog.close();
        } finally {
            this.lock.unlock();
        }
    }

    private void updateCache() {
        log.trace("updateCache() - Enter");
        CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLog.QueryChangeLogException> closeableIteration = null;
        try {
            try {
                log.debug("Starting cache position:" + this.cachePosition);
                closeableIteration = this.cachePosition.isPresent() ? this.changeLog.readFromPosition(this.cachePosition.get().longValue() + 1) : this.changeLog.readFromStart();
                while (closeableIteration.hasNext()) {
                    ChangeLogEntry changeLogEntry = (ChangeLogEntry) closeableIteration.next();
                    QueryChange queryChange = (QueryChange) changeLogEntry.getEntry();
                    UUID queryId = queryChange.getQueryId();
                    log.debug("Updating the cache to reflect:\n" + queryChange);
                    switch (queryChange.getChangeType()) {
                        case CREATE:
                            this.queriesCache.put(queryId, new StreamsQuery(queryId, (String) queryChange.getSparql().get(), ((Boolean) queryChange.getIsActive().get()).booleanValue(), ((Boolean) queryChange.getIsInsert().get()).booleanValue()));
                            break;
                        case UPDATE:
                            if (!this.queriesCache.containsKey(queryId)) {
                                break;
                            } else {
                                StreamsQuery streamsQuery = this.queriesCache.get(queryId);
                                this.queriesCache.put(queryId, new StreamsQuery(streamsQuery.getQueryId(), streamsQuery.getSparql(), ((Boolean) queryChange.getIsActive().get()).booleanValue(), streamsQuery.isInsert()));
                                break;
                            }
                        case DELETE:
                            this.queriesCache.remove(queryId);
                            break;
                    }
                    log.debug("Notifying listeners with the updated state.");
                    Optional ofNullable = Optional.ofNullable(this.queriesCache.get(queryId));
                    this.listeners.forEach(queryChangeLogListener -> {
                        queryChangeLogListener.notify(changeLogEntry, ofNullable);
                    });
                    this.cachePosition = Optional.of(Long.valueOf(changeLogEntry.getPosition()));
                    log.debug("New cache position: " + this.cachePosition);
                }
            } catch (QueryChangeLog.QueryChangeLogException e) {
                throw new RuntimeException("Could not update the cache of " + InMemoryQueryRepository.class.getName(), e);
            }
        } finally {
            if (closeableIteration != null) {
                try {
                    closeableIteration.close();
                } catch (QueryChangeLog.QueryChangeLogException e2) {
                    log.error("Could not close the " + CloseableIteration.class.getName(), e2);
                    log.trace("updateCache() - Exit");
                }
            }
            log.trace("updateCache() - Exit");
        }
    }

    protected void runOneIteration() throws Exception {
        log.trace("runOneIteration() - Enter");
        this.lock.lock();
        try {
            updateCache();
        } finally {
            this.lock.unlock();
            log.trace("runOneIteration() - Exit");
        }
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return this.scheduler;
    }

    @Override // org.apache.rya.streams.api.queries.QueryRepository
    public Set<StreamsQuery> subscribe(QueryChangeLogListener queryChangeLogListener) {
        log.trace("subscribe(listener) - Enter");
        this.lock.lock();
        log.trace("subscribe(listener) - Acquired lock");
        try {
            this.listeners.add(queryChangeLogListener);
            log.trace("subscribe(listener) - Listener Registered");
            Set<StreamsQuery> set = (Set) this.queriesCache.values().stream().collect(Collectors.toSet());
            log.trace("subscribe(listener) - Returning " + set.size() + " existing queries");
            log.trace("subscribe(listener) - Releasing lock");
            this.lock.unlock();
            log.trace("subscribe(listener) - Exit");
            return set;
        } catch (Throwable th) {
            log.trace("subscribe(listener) - Releasing lock");
            this.lock.unlock();
            log.trace("subscribe(listener) - Exit");
            throw th;
        }
    }

    @Override // org.apache.rya.streams.api.queries.QueryRepository
    public void unsubscribe(QueryChangeLogListener queryChangeLogListener) {
        this.lock.lock();
        try {
            this.listeners.remove(queryChangeLogListener);
        } finally {
            this.lock.unlock();
        }
    }

    private void checkState() {
        if (!super.isRunning() && !this.listeners.isEmpty()) {
            throw new IllegalStateException("The Query Repository is subscribed to, but the service has not been started.");
        }
    }
}
