/*
 * Decompiled with CFR 0.152.
 */
package com.github.diceproject.qt.spout;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.bson.Document;
import org.bson.conversions.Bson;

public class MongoDbSpout
extends BaseRichSpout {
    private static final long serialVersionUID = -7261151120193254079L;
    private String _mongoHost;
    private String _mongoDbName;
    private String _mongoCollectionName;
    private SpoutOutputCollector _collector;
    private MongoClient _mongo = null;
    private MongoDatabase _database = null;
    private Bson _query;
    private LinkedBlockingQueue<Document> _queue;
    private CursorThread _listener = null;
    private MongoCollection<Document> _collection;

    public MongoDbSpout(String mongoHost, String mongoDbName, String mongoCollectionName, Bson query) {
        this._mongoHost = mongoHost;
        this._mongoDbName = mongoDbName;
        this._mongoCollectionName = mongoCollectionName;
        this._query = query;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.reset(this._mongoHost, this._mongoDbName, this._mongoCollectionName);
        }
        catch (UnknownHostException e) {
            e.printStackTrace();
        }
        this._collector = collector;
        this._queue = new LinkedBlockingQueue(10000);
        this._mongo = new MongoClient(this._mongoHost);
        this._database = this._mongo.getDatabase(this._mongoDbName);
        this._collection = this._database.getCollection(this._mongoCollectionName);
        this._listener = new CursorThread(this._queue, this._database, this._mongoCollectionName, this._query);
        this._listener.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void nextTuple() {
        Document obj = this._queue.poll();
        if (obj == null) {
            Utils.sleep((long)100L);
        } else {
            SpoutOutputCollector spoutOutputCollector = this._collector;
            synchronized (spoutOutputCollector) {
                this._collector.emit(Utils.tuple((Object[])new Object[]{obj.toString()}));
            }
            this._collection.updateOne(new BasicDBObject("_id", obj.get("_id")), new BasicDBObject("$set", new BasicDBObject("status", "injected")));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(new String[]{this._mongoCollectionName}));
    }

    public void ack(Object url) {
    }

    public void fail(Object url) {
    }

    public void reset(String host, String dbName, String collectionName) throws UnknownHostException {
        MongoClient client = new MongoClient(host);
        MongoDatabase db = client.getDatabase(dbName);
        MongoCollection<Document> collection = db.getCollection(collectionName);
        BasicDBObject q = new BasicDBObject("status", "injected");
        BasicDBObject o = new BasicDBObject("$set", new BasicDBObject("status", "new"));
        collection.updateMany(q, o);
        client.close();
    }

    class CursorThread
    extends Thread {
        LinkedBlockingQueue<Document> queue;
        String mongoCollectionName;
        MongoDatabase mongoDB;
        Bson query;

        public CursorThread(LinkedBlockingQueue<Document> queue, MongoDatabase mongoDB, String mongoCollectionName, Bson query) {
            this.queue = queue;
            this.mongoDB = mongoDB;
            this.mongoCollectionName = mongoCollectionName;
            this.query = query;
        }

        @Override
        public void run() {
            while (true) {
                FindIterable<Document> cursor = this.mongoDB.getCollection(this.mongoCollectionName).find(this.query).sort(new BasicDBObject("_id", (Object)-1)).limit(100);
                for (Document obj : cursor) {
                    if (obj == null) continue;
                    try {
                        this.queue.put(obj);
                    }
                    catch (InterruptedException e) {
                        Utils.sleep((long)100L);
                    }
                }
                System.out.println("Injector: " + MongoDbSpout.this._queue.size());
                Utils.sleep((long)10000L);
            }
        }
    }
}

