/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.impl.actors;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.remoting.encoding.CallbackRefSerializer;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.KeySetSubscriber;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLNoQueryPredicate;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.RecordStorage;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.impl.FilterProcessor;
import org.nustaq.reallive.impl.FilterSpore;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.StorageDriver;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.records.RecordWrapper;

public class RealLiveTableActor
extends Actor<RealLiveTableActor>
implements RealLiveTable {
    public static int MAX_QUERY_BATCH_SIZE = 10;
    public static boolean DUMP_QUERY_TIME = false;
    StorageDriver storageDriver;
    FilterProcessor filterProcessor;
    HashMap<String, Subscriber> receiverSideSubsMap = new HashMap();
    TableDescription description;
    ArrayList<QueryQEntry> queuedSpores = new ArrayList();
    int taCount = 0;

    @Local
    public IPromise init(Supplier<RecordStorage> storeFactory, TableDescription desc) {
        this.description = desc;
        Thread.currentThread().setName("Table " + (desc == null ? "NULL" : desc.getName()) + " main");
        RecordStorage store = storeFactory.get();
        this.storageDriver = new StorageDriver(store);
        this.filterProcessor = new FilterProcessor(this);
        this.storageDriver.setListener(this.filterProcessor);
        return RealLiveTableActor.resolve();
    }

    @Override
    public void receive(ChangeMessage change) {
        this.checkThread();
        try {
            this.storageDriver.receive(change);
        }
        catch (Exception th) {
            Log.Error((Object)this, (Throwable)th);
        }
    }

    public <T> void forEachDirect(Spore<Record, T> spore) {
        this.checkThread();
        try {
            this.storageDriver.getStore().forEachWithSpore(spore);
        }
        catch (Exception ex) {
            spore.complete(null, (Object)ex);
        }
    }

    @Override
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        this.forEachQueued(spore, () -> {});
    }

    protected void hasStopped() {
    }

    @Override
    @CallerSideMethod
    public void subscribe(Subscriber subs) {
        Callback & Serializable callback = (Callback & Serializable)(r, e) -> {
            if (Actors.isResult((Object)e)) {
                subs.getReceiver().receive((ChangeMessage)r);
            }
        };
        this._subscribe(subs.getFilter(), callback, subs.getId());
    }

    public void _subscribe(RLPredicate pred, Callback cb, int id) {
        this.checkThread();
        Subscriber localSubs = new Subscriber(pred, change -> cb.pipe((Object)change)).serverSideCB(cb);
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        this.receiverSideSubsMap.put(sid, localSubs);
        FilterSpore spore = new FilterSpore(localSubs.getFilter()).modifiesResult(false);
        spore.onFinish(() -> localSubs.getReceiver().receive(RLUtil.get().done()));
        spore.setForEach((Callback & Serializable)(r, e) -> {
            if (Actors.isResult((Object)e)) {
                localSubs.getReceiver().receive(new AddMessage(0, (Record)r));
            } else {
                localSubs.getReceiver().receive(RLUtil.get().done());
            }
        });
        if (pred instanceof KeySetSubscriber.KSPredicate) {
            KeySetSubscriber.KSPredicate p = (KeySetSubscriber.KSPredicate)pred;
            p.getKeys().forEach(key -> {
                Record record = this.storageDriver.getStore().get((String)key);
                if (record != null) {
                    localSubs.getReceiver().receive(new AddMessage(0, record));
                }
            });
            localSubs.getReceiver().receive(RLUtil.get().done());
            this.filterProcessor.startListening(localSubs);
        } else {
            if (pred instanceof RLNoQueryPredicate) {
                localSubs.getReceiver().receive(RLUtil.get().done());
            } else {
                this.forEachDirect(spore);
            }
            this.filterProcessor.startListening(localSubs);
        }
    }

    private void forEachQueued(Spore s, Runnable r) {
        this.queuedSpores.add(new QueryQEntry(s, r));
        ((RealLiveTableActor)this.self())._execQueriesOrDelay(this.queuedSpores.size(), this.taCount);
    }

    public void _execQueriesOrDelay(int size, int taCount) {
        if (this.queuedSpores.size() == size && this.taCount == taCount || this.queuedSpores.size() > MAX_QUERY_BATCH_SIZE) {
            long tim = System.currentTimeMillis();
            Consumer<Record> recordConsumer = rec -> {
                for (int i = 0; i < this.queuedSpores.size(); ++i) {
                    QueryQEntry qqentry = this.queuedSpores.get(i);
                    Spore spore = qqentry.spore;
                    if (spore.isFinished()) continue;
                    try {
                        spore.remote(rec);
                        continue;
                    }
                    catch (Throwable ex) {
                        Log.Warn((Object)this, (Throwable)ex, (String)("exception in spore " + spore));
                        spore.complete(null, (Object)ex);
                    }
                }
            };
            this.storageDriver.getStore().stream().forEach(recordConsumer);
            this.queuedSpores.forEach(qqentry -> {
                qqentry.spore.finish();
                qqentry.onFin.run();
            });
            if (DUMP_QUERY_TIME) {
                System.out.println("tim for " + this.queuedSpores.size() + " " + (System.currentTimeMillis() - tim));
            }
            this.queuedSpores.clear();
            return;
        }
        this._execQueriesOrDelay(this.queuedSpores.size(), this.taCount);
    }

    protected String addChannelIdIfPresent(Callback cb, String sid) {
        if (cb instanceof CallbackWrapper && ((CallbackWrapper)cb).isRemote()) {
            CallbackRefSerializer.MyRemotedCallback realCallback = (CallbackRefSerializer.MyRemotedCallback)((CallbackWrapper)cb).getRealCallback();
            sid = sid + "#" + realCallback.getChanId();
        }
        return sid;
    }

    @Override
    @CallerSideMethod
    public void unsubscribe(Subscriber subs) {
        this._unsubscribe((Callback & Serializable)(r, e) -> {}, subs.getId());
    }

    public void _unsubscribe(Callback cb, int id) {
        this.checkThread();
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        Subscriber subs = this.receiverSideSubsMap.get(sid);
        this.filterProcessor.unsubscribe(subs);
        this.receiverSideSubsMap.remove(sid);
        cb.finish();
        subs.getServerSideCB().finish();
    }

    @Override
    public IPromise<Record> get(String key) {
        ++this.taCount;
        return RealLiveTableActor.resolve((Object)this.storageDriver.getStore().get(key));
    }

    @Override
    public IPromise<Long> size() {
        return RealLiveTableActor.resolve((Object)this.storageDriver.getStore().size());
    }

    @Override
    public IPromise<TableDescription> getDescription() {
        return RealLiveTableActor.resolve((Object)this.description);
    }

    @Override
    public IPromise<StorageStats> getStats() {
        try {
            StorageStats stats = this.storageDriver.getStore().getStats();
            return RealLiveTableActor.resolve((Object)stats);
        }
        catch (Throwable th) {
            Log.Warn((Object)this, (Throwable)th);
            return RealLiveTableActor.reject((Object)th.getMessage());
        }
    }

    @Override
    public IPromise atomic(String key, RLFunction<Record, Object> action) {
        ++this.taCount;
        return this.storageDriver.atomicQuery(key, action);
    }

    @Override
    public void atomicUpdate(RLPredicate<Record> filter, RLFunction<Record, Boolean> action) {
        ++this.taCount;
        this.storageDriver.atomicUpdate(filter, action);
    }

    @Override
    public IPromise resizeIfLoadFactorLarger(double loadFactor, long maxGrowBytes) {
        Log.Info((Object)this, (String)("resizing table if lf: " + loadFactor + " maxgrow:" + maxGrowBytes));
        long now = System.currentTimeMillis();
        this.storageDriver.resizeIfLoadFactorLarger(loadFactor, maxGrowBytes);
        Log.Info((Object)this, (String)("resizing duration" + (System.currentTimeMillis() - now)));
        return RealLiveTableActor.resolve();
    }

    @Override
    public void put(int senderId, String key, Object ... keyVals) {
        this.receive(RLUtil.get().put(senderId, key, keyVals));
    }

    @Override
    public void merge(int senderId, String key, Object ... keyVals) {
        if (key instanceof Record) {
            throw new RuntimeException("probably accidental method resolution fail. Use merge instead");
        }
        this.receive(RLUtil.get().addOrUpdate(senderId, key, keyVals));
    }

    @Override
    public IPromise<Boolean> add(int senderId, String key, Object ... keyVals) {
        if (this.storageDriver.getStore().get(key) != null) {
            return RealLiveTableActor.resolve((Object)false);
        }
        this.receive(RLUtil.get().add(senderId, key, keyVals));
        return RealLiveTableActor.resolve((Object)true);
    }

    @Override
    public IPromise<Boolean> addRecord(int senderId, Record rec) {
        Record existing;
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        if ((existing = this.storageDriver.getStore().get(rec.getKey())) != null) {
            return RealLiveTableActor.resolve((Object)false);
        }
        this.receive(new AddMessage(senderId, rec));
        return RealLiveTableActor.resolve((Object)true);
    }

    @Override
    public void mergeRecord(int senderId, Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.receive(new AddMessage(senderId, true, rec));
    }

    @Override
    public void setRecord(int senderId, Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.receive(new PutMessage(senderId, rec));
    }

    @Override
    public void update(int senderId, String key, Object ... keyVals) {
        this.receive(RLUtil.get().update(senderId, key, keyVals));
    }

    @Override
    public IPromise<Record> take(int senderId, String key) {
        Record record = this.storageDriver.getStore().get(key);
        this.receive(RLUtil.get().remove(senderId, key));
        return RealLiveTableActor.resolve((Object)record);
    }

    @Override
    public void remove(int senderId, String key) {
        RemoveMessage remove = RLUtil.get().remove(senderId, key);
        this.receive(remove);
    }

    static class QueryQEntry {
        Spore spore;
        Runnable onFin;

        public QueryQEntry(Spore spore, Runnable onFin) {
            this.spore = spore;
            this.onFin = onFin;
        }
    }
}

