package org.apache.rya.indexing.pcj.fluo.app.observers;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.api.observer.Observer;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;

/* loaded from: input_file:org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.class */
public class QueryResultObserver extends AbstractObserver {
    private static final Logger log = Logger.getLogger(QueryResultObserver.class);
    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
    private static final ImmutableSet<IncrementalResultExporterFactory> factories = ImmutableSet.builder().add(new RyaBindingSetExporterFactory()).add(new KafkaBindingSetExporterFactory()).add(new KafkaRyaSubGraphExporterFactory()).add(new RyaSubGraphExporterFactory()).add(new PeriodicBindingSetExporterFactory()).build();
    private ExporterManager exporterManager;

    public Observer.ObservedColumn getObservedColumn() {
        return new Observer.ObservedColumn(FluoQueryColumns.QUERY_BINDING_SET, Observer.NotificationType.STRONG);
    }

    public void init(Observer.Context context) {
        ExporterManager.Builder builder = ExporterManager.builder();
        UnmodifiableIterator it = factories.iterator();
        while (it.hasNext()) {
            IncrementalResultExporterFactory incrementalResultExporterFactory = (IncrementalResultExporterFactory) it.next();
            try {
                log.debug("QueryResultObserver.init(): for each exportersBuilder=" + incrementalResultExporterFactory);
                Optional<IncrementalResultExporter> build = incrementalResultExporterFactory.build(context);
                if (build.isPresent()) {
                    builder.addIncrementalResultExporter((IncrementalResultExporter) build.get());
                }
            } catch (IncrementalResultExporterFactory.IncrementalExporterFactoryException e) {
                log.error("Could not initialize a result exporter.", e);
            }
        }
        this.exporterManager = builder.build();
    }

    public void process(TransactionBase transactionBase, Bytes bytes, Column column) throws Exception {
        String str = bytes.toString().split(IncrementalUpdateConstants.NODEID_BS_DELIM)[0];
        QueryMetadata readQueryMetadata = dao.readQueryMetadata(transactionBase, str);
        this.exporterManager.export(readQueryMetadata.getQueryType(), readQueryMetadata.getExportStrategies(), str, transactionBase.get(bytes, column));
    }

    public void close() {
        try {
            this.exporterManager.close();
        } catch (Exception e) {
            log.warn("Encountered problems closing the ExporterManager.");
        }
    }
}
