package org.apache.rya.indexing.mongodb;

import com.google.common.base.Preconditions;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.QueryBuilder;
import info.aduna.iteration.CloseableIteration;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.indexing.StatementConstraints;
import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.MongoSecondaryIndex;
import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
import org.apache.rya.mongodb.batch.collection.DbCollectionType;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.query.QueryEvaluationException;

/* loaded from: input_file:org/apache/rya/indexing/mongodb/AbstractMongoIndexer.class */
/**
 * Base class for secondary indexers that keep their index documents in a MongoDB collection.
 *
 * <p>Statements are converted to {@link DBObject}s by the {@link #storageStrategy} and written
 * through a {@link MongoDbBatchWriter}. Subclasses supply the collection name suffix via
 * {@link #getCollectionName()} and are expected to populate {@link #storageStrategy} and
 * {@link #predicates} before statements are stored, and to call {@link #initCore()} after
 * {@link #setConf(Configuration)}.
 *
 * @param <T> the storage strategy used to serialize statements and build queries
 */
public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrategy> implements MongoSecondaryIndex {
    private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class);

    // Not read or written anywhere else in this class.
    private boolean isInit = false;
    // Whether single-statement stores and bulk stores flush the batch writer immediately.
    private boolean flushEachUpdate = true;
    protected StatefulMongoDBRdfConfiguration conf;
    protected MongoDBRyaDAO dao;
    protected MongoClient mongoClient;
    protected String dbName;
    protected DB db;
    protected DBCollection collection;
    // Predicates to index; an empty set means "index every predicate" (see prepareStatementForStorage).
    protected Set<URI> predicates;
    protected T storageStrategy;
    // Assigned only by initCore(); null until then.
    private MongoDbBatchWriter<DBObject> mongoDbBatchWriter;

    /**
     * Resolves the Mongo client, database and index collection from {@link #conf} and starts the
     * batch writer used by the store operations.
     *
     * <p>The collection name is the configured collection prefix (default {@code "rya"}) followed
     * by {@link #getCollectionName()}. A failure to start the batch writer is logged, not thrown.
     */
    public void initCore() {
        this.dbName = this.conf.getMongoDBName();
        this.mongoClient = this.conf.getMongoClient();
        this.db = this.mongoClient.getDB(this.dbName);

        final String collectionName =
                this.conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName();
        this.collection = this.db.getCollection(collectionName);

        this.flushEachUpdate = this.conf.flushEachUpdate();

        this.mongoDbBatchWriter = new MongoDbBatchWriter<>(
                new DbCollectionType(this.collection),
                MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(this.conf));
        try {
            this.mongoDbBatchWriter.start();
        } catch (MongoDbBatchWriterException e) {
            LOG.error("Error start MongoDB batch writer", e);
        }
    }

    /**
     * Sets the configuration used by this indexer.
     *
     * @param configuration must be a {@link StatefulMongoDBRdfConfiguration}
     * @throws IllegalStateException if {@code configuration} is {@code null} or of any other type
     */
    @Override
    public void setConf(Configuration configuration) {
        // Resolve the type name null-safely so a null argument produces the intended
        // IllegalStateException rather than a NullPointerException while building the message.
        final String actualType = configuration == null ? "null" : configuration.getClass().getName();
        Preconditions.checkState(configuration instanceof StatefulMongoDBRdfConfiguration,
                "The provided Configuration must be a StatefulMongoDBRdfConfiguration, but it was " + actualType);
        this.conf = (StatefulMongoDBRdfConfiguration) configuration;
    }

    /**
     * Flushes pending writes and shuts the batch writer down.
     *
     * <p>The shutdown is attempted even when the flush fails. If both fail, the flush failure is
     * thrown and the shutdown failure is attached to it as a suppressed exception. Calling this
     * before {@link #initCore()} is a no-op.
     *
     * @throws IOException if flushing or shutting down the batch writer fails
     */
    @Override
    public void close() throws IOException {
        if (this.mongoDbBatchWriter == null) {
            // initCore() never ran, so there is nothing queued and nothing to shut down.
            return;
        }
        Throwable flushFailure = null;
        try {
            flush();
        } catch (IOException | RuntimeException e) {
            flushFailure = e;
            throw e;
        } finally {
            try {
                this.mongoDbBatchWriter.shutdown();
            } catch (MongoDbBatchWriterException e) {
                final IOException shutdownFailure = new IOException("Error shutting down MongoDB batch writer", e);
                if (flushFailure == null) {
                    throw shutdownFailure;
                }
                // Keep the flush failure as the primary exception; record this one alongside it.
                flushFailure.addSuppressed(shutdownFailure);
            }
        }
    }

    /**
     * Asks the batch writer to flush.
     *
     * @throws IOException if the batch writer reports a failure
     */
    @Override
    public void flush() throws IOException {
        try {
            this.mongoDbBatchWriter.flush();
        } catch (MongoDbBatchWriterException e) {
            throw new IOException("Error flushing batch writer", e);
        }
    }

    /** @return the configuration previously supplied to {@link #setConf(Configuration)} */
    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /** @return the MongoDB database name resolved by {@link #initCore()} */
    @Override
    public String getTableName() {
        return this.dbName;
    }

    /** @return the predicate set this indexer is restricted to (the internal set, not a copy) */
    @Override
    public Set<URI> getIndexablePredicates() {
        return this.predicates;
    }

    /**
     * Removes the documents matching the given statement directly from the index collection.
     * This bypasses the batch writer.
     *
     * @param ryaStatement the statement whose index documents should be removed
     */
    @Override
    public void deleteStatement(RyaStatement ryaStatement) throws IOException {
        final DBObject query = this.storageStrategy.getQuery(ryaStatement);
        this.collection.remove(query);
    }

    /**
     * Queues every statement for storage, then flushes once at the end if flush-on-update is enabled.
     *
     * @param collection the statements to index
     * @throws IOException if queueing or flushing fails
     */
    @Override
    public void storeStatements(Collection<RyaStatement> collection) throws IOException {
        for (final RyaStatement ryaStatement : collection) {
            // Defer flushing so the whole batch is flushed at most once.
            storeStatement(ryaStatement, false);
        }
        if (this.flushEachUpdate) {
            flush();
        }
    }

    /**
     * Queues a single statement for storage, flushing immediately if flush-on-update is enabled.
     *
     * @param ryaStatement the statement to index
     * @throws IOException if queueing or flushing fails
     */
    @Override
    public void storeStatement(RyaStatement ryaStatement) throws IOException {
        storeStatement(ryaStatement, this.flushEachUpdate);
    }

    /**
     * Serializes the statement and, if it is indexable, hands it to the batch writer.
     *
     * @param ryaStatement the statement to index
     * @param flushNow whether to flush the batch writer after queueing
     */
    private void storeStatement(RyaStatement ryaStatement, boolean flushNow) throws IOException {
        final DBObject document = prepareStatementForStorage(ryaStatement);
        try {
            // prepareStatementForStorage returns null for statements that should not be indexed;
            // skip those explicitly instead of passing null to the writer.
            if (document != null) {
                this.mongoDbBatchWriter.addObjectToQueue(document);
            }
            if (flushNow) {
                flush();
            }
        } catch (MongoDbBatchWriterException e) {
            throw new IOException("Error storing statement", e);
        }
    }

    /**
     * Converts a statement into its index document.
     *
     * @return the serialized document, or {@code null} when the predicate is not in a non-empty
     *         {@link #predicates} set, the object is not a {@link Literal}, or an
     *         {@link IllegalArgumentException} was raised while converting/serializing (logged)
     */
    private DBObject prepareStatementForStorage(RyaStatement ryaStatement) {
        try {
            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
            final boolean predicateAccepted =
                    this.predicates.isEmpty() || this.predicates.contains(statement.getPredicate());
            if (predicateAccepted && statement.getObject() instanceof Literal) {
                return this.storageStrategy.serialize(ryaStatement);
            }
        } catch (IllegalArgumentException e) {
            LOG.error("Unable to parse the statement: " + ryaStatement.toString(), e);
        }
        return null;
    }

    /**
     * Not supported by this indexer.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void dropGraph(RyaURI... ryaURIArr) {
        throw new UnsupportedOperationException();
    }

    /**
     * Queries the index collection for documents that satisfy both the supplied query and the
     * query the storage strategy derives from the statement constraints.
     *
     * @param statementConstraints constraints translated to a query by the storage strategy
     * @param dBObject the index-specific query to AND with the constraint query
     * @return an iteration over the matching statements; the caller must close it
     */
    public CloseableIteration<Statement, QueryEvaluationException> withConstraints(StatementConstraints statementConstraints, DBObject dBObject) {
        final DBObject constraintQuery = this.storageStrategy.getQuery(statementConstraints);
        final DBObject combinedQuery = QueryBuilder.start().and(dBObject).and(constraintQuery).get();
        return closableIterationFromCursor(combinedQuery);
    }

    /**
     * Runs the query against the index collection and adapts the resulting cursor to a
     * {@link CloseableIteration} of statements.
     */
    private CloseableIteration<Statement, QueryEvaluationException> closableIterationFromCursor(DBObject dBObject) {
        final DBCursor cursor = this.collection.find(dBObject);
        return new CloseableIteration<Statement, QueryEvaluationException>() {
            @Override
            public boolean hasNext() {
                return cursor.hasNext();
            }

            @Override
            public Statement next() throws QueryEvaluationException {
                final DBObject document = cursor.next();
                final RyaStatement ryaStatement = AbstractMongoIndexer.this.storageStrategy.deserializeDBObject(document);
                return RyaToRdfConversions.convertStatement(ryaStatement);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Remove not implemented");
            }

            @Override
            public void close() throws QueryEvaluationException {
                cursor.close();
            }
        };
    }

    /**
     * @return the suffix appended to the configured collection prefix to form the name of the
     *         MongoDB collection backing this index
     */
    public abstract String getCollectionName();
}
