package org.apache.rya.periodic.notification.processor;

import com.google.common.base.Preconditions;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.apache.rya.periodic.notification.api.NotificationProcessor;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.openrdf.query.BindingSet;

/* loaded from: input_file:org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.class */
public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable {
    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
    private PeriodicQueryResultStorage periodicStorage;
    private BlockingQueue<TimestampedNotification> notifications;
    private BlockingQueue<NodeBin> bins;
    private BlockingQueue<BindingSetRecord> bindingSets;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private int threadNumber;

    /* loaded from: input_file:org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor$Builder.class */
    public static class Builder {
        private PeriodicQueryResultStorage periodicStorage;
        private BlockingQueue<TimestampedNotification> notifications;
        private BlockingQueue<NodeBin> bins;
        private BlockingQueue<BindingSetRecord> bindingSets;
        private int threadNumber;

        public Builder setNotifications(BlockingQueue<TimestampedNotification> blockingQueue) {
            this.notifications = blockingQueue;
            return this;
        }

        public Builder setBins(BlockingQueue<NodeBin> blockingQueue) {
            this.bins = blockingQueue;
            return this;
        }

        public Builder setBindingSets(BlockingQueue<BindingSetRecord> blockingQueue) {
            this.bindingSets = blockingQueue;
            return this;
        }

        public Builder setThreadNumber(int i) {
            this.threadNumber = i;
            return this;
        }

        public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicQueryResultStorage) {
            this.periodicStorage = periodicQueryResultStorage;
            return this;
        }

        public TimestampedNotificationProcessor build() {
            return new TimestampedNotificationProcessor(this.periodicStorage, this.notifications, this.bins, this.bindingSets, this.threadNumber);
        }
    }

    public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicQueryResultStorage, BlockingQueue<TimestampedNotification> blockingQueue, BlockingQueue<NodeBin> blockingQueue2, BlockingQueue<BindingSetRecord> blockingQueue3, int i) {
        this.notifications = (BlockingQueue) Preconditions.checkNotNull(blockingQueue);
        this.bins = (BlockingQueue) Preconditions.checkNotNull(blockingQueue2);
        this.bindingSets = (BlockingQueue) Preconditions.checkNotNull(blockingQueue3);
        this.periodicStorage = periodicQueryResultStorage;
        this.threadNumber = i;
    }

    @Override // org.apache.rya.periodic.notification.api.NotificationProcessor
    public void processNotification(TimestampedNotification timestampedNotification) {
        String id = timestampedNotification.getId();
        long binFromTimestamp = getBinFromTimestamp(timestampedNotification.getTimestamp().getTime(), timestampedNotification.getPeriod());
        NodeBin nodeBin = new NodeBin(id, binFromTimestamp);
        try {
            PrecomputedJoinStorage.CloseableIterator<BindingSet> listResults = this.periodicStorage.listResults(id, Optional.of(Long.valueOf(binFromTimestamp)));
            Throwable th = null;
            while (listResults.hasNext()) {
                try {
                    try {
                        this.bindingSets.add(new BindingSetRecord(listResults.next(), id));
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } finally {
                }
            }
            this.bins.add(nodeBin);
            if (listResults != null) {
                if (0 != 0) {
                    try {
                        listResults.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    listResults.close();
                }
            }
        } catch (Exception e) {
            log.debug("Encountered error: " + e.getMessage() + " while accessing periodic results for bin: " + binFromTimestamp + " for query: " + id);
        }
    }

    private long getBinFromTimestamp(long j, long j2) {
        Preconditions.checkArgument(j2 > 0);
        return (j / j2) * j2;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.closed.get()) {
            try {
                processNotification(this.notifications.take());
            } catch (Exception e) {
                log.trace("Thread_" + this.threadNumber + " is unable to process next notification.");
                throw new RuntimeException(e);
            }
        }
    }

    public void shutdown() {
        this.closed.set(true);
    }

    public static Builder builder() {
        return new Builder();
    }
}
