package org.apache.rya.indexing.pcj.fluo.app.observers;

import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.api.observer.Observer;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.apache.rya.shaded.com.google.common.base.Optional;
import org.apache.rya.shaded.com.google.common.collect.ImmutableSet;
import org.apache.rya.shaded.com.google.common.collect.UnmodifiableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.class */
public class QueryResultObserver extends AbstractObserver {
    private final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();
    private ExporterManager exporterManager;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) QueryResultObserver.class);
    private static final ImmutableSet<IncrementalResultExporterFactory> FACTORIES = ImmutableSet.builder().add((ImmutableSet.Builder) new RyaBindingSetExporterFactory()).add((ImmutableSet.Builder) new KafkaBindingSetExporterFactory()).add((ImmutableSet.Builder) new KafkaRyaSubGraphExporterFactory()).add((ImmutableSet.Builder) new RyaSubGraphExporterFactory()).add((ImmutableSet.Builder) new PeriodicBindingSetExporterFactory()).build();

    @Override // org.apache.fluo.api.observer.Observer
    public Observer.ObservedColumn getObservedColumn() {
        return new Observer.ObservedColumn(FluoQueryColumns.QUERY_BINDING_SET, Observer.NotificationType.STRONG);
    }

    @Override // org.apache.fluo.api.observer.AbstractObserver, org.apache.fluo.api.observer.Observer
    public void init(Observer.Context context) {
        ExporterManager.Builder builder = ExporterManager.builder();
        UnmodifiableIterator<IncrementalResultExporterFactory> it = FACTORIES.iterator();
        while (it.hasNext()) {
            IncrementalResultExporterFactory next = it.next();
            try {
                log.debug("Attempting to build exporter from factory: {}", next);
                Optional<IncrementalResultExporter> build = next.build(context);
                if (build.isPresent()) {
                    log.info("Adding exporter: {}", build.get());
                    builder.addIncrementalResultExporter(build.get());
                }
            } catch (IncrementalResultExporterFactory.IncrementalExporterFactoryException e) {
                log.error("Could not initialize a result exporter.", (Throwable) e);
            }
        }
        this.exporterManager = builder.build();
    }

    @Override // org.apache.fluo.api.observer.Observer
    public void process(TransactionBase transactionBase, Bytes bytes, Column column) throws Exception {
        String nodeId = BindingSetRow.makeFromShardedRow(Bytes.of(IncrementalUpdateConstants.QUERY_PREFIX), bytes).getNodeId();
        QueryMetadata readQueryMetadata = this.queryDao.readQueryMetadata(transactionBase, nodeId);
        this.exporterManager.export(readQueryMetadata.getQueryType(), readQueryMetadata.getExportStrategies(), nodeId, transactionBase.get(bytes, column));
    }

    @Override // org.apache.fluo.api.observer.AbstractObserver, org.apache.fluo.api.observer.Observer
    public void close() {
        try {
            this.exporterManager.close();
        } catch (Exception e) {
            log.warn("Encountered problems closing the ExporterManager.");
        }
    }
}
