/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.river.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.util.JSON;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.bson.BasicBSONObject;
import org.bson.types.BSONTimestamp;
import org.bson.types.ObjectId;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.mongodb.util.GridFSHelper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;

public class MongoDBRiver
extends AbstractRiverComponent
implements River {
    public static final String IS_MONGODB_ATTACHMENT = "is_mongodb_attachment";
    public static final String MONGODB_ATTACHMENT = "mongodb_attachment";
    public static final String RIVER_TYPE = "mongodb";
    public static final String ROOT_NAME = "mongodb";
    public static final String DB_FIELD = "db";
    public static final String SERVERS_FIELD = "servers";
    public static final String HOST_FIELD = "host";
    public static final String PORT_FIELD = "port";
    public static final String OPTIONS_FIELD = "options";
    public static final String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference";
    public static final String FILTER_FIELD = "filter";
    public static final String CREDENTIALS_FIELD = "credentials";
    public static final String USER_FIELD = "user";
    public static final String PASSWORD_FIELD = "password";
    public static final String SCRIPT_FIELD = "script";
    public static final String SCRIPT_TYPE_FIELD = "scriptType";
    public static final String COLLECTION_FIELD = "collection";
    public static final String GRIDFS_FIELD = "gridfs";
    public static final String INDEX_OBJECT = "index";
    public static final String NAME_FIELD = "name";
    public static final String TYPE_FIELD = "type";
    public static final String LOCAL_DB_FIELD = "local";
    public static final String ADMIN_DB_FIELD = "admin";
    public static final String DB_LOCAL = "local";
    public static final String DB_ADMIN = "admin";
    public static final String DB_CONFIG = "config";
    public static final String DEFAULT_DB_HOST = "localhost";
    public static final String THROTTLE_SIZE_FIELD = "throttle_size";
    public static final int DEFAULT_DB_PORT = 27017;
    public static final String BULK_SIZE_FIELD = "bulk_size";
    public static final String BULK_TIMEOUT_FIELD = "bulk_timeout";
    public static final String LAST_TIMESTAMP_FIELD = "_last_ts";
    public static final String MONGODB_LOCAL = "local";
    public static final String MONGODB_ADMIN = "admin";
    public static final String MONGODB_ID_FIELD = "_id";
    public static final String OPLOG_COLLECTION = "oplog.rs";
    public static final String OPLOG_NAMESPACE = "ns";
    public static final String OPLOG_OBJECT = "o";
    public static final String OPLOG_UPDATE = "o2";
    public static final String OPLOG_OPERATION = "op";
    public static final String OPLOG_UPDATE_OPERATION = "u";
    public static final String OPLOG_INSERT_OPERATION = "i";
    public static final String OPLOG_DELETE_OPERATION = "d";
    public static final String OPLOG_TIMESTAMP = "ts";
    public static final String GRIDFS_FILES_SUFFIX = ".files";
    public static final String GRIDFS_CHUNKS_SUFFIX = ".chunks";
    protected final Client client;
    protected final String riverIndexName;
    protected final List<ServerAddress> mongoServers = new ArrayList<ServerAddress>();
    protected final String mongoDb;
    protected final String mongoCollection;
    protected final boolean mongoGridFS;
    protected final String mongoFilter;
    protected final String mongoAdminUser;
    protected final String mongoAdminPassword;
    protected final String mongoLocalUser;
    protected final String mongoLocalPassword;
    protected final String mongoOplogNamespace;
    protected final boolean mongoSecondaryReadPreference;
    protected final String indexName;
    protected final String typeName;
    protected final int bulkSize;
    protected final TimeValue bulkTimeout;
    protected final int throttleSize;
    private final ExecutableScript script;
    protected volatile List<Thread> tailerThreads = new ArrayList<Thread>();
    protected volatile Thread indexerThread;
    protected volatile boolean active = true;
    private final BlockingQueue<Map<String, Object>> stream;
    private Mongo mongo;
    private DB adminDb;

    @Inject
    public MongoDBRiver(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, Client client, ScriptService scriptService) {
        super(riverName, settings);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Prefix: [{}] - name: [{}]", new Object[]{this.logger.getPrefix(), this.logger.getName()});
            this.logger.debug("River settings: ", new Object[]{settings.settings()});
        }
        this.riverIndexName = riverIndexName;
        this.client = client;
        if (settings.settings().containsKey("mongodb")) {
            int mongoPort;
            String mongoHost;
            Map mongoSettings = (Map)settings.settings().get("mongodb");
            if (mongoSettings.containsKey(SERVERS_FIELD)) {
                Object mongoServersSettings = mongoSettings.get(SERVERS_FIELD);
                this.logger.info("mongoServersSettings: " + mongoServersSettings, new Object[0]);
                boolean array = XContentMapValues.isArray(mongoServersSettings);
                if (array) {
                    ArrayList feeds = (ArrayList)mongoServersSettings;
                    for (Map feed : feeds) {
                        mongoHost = XContentMapValues.nodeStringValue(feed.get(HOST_FIELD), null);
                        mongoPort = XContentMapValues.nodeIntegerValue(feed.get(PORT_FIELD), (int)0);
                        this.logger.info("Server: " + mongoHost + " - " + mongoPort, new Object[0]);
                        try {
                            this.mongoServers.add(new ServerAddress(mongoHost, mongoPort));
                        }
                        catch (UnknownHostException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } else {
                mongoHost = XContentMapValues.nodeStringValue(mongoSettings.get(HOST_FIELD), (String)DEFAULT_DB_HOST);
                mongoPort = XContentMapValues.nodeIntegerValue(mongoSettings.get(PORT_FIELD), (int)27017);
                try {
                    this.mongoServers.add(new ServerAddress(mongoHost, mongoPort));
                }
                catch (UnknownHostException e) {
                    e.printStackTrace();
                }
            }
            if (mongoSettings.containsKey(OPTIONS_FIELD)) {
                Map mongoOptionsSettings = (Map)mongoSettings.get(OPTIONS_FIELD);
                this.mongoSecondaryReadPreference = XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(SECONDARY_READ_PREFERENCE_FIELD), (boolean)false);
            } else {
                this.mongoSecondaryReadPreference = false;
            }
            if (mongoSettings.containsKey(CREDENTIALS_FIELD)) {
                String mau = "";
                String map = "";
                String mlu = "";
                String mlp = "";
                Object mongoCredentialsSettings = mongoSettings.get(CREDENTIALS_FIELD);
                boolean array = XContentMapValues.isArray(mongoCredentialsSettings);
                if (array) {
                    ArrayList credentials = (ArrayList)mongoCredentialsSettings;
                    for (Map credential : credentials) {
                        String dbCredential = XContentMapValues.nodeStringValue(credential.get(DB_FIELD), null);
                        if ("admin".equals(dbCredential)) {
                            mau = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                            map = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                            continue;
                        }
                        if (!"local".equals(dbCredential)) continue;
                        mlu = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                        mlp = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                    }
                }
                this.mongoAdminUser = mau;
                this.mongoAdminPassword = map;
                this.mongoLocalUser = mlu;
                this.mongoLocalPassword = mlp;
            } else {
                this.mongoAdminUser = "";
                this.mongoAdminPassword = "";
                this.mongoLocalUser = "";
                this.mongoLocalPassword = "";
            }
            this.mongoDb = XContentMapValues.nodeStringValue(mongoSettings.get(DB_FIELD), (String)riverName.name());
            this.mongoCollection = XContentMapValues.nodeStringValue(mongoSettings.get(COLLECTION_FIELD), (String)riverName.name());
            this.mongoGridFS = XContentMapValues.nodeBooleanValue(mongoSettings.get(GRIDFS_FIELD), (boolean)false);
            this.mongoFilter = mongoSettings.containsKey(FILTER_FIELD) ? XContentMapValues.nodeStringValue(mongoSettings.get(FILTER_FIELD), (String)"") : "";
            if (mongoSettings.containsKey(SCRIPT_FIELD)) {
                String scriptType = "js";
                if (mongoSettings.containsKey(SCRIPT_TYPE_FIELD)) {
                    scriptType = mongoSettings.get(SCRIPT_TYPE_FIELD).toString();
                }
                this.script = scriptService.executable(scriptType, mongoSettings.get(SCRIPT_FIELD).toString(), (Map)Maps.newHashMap());
            } else {
                this.script = null;
            }
        } else {
            String mongoHost = DEFAULT_DB_HOST;
            int mongoPort = 27017;
            try {
                this.mongoServers.add(new ServerAddress(mongoHost, mongoPort));
            }
            catch (UnknownHostException e) {
                e.printStackTrace();
            }
            this.mongoSecondaryReadPreference = false;
            this.mongoDb = riverName.name();
            this.mongoCollection = riverName.name();
            this.mongoFilter = "";
            this.mongoGridFS = false;
            this.mongoAdminUser = "";
            this.mongoAdminPassword = "";
            this.mongoLocalUser = "";
            this.mongoLocalPassword = "";
            this.script = null;
        }
        this.mongoOplogNamespace = this.mongoDb + "." + this.mongoCollection;
        if (settings.settings().containsKey(INDEX_OBJECT)) {
            Map indexSettings = (Map)settings.settings().get(INDEX_OBJECT);
            this.indexName = XContentMapValues.nodeStringValue(indexSettings.get(NAME_FIELD), (String)this.mongoDb);
            this.typeName = XContentMapValues.nodeStringValue(indexSettings.get(TYPE_FIELD), (String)this.mongoDb);
            this.bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get(BULK_SIZE_FIELD), (int)100);
            this.bulkTimeout = indexSettings.containsKey(BULK_TIMEOUT_FIELD) ? TimeValue.parseTimeValue((String)XContentMapValues.nodeStringValue(indexSettings.get(BULK_TIMEOUT_FIELD), (String)"10ms"), (TimeValue)TimeValue.timeValueMillis((long)10L)) : TimeValue.timeValueMillis((long)10L);
            this.throttleSize = XContentMapValues.nodeIntegerValue(indexSettings.get(THROTTLE_SIZE_FIELD), (int)(this.bulkSize * 5));
        } else {
            this.indexName = this.mongoDb;
            this.typeName = this.mongoDb;
            this.bulkSize = 100;
            this.bulkTimeout = TimeValue.timeValueMillis((long)10L);
            this.throttleSize = this.bulkSize * 5;
        }
        this.stream = this.throttleSize == -1 ? new LinkedTransferQueue() : new ArrayBlockingQueue<Map<String, Object>>(this.throttleSize);
    }

    public void start() {
        block11: {
            for (ServerAddress server : this.mongoServers) {
                this.logger.info("Using mongodb server(s): host [{}], port [{}]", new Object[]{server.getHost(), server.getPort()});
            }
            this.logger.info("starting mongodb stream. options: secondaryreadpreference [{}], throttlesize [{}], gridfs [{}], filter [{}], db [{}], collection [{}], script [{}], indexing to [{}]/[{}]", new Object[]{this.mongoSecondaryReadPreference, this.throttleSize, this.mongoGridFS, this.mongoFilter, this.mongoDb, this.mongoCollection, this.script, this.indexName, this.typeName});
            try {
                this.client.admin().indices().prepareCreate(this.indexName).execute().actionGet();
            }
            catch (Exception e) {
                if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof IndexAlreadyExistsException || ExceptionsHelper.unwrapCause((Throwable)e) instanceof ClusterBlockException) break block11;
                this.logger.warn("failed to create index [{}], disabling river...", (Throwable)e, new Object[]{this.indexName});
                return;
            }
        }
        if (this.mongoGridFS) {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Set explicit attachment mapping.", new Object[0]);
                }
                this.client.admin().indices().preparePutMapping(new String[]{this.indexName}).setType(this.typeName).setSource(this.getGridFSMapping()).execute().actionGet();
            }
            catch (Exception e) {
                this.logger.warn("Failed to set explicit mapping (attachment): {}", (Throwable)e, new Object[0]);
            }
        }
        if (this.isMongos()) {
            DBCursor cursor = this.getConfigDb().getCollection("shards").find();
            while (cursor.hasNext()) {
                DBObject item = cursor.next();
                this.logger.info(item.toString(), new Object[0]);
                List<ServerAddress> servers = this.getServerAddressForReplica(item);
                if (servers == null) continue;
                String replicaName = item.get(MONGODB_ID_FIELD).toString();
                Thread tailerThread = EsExecutors.daemonThreadFactory((Settings)this.settings.globalSettings(), (String)("mongodb_river_slurper-" + replicaName)).newThread(new Slurper(servers));
                this.tailerThreads.add(tailerThread);
            }
        } else {
            Thread tailerThread = EsExecutors.daemonThreadFactory((Settings)this.settings.globalSettings(), (String)"mongodb_river_slurper").newThread(new Slurper(this.mongoServers));
            this.tailerThreads.add(tailerThread);
        }
        for (Thread thread : this.tailerThreads) {
            thread.start();
        }
        this.indexerThread = EsExecutors.daemonThreadFactory((Settings)this.settings.globalSettings(), (String)"mongodb_river_indexer").newThread(new Indexer());
        this.indexerThread.start();
    }

    private boolean isMongos() {
        DB adminDb = this.getAdminDb();
        if (adminDb == null) {
            return false;
        }
        CommandResult cr = adminDb.command((DBObject)new BasicDBObject("serverStatus", (Object)1));
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("serverStatus: {}", new Object[]{cr});
            this.logger.trace("process: {}", new Object[]{cr.get("process")});
        }
        if (cr == null || cr.get("process") == null) {
            this.logger.warn("serverStatus return null.", new Object[0]);
            return false;
        }
        return cr.get("process").equals("mongos");
    }

    private DB getAdminDb() {
        if (this.adminDb == null) {
            this.adminDb = this.getMongoClient().getDB("admin");
            if (!(this.mongoAdminUser.isEmpty() || this.mongoAdminPassword.isEmpty() || this.adminDb.isAuthenticated())) {
                this.logger.info("Authenticate {} with {}", new Object[]{"admin", this.mongoAdminUser});
                try {
                    CommandResult cmd = this.adminDb.authenticateCommand(this.mongoAdminUser, this.mongoAdminPassword.toCharArray());
                    if (!cmd.ok()) {
                        this.logger.error("Autenticatication failed for {}: {}", new Object[]{"admin", cmd.getErrorMessage()});
                    }
                }
                catch (MongoException mEx) {
                    this.logger.warn("getAdminDb() failed", (Throwable)mEx, new Object[0]);
                }
            }
        }
        return this.adminDb;
    }

    private DB getConfigDb() {
        DB configDb = this.getMongoClient().getDB(DB_CONFIG);
        if (!this.mongoAdminUser.isEmpty() && !this.mongoAdminUser.isEmpty() && this.getAdminDb().isAuthenticated()) {
            configDb = this.getAdminDb().getMongo().getDB(DB_CONFIG);
        }
        return configDb;
    }

    private Mongo getMongoClient() {
        if (this.mongo == null) {
            this.mongo = new MongoClient(this.mongoServers);
        }
        return this.mongo;
    }

    private List<ServerAddress> getServerAddressForReplica(DBObject item) {
        String definition = item.get(HOST_FIELD).toString();
        if (definition.contains("/")) {
            definition = definition.substring(definition.indexOf("/") + 1);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("getServerAddressForReplica - definition: {}", new Object[]{definition});
        }
        ArrayList<ServerAddress> servers = new ArrayList<ServerAddress>();
        for (String server : definition.split(",")) {
            try {
                servers.add(new ServerAddress(server));
            }
            catch (UnknownHostException uhEx) {
                this.logger.warn("failed to execute bulk", (Throwable)uhEx, new Object[0]);
            }
        }
        return servers;
    }

    public void close() {
        if (this.active) {
            this.logger.info("closing mongodb stream river", new Object[0]);
            this.active = false;
            for (Thread thread : this.tailerThreads) {
                thread.interrupt();
            }
            this.indexerThread.interrupt();
        }
    }

    private XContentBuilder getGridFSMapping() throws IOException {
        XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(this.typeName).startObject("properties").startObject("content").field(TYPE_FIELD, "attachment").endObject().startObject("filename").field(TYPE_FIELD, "string").endObject().startObject("contentType").field(TYPE_FIELD, "string").endObject().startObject("md5").field(TYPE_FIELD, "string").endObject().startObject("length").field(TYPE_FIELD, "long").endObject().startObject("chunkSize").field(TYPE_FIELD, "long").endObject().endObject().endObject().endObject();
        this.logger.info("Mapping: {}", new Object[]{mapping.string()});
        return mapping;
    }

    private BSONTimestamp getLastTimestamp(String namespace) {
        String lastTimestamp;
        Map mongodbState;
        GetResponse lastTimestampResponse = (GetResponse)this.client.prepareGet(this.riverIndexName, this.riverName.getName(), namespace).execute().actionGet();
        if (lastTimestampResponse.isExists() && (mongodbState = (Map)lastTimestampResponse.getSourceAsMap().get("mongodb")) != null && (lastTimestamp = mongodbState.get(LAST_TIMESTAMP_FIELD).toString()) != null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("{} last timestamp: {}", new Object[]{namespace, lastTimestamp});
            }
            return (BSONTimestamp)JSON.parse((String)lastTimestamp);
        }
        return null;
    }

    private void updateLastTimestamp(String namespace, BSONTimestamp time, BulkRequestBuilder bulk) {
        try {
            bulk.add(Requests.indexRequest((String)this.riverIndexName).type(this.riverName.getName()).id(namespace).source(XContentFactory.jsonBuilder().startObject().startObject("mongodb").field(LAST_TIMESTAMP_FIELD, JSON.serialize((Object)time)).endObject().endObject()));
        }
        catch (IOException e) {
            this.logger.error("error updating last timestamp for namespace {}", new Object[]{namespace});
        }
    }

    private class Slurper
    implements Runnable {
        private Mongo mongo;
        private DB slurpedDb;
        private DBCollection slurpedCollection;
        private DB oplogDb;
        private DBCollection oplogCollection;
        private final List<ServerAddress> mongoServers;

        public Slurper(List<ServerAddress> mongoServers) {
            this.mongoServers = mongoServers;
        }

        private boolean assignCollections() {
            Set collections;
            CommandResult cmd;
            DB adminDb = this.mongo.getDB("admin");
            this.oplogDb = this.mongo.getDB("local");
            if (!MongoDBRiver.this.mongoAdminUser.isEmpty() && !MongoDBRiver.this.mongoAdminPassword.isEmpty()) {
                MongoDBRiver.this.logger.info("Authenticate {} with {}", new Object[]{"admin", MongoDBRiver.this.mongoAdminUser});
                cmd = adminDb.authenticateCommand(MongoDBRiver.this.mongoAdminUser, MongoDBRiver.this.mongoAdminPassword.toCharArray());
                if (!cmd.ok()) {
                    MongoDBRiver.this.logger.error("Autenticatication failed for {}: {}", new Object[]{"admin", cmd.getErrorMessage()});
                }
                this.oplogDb = adminDb.getMongo().getDB("local");
            }
            if (!(MongoDBRiver.this.mongoLocalUser.isEmpty() || MongoDBRiver.this.mongoLocalPassword.isEmpty() || this.oplogDb.isAuthenticated())) {
                MongoDBRiver.this.logger.info("Authenticate {} with {}", new Object[]{"local", MongoDBRiver.this.mongoLocalUser});
                cmd = this.oplogDb.authenticateCommand(MongoDBRiver.this.mongoLocalUser, MongoDBRiver.this.mongoLocalPassword.toCharArray());
                if (!cmd.ok()) {
                    MongoDBRiver.this.logger.error("Autenticatication failed for {}: {}", new Object[]{"local", cmd.getErrorMessage()});
                    return false;
                }
            }
            if (!(collections = this.oplogDb.getCollectionNames()).contains(MongoDBRiver.OPLOG_COLLECTION)) {
                MongoDBRiver.this.logger.error("Cannot find oplog.rs collection. Please use check this link: http://goo.gl/2x5IW", new Object[0]);
                return false;
            }
            this.oplogCollection = this.oplogDb.getCollection(MongoDBRiver.OPLOG_COLLECTION);
            this.slurpedDb = this.mongo.getDB(MongoDBRiver.this.mongoDb);
            if (!MongoDBRiver.this.mongoAdminUser.isEmpty() && !MongoDBRiver.this.mongoAdminUser.isEmpty() && adminDb.isAuthenticated()) {
                this.slurpedDb = adminDb.getMongo().getDB(MongoDBRiver.this.mongoDb);
            }
            this.slurpedCollection = this.slurpedDb.getCollection(MongoDBRiver.this.mongoCollection);
            return true;
        }

        @Override
        public void run() {
            this.mongo = new MongoClient(this.mongoServers);
            if (MongoDBRiver.this.mongoSecondaryReadPreference) {
                this.mongo.setReadPreference(ReadPreference.secondaryPreferred());
            }
            while (MongoDBRiver.this.active) {
                try {
                    if (!this.assignCollections()) break;
                    DBCursor oplogCursor = this.oplogCursor(null);
                    if (oplogCursor == null) {
                        oplogCursor = this.processFullCollection();
                    }
                    while (oplogCursor.hasNext()) {
                        DBObject item = oplogCursor.next();
                        this.processOplogEntry(item);
                    }
                    Thread.sleep(500L);
                }
                catch (MongoInterruptedException mIEx) {
                    MongoDBRiver.this.logger.error("Mongo driver has been interrupted", (Throwable)mIEx, new Object[0]);
                    MongoDBRiver.this.active = false;
                }
                catch (MongoException mEx) {
                    MongoDBRiver.this.logger.error("Mongo gave an exception", (Throwable)mEx, new Object[0]);
                }
                catch (NoSuchElementException nEx) {
                    MongoDBRiver.this.logger.warn("A mongoDB cursor bug ?", (Throwable)nEx, new Object[0]);
                }
                catch (InterruptedException e) {
                    if (MongoDBRiver.this.logger.isDebugEnabled()) {
                        MongoDBRiver.this.logger.debug("river-mongodb slurper interrupted", new Object[0]);
                    }
                    Thread.currentThread().interrupt();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private DBCursor processFullCollection() throws InterruptedException {
            BSONTimestamp currentTimestamp = (BSONTimestamp)this.oplogCollection.find().sort((DBObject)new BasicDBObject(MongoDBRiver.OPLOG_TIMESTAMP, (Object)-1)).limit(1).next().get(MongoDBRiver.OPLOG_TIMESTAMP);
            this.addQueryToStream(MongoDBRiver.OPLOG_INSERT_OPERATION, currentTimestamp, null);
            DBCursor dBCursor = this.oplogCursor(currentTimestamp);
            return dBCursor;
        }

        private void processOplogEntry(DBObject entry) throws InterruptedException {
            String operation = entry.get(MongoDBRiver.OPLOG_OPERATION).toString();
            String namespace = entry.get(MongoDBRiver.OPLOG_NAMESPACE).toString();
            BSONTimestamp oplogTimestamp = (BSONTimestamp)entry.get(MongoDBRiver.OPLOG_TIMESTAMP);
            DBObject object = (DBObject)entry.get(MongoDBRiver.OPLOG_OBJECT);
            if (entry.containsField("fromMigrate") && ((BasicBSONObject)entry).getBoolean("fromMigrate")) {
                MongoDBRiver.this.logger.debug("From migration or sharding operation. Can be ignored. {}", new Object[]{entry});
                return;
            }
            if (namespace.endsWith(MongoDBRiver.GRIDFS_CHUNKS_SUFFIX)) {
                return;
            }
            if (MongoDBRiver.this.logger.isTraceEnabled()) {
                MongoDBRiver.this.logger.trace("oplog entry - namespace [{}], operation [{}]", new Object[]{namespace, operation});
                MongoDBRiver.this.logger.trace("oplog processing item {}", new Object[]{entry});
            }
            if (MongoDBRiver.this.mongoGridFS && namespace.endsWith(MongoDBRiver.GRIDFS_FILES_SUFFIX) && (MongoDBRiver.OPLOG_INSERT_OPERATION.equals(operation) || MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(operation))) {
                String objectId = object.get(MongoDBRiver.MONGODB_ID_FIELD).toString();
                GridFS grid = new GridFS(this.mongo.getDB(MongoDBRiver.this.mongoDb), MongoDBRiver.this.mongoCollection);
                GridFSDBFile file = grid.findOne(new ObjectId(objectId));
                if (file != null) {
                    MongoDBRiver.this.logger.info("Caught file: {} - {}", new Object[]{file.getId(), file.getFilename()});
                    object = file;
                } else {
                    MongoDBRiver.this.logger.warn("Cannot find file from id: {}", new Object[]{objectId});
                }
            }
            if (object instanceof GridFSDBFile) {
                MongoDBRiver.this.logger.info("Add attachment: {}", new Object[]{object.get(MongoDBRiver.MONGODB_ID_FIELD)});
                HashMap<String, Object> data = new HashMap<String, Object>();
                data.put(MongoDBRiver.IS_MONGODB_ATTACHMENT, true);
                data.put(MongoDBRiver.MONGODB_ATTACHMENT, object);
                data.put(MongoDBRiver.MONGODB_ID_FIELD, object.get(MongoDBRiver.MONGODB_ID_FIELD));
                this.addToStream(operation, oplogTimestamp, data);
            } else if (MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(operation)) {
                DBObject update = (DBObject)entry.get(MongoDBRiver.OPLOG_UPDATE);
                MongoDBRiver.this.logger.debug("Updated item: {}", new Object[]{update});
                this.addQueryToStream(operation, oplogTimestamp, update);
            } else {
                this.addToStream(operation, oplogTimestamp, object.toMap());
            }
        }

        private DBObject getIndexFilter(BSONTimestamp timestampOverride) {
            BSONTimestamp time = timestampOverride == null ? MongoDBRiver.this.getLastTimestamp(MongoDBRiver.this.mongoOplogNamespace) : timestampOverride;
            BasicDBObject filter = new BasicDBObject();
            ArrayList<Object> values = new ArrayList<Object>();
            if (MongoDBRiver.this.mongoGridFS) {
                values.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, (Object)(MongoDBRiver.this.mongoOplogNamespace + MongoDBRiver.GRIDFS_FILES_SUFFIX)));
            } else {
                values.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, (Object)MongoDBRiver.this.mongoOplogNamespace));
            }
            if (!MongoDBRiver.this.mongoFilter.isEmpty()) {
                values.add(this.getMongoFilter());
            }
            if (time == null) {
                MongoDBRiver.this.logger.info("No known previous slurping time for this collection", new Object[0]);
            } else {
                values.add(new BasicDBObject(MongoDBRiver.OPLOG_TIMESTAMP, (Object)new BasicDBObject("$gt", (Object)time)));
            }
            filter = new BasicDBObject("$and", values);
            if (MongoDBRiver.this.logger.isDebugEnabled()) {
                MongoDBRiver.this.logger.debug("Using filter: {}", new Object[]{filter});
            }
            return filter;
        }

        private DBObject getMongoFilter() {
            ArrayList<BasicDBObject> filters = new ArrayList<BasicDBObject>();
            ArrayList<Object> filters2 = new ArrayList<Object>();
            ArrayList<BasicDBObject> filters3 = new ArrayList<BasicDBObject>();
            filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, (Object)MongoDBRiver.OPLOG_DELETE_OPERATION));
            filters3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, (Object)MongoDBRiver.OPLOG_UPDATE_OPERATION));
            filters3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, (Object)MongoDBRiver.OPLOG_INSERT_OPERATION));
            filters2.add(new BasicDBObject("$or", filters3));
            filters2.add((DBObject)JSON.parse((String)MongoDBRiver.this.mongoFilter));
            filters.add(new BasicDBObject("$and", filters2));
            return new BasicDBObject("$or", filters);
        }

        private DBCursor oplogCursor(BSONTimestamp timestampOverride) {
            DBObject indexFilter = this.getIndexFilter(timestampOverride);
            if (indexFilter == null) {
                return null;
            }
            return this.oplogCollection.find(indexFilter).sort((DBObject)new BasicDBObject("$natural", (Object)1)).addOption(2).addOption(32);
        }

        private void addQueryToStream(String operation, BSONTimestamp currentTimestamp, DBObject update) throws InterruptedException {
            if (MongoDBRiver.this.logger.isDebugEnabled()) {
                MongoDBRiver.this.logger.debug("addQueryToStream - operation [{}], currentTimestamp [{}], update [{}]", new Object[]{operation, currentTimestamp, update});
            }
            for (DBObject item : this.slurpedCollection.find(update)) {
                this.addToStream(operation, currentTimestamp, item.toMap());
            }
        }

        private void addToStream(String operation, BSONTimestamp currentTimestamp, Map<String, Object> data) throws InterruptedException {
            if (MongoDBRiver.this.logger.isDebugEnabled()) {
                MongoDBRiver.this.logger.debug("addToStream - operation [{}], currentTimestamp [{}], data [{}]", new Object[]{operation, currentTimestamp, data});
            }
            data.put(MongoDBRiver.OPLOG_TIMESTAMP, currentTimestamp);
            data.put(MongoDBRiver.OPLOG_OPERATION, operation);
            MongoDBRiver.this.stream.put(data);
        }
    }

    private class Indexer
    implements Runnable {
        private final ESLogger logger = ESLoggerFactory.getLogger((String)this.getClass().getName());
        private int deletedDocuments = 0;
        private int insertedDocuments = 0;
        private int updatedDocuments = 0;
        private StopWatch sw;

        private Indexer() {
        }

        @Override
        public void run() {
            while (MongoDBRiver.this.active) {
                this.sw = new StopWatch().start();
                this.deletedDocuments = 0;
                this.insertedDocuments = 0;
                this.updatedDocuments = 0;
                try {
                    BSONTimestamp lastTimestamp = null;
                    BulkRequestBuilder bulk = MongoDBRiver.this.client.prepareBulk();
                    Map data = (Map)MongoDBRiver.this.stream.take();
                    lastTimestamp = this.updateBulkRequest(bulk, data);
                    while ((data = (Map)MongoDBRiver.this.stream.poll(MongoDBRiver.this.bulkTimeout.millis(), TimeUnit.MILLISECONDS)) != null) {
                        lastTimestamp = this.updateBulkRequest(bulk, data);
                        if (bulk.numberOfActions() < MongoDBRiver.this.bulkSize) continue;
                    }
                    if (lastTimestamp != null) {
                        MongoDBRiver.this.updateLastTimestamp(MongoDBRiver.this.mongoOplogNamespace, lastTimestamp, bulk);
                    }
                    try {
                        BulkResponse response = (BulkResponse)bulk.execute().actionGet();
                        if (response.hasFailures()) {
                            this.logger.warn("failed to execute" + response.buildFailureMessage(), new Object[0]);
                        }
                    }
                    catch (ElasticSearchInterruptedException esie) {
                        this.logger.warn("river-mongodb indexer bas been interrupted", (Throwable)esie, new Object[0]);
                        Thread.currentThread().interrupt();
                    }
                    catch (Exception e) {
                        this.logger.warn("failed to execute bulk", (Throwable)e, new Object[0]);
                    }
                }
                catch (InterruptedException e) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("river-mongodb indexer interrupted", new Object[0]);
                    }
                    Thread.currentThread().interrupt();
                }
                this.logStatistics();
            }
        }

        private BSONTimestamp updateBulkRequest(BulkRequestBuilder bulk, Map<String, Object> data) {
            if (data.get(MongoDBRiver.MONGODB_ID_FIELD) == null) {
                this.logger.warn("Cannot get object id. Skip the current item: [{}]", new Object[]{data});
                return null;
            }
            BSONTimestamp lastTimestamp = (BSONTimestamp)data.get(MongoDBRiver.OPLOG_TIMESTAMP);
            String operation = data.get(MongoDBRiver.OPLOG_OPERATION).toString();
            String objectId = data.get(MongoDBRiver.MONGODB_ID_FIELD).toString();
            data.remove(MongoDBRiver.OPLOG_TIMESTAMP);
            data.remove(MongoDBRiver.OPLOG_OPERATION);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("updateBulkRequest for id: [{}], operation: [{}]", new Object[]{objectId, operation});
            }
            Map ctx = null;
            try {
                ctx = XContentFactory.xContent((XContentType)XContentType.JSON).createParser("{}").mapAndClose();
            }
            catch (IOException e) {
                this.logger.warn("failed to parse {}", (Throwable)e, new Object[0]);
            }
            if (MongoDBRiver.this.script != null && ctx != null) {
                ctx.put("document", data);
                ctx.put("operation", operation);
                ctx.put("id", objectId);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Context before script executed: {}", new Object[]{ctx});
                }
                MongoDBRiver.this.script.setNextVar("ctx", (Object)ctx);
                try {
                    MongoDBRiver.this.script.run();
                    ctx = (Map)MongoDBRiver.this.script.unwrap((Object)ctx);
                }
                catch (Exception e) {
                    this.logger.warn("failed to script process {}, ignoring", (Throwable)e, new Object[]{ctx});
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Context after script executed: {}", new Object[]{ctx});
                }
                if (ctx.containsKey("ignore") && ctx.get("ignore").equals(Boolean.TRUE)) {
                    this.logger.debug("From script ignore document id: {}", new Object[]{objectId});
                    return lastTimestamp;
                }
                if (ctx.containsKey("deleted") && ctx.get("deleted").equals(Boolean.TRUE)) {
                    ctx.put("operation", MongoDBRiver.OPLOG_DELETE_OPERATION);
                }
                if (ctx.containsKey("document")) {
                    data = (Map)ctx.get("document");
                    this.logger.debug("From script document: {}", new Object[]{data});
                }
                if (ctx.containsKey("operation")) {
                    operation = ctx.get("operation").toString();
                    this.logger.debug("From script operation: {}", new Object[]{operation});
                }
            }
            try {
                String index = this.extractIndex(ctx);
                String type = this.extractType(ctx);
                String parent = this.extractParent(ctx);
                String routing = this.extractRouting(ctx);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Operation: {} - index: {} - type: {} - routing: {} - parent: {}", new Object[]{operation, index, type, routing, parent});
                }
                if (MongoDBRiver.OPLOG_INSERT_OPERATION.equals(operation)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Insert operation - id: {} - contains attachment: {}", new Object[]{operation, objectId, data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT)});
                    }
                    bulk.add(Requests.indexRequest((String)index).type(type).id(objectId).source(this.build(data, objectId)).routing(routing).parent(parent));
                    ++this.insertedDocuments;
                }
                if (MongoDBRiver.OPLOG_UPDATE_OPERATION.equals(operation)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Update operation - id: {} - contains attachment: {}", new Object[]{objectId, data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT)});
                    }
                    bulk.add(new DeleteRequest(index, type, objectId).routing(routing).parent(parent));
                    bulk.add(Requests.indexRequest((String)index).type(type).id(objectId).source(this.build(data, objectId)).routing(routing).parent(parent));
                    ++this.updatedDocuments;
                }
                if (MongoDBRiver.OPLOG_DELETE_OPERATION.equals(operation)) {
                    this.logger.info("Delete request [{}], [{}], [{}]", new Object[]{index, type, objectId});
                    bulk.add(new DeleteRequest(index, type, objectId).routing(routing).parent(parent));
                    ++this.deletedDocuments;
                }
            }
            catch (IOException e) {
                this.logger.warn("failed to parse {}", (Throwable)e, new Object[]{data});
            }
            return lastTimestamp;
        }

        private XContentBuilder build(Map<String, Object> data, String objectId) throws IOException {
            if (data.containsKey(MongoDBRiver.IS_MONGODB_ATTACHMENT)) {
                this.logger.info("Add Attachment: {} to index {} / type {}", new Object[]{objectId, MongoDBRiver.this.indexName, MongoDBRiver.this.typeName});
                return GridFSHelper.serialize((GridFSDBFile)data.get(MongoDBRiver.MONGODB_ATTACHMENT));
            }
            return XContentFactory.jsonBuilder().map(data);
        }

        private String extractParent(Map<String, Object> ctx) {
            return (String)ctx.get("_parent");
        }

        private String extractRouting(Map<String, Object> ctx) {
            return (String)ctx.get("_routing");
        }

        private String extractType(Map<String, Object> ctx) {
            String type = (String)ctx.get("_type");
            if (type == null) {
                type = MongoDBRiver.this.typeName;
            }
            return type;
        }

        private String extractIndex(Map<String, Object> ctx) {
            String index = (String)ctx.get("_index");
            if (index == null) {
                index = MongoDBRiver.this.indexName;
            }
            return index;
        }

        private void logStatistics() {
            long totalDocuments = this.deletedDocuments + this.insertedDocuments;
            long totalTimeInSeconds = this.sw.stop().totalTime().seconds();
            long totalDocumentsPerSecond = totalTimeInSeconds == 0L ? totalDocuments : totalDocuments / totalTimeInSeconds;
            this.logger.info("Indexed {} documents, {} insertions {}, updates, {} deletions, {} documents per second", new Object[]{totalDocuments, this.insertedDocuments, this.updatedDocuments, this.deletedDocuments, totalDocumentsPerSecond});
        }
    }
}

