package org.apache.rya.forwardchain.strategy;

import com.google.common.base.Preconditions;
import com.mongodb.client.MongoCollection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.StatementMetadata;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.query.RyaQuery;
import org.apache.rya.api.persist.query.RyaQueryEngine;
import org.apache.rya.forwardchain.ForwardChainException;
import org.apache.rya.forwardchain.rule.AbstractConstructRule;
import org.apache.rya.forwardchain.rule.Rule;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
import org.apache.rya.mongodb.aggregation.AggregationPipelineQueryNode;
import org.apache.rya.mongodb.aggregation.SparqlToPipelineTransformVisitor;
import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
import org.apache.rya.mongodb.batch.collection.MongoCollectionType;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
import org.apache.rya.sail.config.RyaSailFactory;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.rdf4j.query.algebra.QueryRoot;
import org.eclipse.rdf4j.query.algebra.TupleExpr;

/* loaded from: input_file:org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.class */
/**
 * Rule execution strategy that tries to evaluate a construct rule as a MongoDB
 * aggregation pipeline run directly against the triples collection.
 *
 * <p>New statements produced by the pipeline are written back to the same
 * collection through a {@link MongoDbBatchWriter}. When a rule's query cannot
 * be expressed as a pipeline, execution is delegated to a
 * {@link SailExecutionStrategy} created from the same configuration.
 *
 * <p>The timestamp of the latest pipeline execution of each rule is remembered
 * so that later executions can be restricted to newer source statements.
 */
public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy {
    private static final Logger logger = Logger.getLogger(MongoPipelineStrategy.class);

    /** Cursor batch size requested when iterating over aggregation results. */
    private static final int PIPELINE_BATCH_SIZE = 1000;

    private final SparqlToPipelineTransformVisitor pipelineVisitor;
    private final MongoCollection<Document> baseCollection;
    private final MongoDbBatchWriter<Document> batchWriter;
    private final MongoDBRyaDAO dao;
    private final AbstractRuleExecutionStrategy backup;
    private final RyaQueryEngine<StatefulMongoDBRdfConfiguration> engine;
    private final SimpleMongoDBStorageStrategy storageStrategy = new SimpleMongoDBStorageStrategy();

    /** Start time (epoch millis) of the most recent pipeline execution of each rule. */
    private final ConcurrentHashMap<Rule, Long> executionTimes = new ConcurrentHashMap<>();

    /** Set once any rule has been delegated to the backup strategy. */
    private boolean usedBackup = false;

    /**
     * Connects to Rya, sets up the pipeline translator and fallback strategy,
     * and starts the batch writer used to insert inferred statements.
     *
     * <p>Note that this disables flushing on the supplied configuration
     * ({@code setFlush(false)}).
     *
     * @param mongoDBRdfConfiguration MongoDB-backed Rya configuration; must not be null
     * @throws ForwardChainException if the DAO cannot be initialized or the
     *         batch writer cannot be started
     */
    public MongoPipelineStrategy(MongoDBRdfConfiguration mongoDBRdfConfiguration) throws ForwardChainException {
        StatefulMongoDBRdfConfiguration conf;
        Preconditions.checkNotNull(mongoDBRdfConfiguration);
        String mongoDBName = mongoDBRdfConfiguration.getMongoDBName();
        String triplesCollectionName = mongoDBRdfConfiguration.getTriplesCollectionName();
        mongoDBRdfConfiguration.setFlush(false);
        try {
            if (mongoDBRdfConfiguration instanceof StatefulMongoDBRdfConfiguration) {
                // A stateful configuration already carries a client, so the DAO is built directly.
                conf = (StatefulMongoDBRdfConfiguration) mongoDBRdfConfiguration;
                this.dao = new MongoDBRyaDAO();
                this.dao.setConf(conf);
                this.dao.init();
            } else {
                this.dao = RyaSailFactory.getMongoDAO(mongoDBRdfConfiguration);
                conf = this.dao.getConf();
            }
            this.baseCollection = conf.getMongoClient().getDatabase(mongoDBName).getCollection(triplesCollectionName);
            this.pipelineVisitor = new SparqlToPipelineTransformVisitor(this.baseCollection);
            this.engine = this.dao.getQueryEngine();
            this.backup = new SailExecutionStrategy(conf);
            this.batchWriter = new MongoDbBatchWriter<>(
                    new MongoCollectionType(this.baseCollection),
                    MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf));
            try {
                this.batchWriter.start();
            } catch (MongoDbBatchWriterException e) {
                throw new ForwardChainException("Error starting MongoDB batch writer", e);
            }
        } catch (RyaDAOException e) {
            throw new ForwardChainException("Can't connect to Rya.", e);
        }
    }

    /**
     * Applies a construct rule, preferring an aggregation pipeline and falling
     * back to the backup strategy when the rule cannot be converted.
     *
     * @param abstractConstructRule the rule to apply; must not be null
     * @param statementMetadata metadata whose string form is stored on every
     *        newly inferred statement
     * @return the number of new statements queued by the pipeline, or whatever
     *         the backup strategy returns when it is used instead
     * @throws ForwardChainException if flushing the batch writer fails, or if
     *         the backup strategy fails
     * @throws UnsupportedOperationException if the rule cannot be converted and
     *         no backup strategy is available
     */
    @Override
    public long executeConstructRule(AbstractConstructRule abstractConstructRule, StatementMetadata statementMetadata) throws ForwardChainException {
        Preconditions.checkNotNull(abstractConstructRule);
        logger.info("Applying inference rule " + abstractConstructRule + "...");
        final long startTime = System.currentTimeMillis();

        List<Bson> pipeline = null;
        try {
            // Once the backup strategy has been used, no minimum source derivation depth is required.
            int sourceLevel = this.usedBackup ? 0 : this.requiredLevel;
            pipeline = toPipeline(abstractConstructRule, sourceLevel, startTime);
        } catch (ForwardChainException e) {
            // Conversion failure is recoverable via the fallback; keep the stack trace in the log.
            logger.error("Error converting rule " + abstractConstructRule + " to an aggregation pipeline", e);
        }

        if (pipeline != null) {
            return executePipeline(abstractConstructRule, statementMetadata, pipeline, startTime);
        }
        return executeFallback(abstractConstructRule, statementMetadata);
    }

    /**
     * Runs the pipeline, queues every result that is not already stored, flushes
     * the writer and records the execution time for the rule.
     */
    private long executePipeline(AbstractConstructRule rule, StatementMetadata metadata,
            List<Bson> pipeline, long startTime) throws ForwardChainException {
        if (logger.isDebugEnabled()) {
            for (Bson stage : pipeline) {
                logger.debug("\t" + stage.toString());
            }
        }
        final LongAdder added = new LongAdder();
        this.baseCollection.aggregate(pipeline)
                .allowDiskUse(true)
                .batchSize(PIPELINE_BATCH_SIZE)
                .forEach(document -> {
                    RyaStatement statement = this.storageStrategy.deserializeDocument(document);
                    if (statementExists(statement)) {
                        return;
                    }
                    added.increment();
                    document.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString());
                    try {
                        this.batchWriter.addObjectToQueue(document);
                    } catch (MongoDbBatchWriterException queueError) {
                        // Best effort: a single failed insert is logged and does not abort the rule.
                        logger.error("Couldn't insert " + statement, queueError);
                    }
                });
        try {
            this.batchWriter.flush();
        } catch (MongoDbBatchWriterException e) {
            throw new ForwardChainException("Error writing to Mongo", e);
        }
        logger.info("Added " + added + " new statements.");
        // Keep the latest start time seen for this rule.
        this.executionTimes.merge(rule, startTime, Math::max);
        return added.longValue();
    }

    /**
     * Delegates the rule to the backup strategy, or fails if there is none.
     */
    private long executeFallback(AbstractConstructRule rule, StatementMetadata metadata) throws ForwardChainException {
        if (this.backup == null) {
            logUnconvertedRule(rule, true);
            throw new UnsupportedOperationException("Couldn't convert query to pipeline.");
        }
        logUnconvertedRule(rule, false);
        logger.debug("Using fallback strategy.");
        this.usedBackup = true;
        return this.backup.executeConstructRule(rule, metadata);
    }

    /**
     * Logs the rule and its query, one query line per log entry, at error or
     * debug level.
     */
    private void logUnconvertedRule(AbstractConstructRule rule, boolean asError) {
        if (!asError && !logger.isDebugEnabled()) {
            return;
        }
        String header = "Couldn't convert " + rule + " to pipeline:";
        if (asError) {
            logger.error(header);
        } else {
            logger.debug(header);
        }
        for (String queryLine : rule.getQuery().toString().split("\n")) {
            if (asError) {
                logger.error("\t" + queryLine);
            } else {
                logger.debug("\t" + queryLine);
            }
        }
    }

    /**
     * Checks whether the query engine returns any result for the statement.
     *
     * @return true if at least one match is found; false if there is none or
     *         if the lookup fails (the failure is logged)
     */
    private boolean statementExists(RyaStatement ryaStatement) {
        try {
            return this.engine.query(new RyaQuery(ryaStatement)).iterator().hasNext();
        } catch (RyaDAOException e) {
            logger.error("Error querying for " + ryaStatement, e);
            return false;
        }
    }

    /**
     * Shuts down the backup strategy and the batch writer.
     *
     * <p>The batch writer is shut down even if the backup strategy fails. If
     * both fail, the backup strategy's exception is thrown with the writer's
     * failure attached as a suppressed exception.
     *
     * @throws ForwardChainException if either component fails to shut down
     */
    @Override
    public void shutDown() throws ForwardChainException {
        ForwardChainException failure = null;
        try {
            this.backup.shutDown();
        } catch (ForwardChainException e) {
            failure = e;
        } finally {
            try {
                this.batchWriter.shutdown();
            } catch (MongoDbBatchWriterException e) {
                ForwardChainException writerFailure =
                        new ForwardChainException("Error shutting down batch writer", e);
                if (failure == null) {
                    failure = writerFailure;
                } else {
                    failure.addSuppressed(writerFailure);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Converts a rule's query into an aggregation pipeline that yields triples.
     *
     * @param abstractConstructRule the rule whose query is converted
     * @param sourceLevel value passed to {@code requireSourceDerivationDepth}
     * @param timestamp value passed to {@code getTriplePipeline}
     * @return the pipeline stages, or null if the visitor did not reduce the
     *         query to a single {@link AggregationPipelineQueryNode}
     * @throws ForwardChainException if any step of the conversion throws
     */
    private List<Bson> toPipeline(AbstractConstructRule abstractConstructRule, int sourceLevel, long timestamp) throws ForwardChainException {
        TupleExpr queryExpr = abstractConstructRule.getQuery().getTupleExpr();
        // The query is wrapped in a QueryRoot when it does not already have one.
        QueryRoot root = (queryExpr instanceof QueryRoot) ? (QueryRoot) queryExpr : new QueryRoot(queryExpr);
        try {
            root.visit(this.pipelineVisitor);
            if (!(root.getArg() instanceof AggregationPipelineQueryNode)) {
                return null;
            }
            AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg();
            pipelineNode.distinct();
            pipelineNode.requireSourceDerivationDepth(sourceLevel);
            long lastExecution = this.executionTimes.getOrDefault(abstractConstructRule, 0L).longValue();
            if (lastExecution > 0) {
                // Only applied when this rule has a recorded earlier execution.
                pipelineNode.requireSourceTimestamp(lastExecution);
            }
            return pipelineNode.getTriplePipeline(timestamp, false);
        } catch (Exception e) {
            throw new ForwardChainException("Error converting construct rule to an aggregation pipeline", e);
        }
    }
}
