/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services.rlserver.mongodb;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.util.concurrent.atomic.AtomicInteger;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.services.rlserver.mongodb.MongoUtil;
import org.nustaq.kontraktor.services.rlserver.mongodb.SubscriberHelpers;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.server.storage.RecordPersistance;
import org.nustaq.reallive.server.storage.StorageStats;
import org.reactivestreams.Subscriber;

/**
 * {@link RecordPersistance} implementation that stores records as documents in a
 * MongoDB collection accessed through the reactive-streams driver.
 *
 * <p>All database operations are asynchronous: the methods of this class return
 * before the operation has completed, and failures of write operations are
 * reported through {@link Log} rather than thrown to the caller.
 *
 * <p>Records are addressed by the document field {@value #KEY_FIELD}; both
 * {@link #_put(String, Record)} and {@link #remove(String)} filter on it.
 */
public class MongoPersistance
implements RecordPersistance {
    /** Document field used to look a record up by its key. */
    private static final String KEY_FIELD = "key";

    private final TableDescription description;
    MongoCollection collection;
    ReplaceOptions upsert = new ReplaceOptions().upsert(true);

    /**
     * @param col         collection the records are read from and written to
     * @param description description of the table backed by {@code col}; its name is
     *                    used in statistics and log messages
     */
    public MongoPersistance(MongoCollection col, TableDescription description) {
        this.collection = col;
        this.description = description;
    }

    /**
     * Asynchronously deletes the document whose {@value #KEY_FIELD} field equals
     * {@code key}.
     *
     * <p>The publisher returned by the driver has to be subscribed to, otherwise the
     * delete is never sent to the server. The filter matches the one used by
     * {@link #_put(String, Record)} so that records written by this class can be removed
     * by the same key.
     *
     * @param key key of the record to remove
     * @return always {@code null}; the removed record is not fetched
     */
    public Record remove(String key) {
        this.collection.deleteOne(Filters.eq(KEY_FIELD, key)).subscribe(this.loggingSubscriber("remove", key));
        return null;
    }

    /**
     * @return statistics object carrying only a name of the form
     *         {@code "mongo:" + <table name>}
     */
    public StorageStats getStats() {
        return new StorageStats().name("mongo:" + this.description.getName());
    }

    /**
     * Streams every document of the collection to {@code spore}.
     *
     * <p>Each document is converted with {@link MongoUtil#toRecord} and passed to
     * {@code spore.remote(..)}. When the stream ends, the number of visited documents and
     * the elapsed time are logged and {@code spore.finish()} is called. A stream error is
     * forwarded via {@code spore.complete(null, error)}.
     *
     * @param spore receiver of the records
     * @param <T>   result type of the spore
     */
    public <T> void forEachWithSpore(final Spore<Record, T> spore) {
        final AtomicInteger count = new AtomicInteger();
        final long now = System.currentTimeMillis();
        this.collection.find().subscribe((Subscriber)new SubscriberHelpers.OperationSubscriber(){

            @Override
            public void onNext(Object o) {
                // NOTE(review): if the base subscriber buffers received elements, a full
                // scan keeps every document in memory - confirm against SubscriberHelpers.
                super.onNext(o);
                count.incrementAndGet();
                try {
                    spore.remote(MongoUtil.get().toRecord((Document)o));
                }
                catch (Throwable ex) {
                    Log.Warn(this, ex, "exception in spore " + spore);
                    // Rethrown so a failing spore is not silently ignored.
                    // NOTE(review): Reactive Streams rule 2.13 expects onNext to return
                    // normally - confirm how the driver reacts to this.
                    throw ex;
                }
            }

            @Override
            public void onError(Throwable throwable) {
                spore.complete(null, throwable);
            }

            @Override
            public void onComplete() {
                Log.Info(this, MongoPersistance.this.description + " count forEach " + count + " time:" + (System.currentTimeMillis() - now));
                spore.finish();
            }
        });
    }

    /**
     * Stamps {@code record} with the current time as its last-modified value and
     * stores it via {@link #_put(String, Record)}.
     *
     * @param key    key to store the record under
     * @param record record to store; its last-modified timestamp and key are updated
     * @return this instance
     */
    public RecordPersistance put(String key, Record record) {
        record.internal_setLastModified(System.currentTimeMillis());
        return this._put(key, record);
    }

    /**
     * Stores {@code record} under {@code key} without touching its last-modified
     * timestamp.
     *
     * <p>The record's key is set to {@code key}, the record is converted with
     * {@link MongoUtil#fromRecord} and the document whose {@value #KEY_FIELD} field
     * equals {@code key} is replaced (inserted if absent). The write is asynchronous;
     * failures are logged.
     *
     * @param key    key to store the record under
     * @param record record to store; its key is overwritten with {@code key}
     * @return this instance
     */
    public RecordPersistance _put(String key, Record record) {
        record.key(key);
        Document replacement = MongoUtil.get().fromRecord(record);
        this.collection.replaceOne(Filters.eq(KEY_FIELD, key), replacement, this.upsert).subscribe(this.loggingSubscriber("put", key));
        return this;
    }

    /**
     * Creates a subscriber for fire-and-forget write operations that logs a failure
     * and does nothing on successful completion.
     *
     * @param operation short name of the operation, used in the log message
     * @param key       record key the operation refers to, used in the log message
     */
    private Subscriber loggingSubscriber(final String operation, final String key) {
        return new SubscriberHelpers.OperationSubscriber(){

            @Override
            public void onError(Throwable t) {
                Log.Warn(MongoPersistance.this, t, operation + " failed for key '" + key + "' in " + MongoPersistance.this.description.getName());
            }

            @Override
            public void onComplete() {
            }
        };
    }
}

