/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.server.actors;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.remoting.encoding.CallbackRefSerializer;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.api.ChangeMessage;
import org.nustaq.reallive.api.RLFunction;
import org.nustaq.reallive.api.RLHashIndexPredicate;
import org.nustaq.reallive.api.RLNoQueryPredicate;
import org.nustaq.reallive.api.RLPredicate;
import org.nustaq.reallive.api.RealLiveTable;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.api.RecordStorage;
import org.nustaq.reallive.api.Subscriber;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.api.TableState;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.PutMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.query.EvalContext;
import org.nustaq.reallive.query.QToken;
import org.nustaq.reallive.query.Value;
import org.nustaq.reallive.query.VarPath;
import org.nustaq.reallive.records.RecordWrapper;
import org.nustaq.reallive.server.FilterProcessor;
import org.nustaq.reallive.server.FilterSpore;
import org.nustaq.reallive.server.QueryPredicate;
import org.nustaq.reallive.server.RLUtil;
import org.nustaq.reallive.server.StorageDriver;
import org.nustaq.reallive.server.storage.ClusterTableRecordMapping;
import org.nustaq.reallive.server.storage.HashIndex;
import org.nustaq.reallive.server.storage.IndexedRecordStorage;
import org.nustaq.reallive.server.storage.StorageStats;

public class RealLiveTableActor
extends Actor<RealLiveTableActor>
implements RealLiveTable {
    public static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1L);
    public static boolean DUMP_QUERY_TIME = false;
    public transient String __clientSideTag;
    StorageDriver storageDriver;
    FilterProcessor filterProcessor;
    HashMap<String, Subscriber> receiverSideSubsMap = new HashMap();
    TableDescription description;
    IndexedRecordStorage indexedStorage = new IndexedRecordStorage();
    ArrayList<QueryQEntry> queuedSpores = new ArrayList();
    ClusterTableRecordMapping mapping;
    int taCount = 0;
    long lastReportTime = System.currentTimeMillis();

    @Local
    public IPromise init(Function<TableDescription, RecordStorage> storeFactory, TableDescription desc) {
        this.description = desc;
        Thread.currentThread().setName("Table " + (desc == null ? "NULL" : desc.getName()) + " main");
        Log.Info((Object)this, (String)("loading table " + desc.getName() + " " + desc.getStorageFile()));
        RecordStorage store = storeFactory.apply(desc);
        this.indexedStorage.wrapped(store);
        this.createIndizes(store);
        this.filterProcessor = new FilterProcessor(this);
        this.storageDriver = new StorageDriver(this.indexedStorage);
        this.storageDriver.setListener(this.filterProcessor);
        return RealLiveTableActor.resolve();
    }

    private void createIndizes(RecordStorage rawStorage) {
        if (this.description.getHashIndexed() != null && this.description.getHashIndexed().length > 0) {
            for (int i = 0; i < this.description.getHashIndexed().length; ++i) {
                String path = this.description.getHashIndexed()[i];
                VarPath vp = new VarPath(path, null, new QToken(path, "", 0));
                this.indexedStorage.addIndex(new HashIndex(rec -> {
                    Value evaluate = vp.evaluate((EvalContext)rec);
                    if (evaluate == null) {
                        return null;
                    }
                    return evaluate.getValue();
                }, path));
            }
        }
        ClusterTableRecordMapping tmpMapping = new ClusterTableRecordMapping();
        Promise done = new Promise();
        rawStorage.forEach(rec -> true, (Callback<Record>)(Callback & Serializable)(r, e) -> {
            if (r != null) {
                this.indexedStorage.initializeFromRecord((Record)r);
                tmpMapping.setBucket(tmpMapping.getBucket(r.getKey().hashCode()), true);
            } else {
                done.resolve();
            }
        });
        done.await(TimeUnit.MINUTES.toMillis(5L));
        ClusterTableRecordMapping savedMapping = rawStorage._loadMapping();
        if (savedMapping != null) {
            this.mapping = savedMapping;
            Log.Info((Object)this, (String)("loaded mapping " + savedMapping));
            Log.Info((Object)this, (String)("       induced " + tmpMapping));
        } else {
            Log.Info((Object)this, (String)("calculated mapping " + tmpMapping));
            this.mapping = tmpMapping;
        }
        Log.Info((Object)this, (String)("index creation done " + this.description.getName() + " " + this.description.getShardNo()));
    }

    public IPromise<ClusterTableRecordMapping> getRecordMapping() {
        return RealLiveTableActor.resolve((Object)this.mapping);
    }

    @Override
    public void receive(ChangeMessage change) {
        this.checkThread();
        try {
            this.storageDriver.receive(change);
        }
        catch (Exception th) {
            Log.Error((Object)this, (Throwable)th);
        }
    }

    public <T> void forEachDirect(Spore<Record, T> spore) {
        this.checkThread();
        try {
            this.storageDriver.getStore().forEachWithSpore(spore);
        }
        catch (Exception ex) {
            spore.complete(null, (Object)ex);
        }
    }

    @Override
    public <T> void forEachWithSpore(Spore<Record, T> spore) {
        if (spore instanceof FilterSpore && ((FilterSpore)spore).getFilter().getRecordLimit() > 0) {
            FilterSpore newSpore = new FilterSpore(((FilterSpore)spore).getFilter());
            ArrayList keys = new ArrayList();
            newSpore.onFinish(() -> this.delayedSend(keys, ((FilterSpore)spore).getFilter().getRecordLimit(), spore));
            newSpore.setForEach((Callback & Serializable)(r, e) -> {
                if (Actors.isResult((Object)e)) {
                    keys.add(r.getKey());
                }
            });
            this.forEachDirect(newSpore);
        } else {
            this.forEachQueued(spore, () -> {});
        }
    }

    private <T> void delayedSend(List<String> keys, int recordLimit, Spore<Record, T> spore) {
        int i = keys.size() - 1;
        for (int ii = 0; i >= 0 && ii < recordLimit; ++ii, --i) {
            Record record = this.storageDriver.getStore().get(keys.get(i));
            if (record != null) {
                spore.remote((Object)record);
            }
            keys.remove(i);
        }
        if (keys.size() > 0) {
            this.delayed(1000L, () -> this.delayedSend(keys, recordLimit, spore));
        } else {
            spore.finish();
        }
    }

    protected void hasStopped() {
    }

    @Override
    @CallerSideMethod
    public void subscribe(Subscriber subs) {
        Callback & Serializable callback = (Callback & Serializable)(r, e) -> {
            if (Actors.isResult((Object)e)) {
                subs.getReceiver().receive((ChangeMessage)r);
            }
        };
        this._subscribe(subs.getFilter(), callback, subs.getId());
    }

    public void _subscribe(RLPredicate pred, Callback cb, int id) {
        this.checkThread();
        long now = System.currentTimeMillis();
        if (now - this.lastReportTime > REPORT_INTERVAL) {
            Log.Info((Object)this, (String)("mem report filterProc " + this.filterProcessor.getFilterSize() + ", receiverSideSubsMap:" + this.receiverSideSubsMap.size()));
            this.lastReportTime = now;
        }
        Subscriber localSubs = new Subscriber(pred, change -> cb.pipe((Object)change)).serverSideCB(cb);
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        this.receiverSideSubsMap.put(sid, localSubs);
        if (pred instanceof RLNoQueryPredicate) {
            localSubs.getReceiver().receive(RLUtil.get().done());
            this.filterProcessor.startListening(localSubs);
        } else {
            FilterSpore spore = new FilterSpore(localSubs.getFilter()).modifiesResult(false);
            spore.onFinish(() -> localSubs.getReceiver().receive(RLUtil.get().done()));
            spore.setForEach((Callback & Serializable)(r, e) -> {
                if (Actors.isResult((Object)e)) {
                    localSubs.getReceiver().receive(new AddMessage(0, (Record)r));
                } else {
                    localSubs.getReceiver().receive(RLUtil.get().done());
                }
            });
            this.forEachQueued(spore, () -> this.filterProcessor.startListening(localSubs));
        }
    }

    public IPromise _setMapping(ClusterTableRecordMapping mapping) {
        Log.Info((Object)this, (String)("received mapping " + mapping));
        this.mapping = mapping;
        this.storageDriver._saveMapping(mapping);
        return RealLiveTableActor.resolve((Object)true);
    }

    @CallerSideMethod
    public ClusterTableRecordMapping getMapping() {
        return ((RealLiveTableActor)this.getActor()).mapping;
    }

    private void forEachQueued(Spore s, Runnable r) {
        if (s instanceof FilterSpore && ((FilterSpore)s).getFilter() instanceof RLHashIndexPredicate) {
            this.processHashedFilter(s);
        } else if (s instanceof FilterSpore && ((FilterSpore)s).getFilter() instanceof QueryPredicate && ((QueryPredicate)((FilterSpore)s).getFilter()).getCompiled().getHashIndex() != null) {
            FilterSpore fisp = (FilterSpore)s;
            QueryPredicate p = (QueryPredicate)fisp.getFilter();
            String indexString = p.getCompiled().getHashIndex().getPath(0).getPathString();
            if (this.indexedStorage.getHashIndex(indexString) != null) {
                Log.Info((Object)this, (String)("detected index use in query " + p.getQuery()));
                fisp._setFilter(p.getCompiled().getHashIndex());
                this.processHashedFilter(fisp);
            } else {
                this.queuedSpores.add(new QueryQEntry(s, r));
                this.delayed(1L, () -> this._execQueriesOrDelay(this.queuedSpores.size(), this.taCount));
            }
        } else {
            this.queuedSpores.add(new QueryQEntry(s, r));
            this.delayed(1L, () -> this._execQueriesOrDelay(this.queuedSpores.size(), this.taCount));
        }
    }

    private void processHashedFilter(Spore s) {
        Stream<Object> keys;
        long tim = System.currentTimeMillis();
        RLHashIndexPredicate pathes = (RLHashIndexPredicate)((FilterSpore)s).getFilter();
        if (pathes.getPath().size() == 1) {
            RLHashIndexPredicate.RLPath path = pathes.getPath(0);
            HashIndex idx = this.indexedStorage.getHashIndex(path.getPathString());
            if (idx == null) {
                s.complete(null, (Object)("hashIndex " + path.getPathString() + " not found"));
                return;
            }
            keys = idx.getKeys(path.getKey());
        } else {
            HashSet<String> res = null;
            for (int i = 0; i < pathes.getPath().size(); ++i) {
                RLHashIndexPredicate.RLPath path = pathes.getPath(i);
                HashIndex idx = this.indexedStorage.getHashIndex(path.getPathString());
                if (idx == null) {
                    s.complete(null, (Object)("hashIndex " + path.getPathString() + " not found"));
                    return;
                }
                Set<String> keySet = idx.getKeySet(path.getKey());
                if (res == null) {
                    res = new HashSet<String>(Math.max(keySet.size(), 800));
                }
                if (path instanceof RLHashIndexPredicate.JoinPath) {
                    res.addAll(keySet);
                    continue;
                }
                if (path instanceof RLHashIndexPredicate.SubtractPath) {
                    res.removeAll(keySet);
                    continue;
                }
                if (!(path instanceof RLHashIndexPredicate.IntersectionPath)) continue;
                res.retainAll(keySet);
            }
            keys = res.stream();
        }
        keys.forEach(key -> {
            Record record = this.storageDriver.getStore().get((String)key);
            if (record != null) {
                try {
                    s.remote((Object)record);
                }
                catch (Exception e) {
                    s.complete(null, (Object)e);
                }
            } else {
                Log.Error((Object)this, (String)("corrupted index cannot find " + key));
            }
        });
        s.finish();
        if (DUMP_QUERY_TIME) {
            Log.Info((Object)this, (String)("hashed query on " + this.description.getName() + "::" + pathes + " " + (System.currentTimeMillis() - tim)));
        }
    }

    public void _execQueriesOrDelay(int size, int taCount) {
        long tim = System.currentTimeMillis();
        Consumer<Record> recordConsumer = rec -> {
            for (int i = 0; i < this.queuedSpores.size(); ++i) {
                QueryQEntry qqentry = this.queuedSpores.get(i);
                Spore spore = qqentry.spore;
                if (spore.isFinished()) continue;
                try {
                    spore.remote(rec);
                    continue;
                }
                catch (Throwable ex) {
                    Log.Warn((Object)this, (Throwable)ex, (String)("exception in spore " + spore));
                    spore.complete(null, (Object)ex);
                }
            }
        };
        this.storageDriver.getStore().stream().forEach(recordConsumer);
        this.queuedSpores.forEach(qqentry -> {
            qqentry.spore.finish();
            qqentry.onFin.run();
        });
        if (DUMP_QUERY_TIME && this.queuedSpores.size() > 0) {
            System.out.println("tim for " + this.queuedSpores.size() + " " + (System.currentTimeMillis() - tim) + " per q:" + (System.currentTimeMillis() - tim) / (long)this.queuedSpores.size());
        }
        this.queuedSpores.clear();
    }

    protected String addChannelIdIfPresent(Callback cb, String sid) {
        if (cb instanceof CallbackWrapper && ((CallbackWrapper)cb).isRemote()) {
            CallbackRefSerializer.MyRemotedCallback realCallback = (CallbackRefSerializer.MyRemotedCallback)((CallbackWrapper)cb).getRealCallback();
            sid = (String)sid + "#" + realCallback.getChanId();
        }
        return sid;
    }

    @Override
    @CallerSideMethod
    public void unsubscribe(Subscriber subs) {
        this._unsubscribe((Callback & Serializable)(r, e) -> {}, subs.getId());
    }

    @Override
    public void unsubscribeById(int subsId) {
        this._unsubscribe((Callback & Serializable)(r, e) -> {}, subsId);
    }

    public void _unsubscribe(Callback cb, int id) {
        this.checkThread();
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        Subscriber subs = this.receiverSideSubsMap.get(sid);
        this.filterProcessor.unsubscribe(subs);
        this.receiverSideSubsMap.remove(sid);
        cb.finish();
        subs.getServerSideCB().finish();
    }

    @Override
    public IPromise<Record> get(String key) {
        ++this.taCount;
        return RealLiveTableActor.resolve((Object)this.storageDriver.getStore().get(key));
    }

    @Override
    public IPromise<Long> size() {
        return RealLiveTableActor.resolve((Object)this.storageDriver.getStore().size());
    }

    @Override
    public IPromise<TableDescription> getDescription() {
        return RealLiveTableActor.resolve((Object)this.description);
    }

    @Override
    public IPromise<StorageStats> getStats() {
        try {
            StorageStats stats = this.storageDriver.getStore().getStats();
            return RealLiveTableActor.resolve((Object)stats);
        }
        catch (Throwable th) {
            Log.Warn((Object)this, (Throwable)th);
            return RealLiveTableActor.reject((Object)th.getMessage());
        }
    }

    @Override
    public IPromise atomic(int senderId, String key, RLFunction<Record, Object> action) {
        ++this.taCount;
        return this.storageDriver.atomicQuery(senderId, key, action);
    }

    @Override
    public void atomicUpdate(RLPredicate<Record> filter, RLFunction<Record, Boolean> action) {
        ++this.taCount;
        this.storageDriver.atomicUpdate(filter, action);
    }

    @Override
    public IPromise resizeIfLoadFactorLarger(double loadFactor, long maxGrowBytes) {
        Log.Info((Object)this, (String)("resizing table if lf: " + loadFactor + " maxgrow:" + maxGrowBytes));
        long now = System.currentTimeMillis();
        this.storageDriver.resizeIfLoadFactorLarger(loadFactor, maxGrowBytes);
        Log.Info((Object)this, (String)("resizing duration" + (System.currentTimeMillis() - now)));
        return RealLiveTableActor.resolve();
    }

    @Override
    public void put(int senderId, String key, Object ... keyVals) {
        this.receive(RLUtil.get().put(senderId, key, keyVals));
    }

    @Override
    public void merge(int senderId, String key, Object ... keyVals) {
        if (key instanceof Record) {
            throw new RuntimeException("probably accidental method resolution fail. Use merge instead");
        }
        this.receive(RLUtil.get().addOrUpdate(senderId, key, keyVals));
    }

    @Override
    public void _deepMerge(int senderId, Record jsonrec) {
        this.atomic(senderId, jsonrec.getKey(), rec -> {
            if (rec == null) {
                jsonrec.stripOps();
                return new AddMessage(senderId, jsonrec);
            }
            rec.deepMerge(jsonrec);
            return null;
        });
    }

    @Override
    public IPromise<Boolean> add(int senderId, String key, Object ... keyVals) {
        if (this.storageDriver.getStore().get(key) != null) {
            return RealLiveTableActor.resolve((Object)false);
        }
        this.receive(RLUtil.get().add(senderId, key, keyVals));
        return RealLiveTableActor.resolve((Object)true);
    }

    @Override
    public IPromise<Boolean> addRecord(int senderId, Record rec) {
        Record existing;
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        if ((existing = this.storageDriver.getStore().get(rec.getKey())) != null) {
            return RealLiveTableActor.resolve((Object)false);
        }
        this.receive(new AddMessage(senderId, rec));
        return RealLiveTableActor.resolve((Object)true);
    }

    @Override
    public void mergeRecord(int senderId, Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.receive(new AddMessage(senderId, true, rec));
    }

    @Override
    public void setRecord(int senderId, Record rec) {
        if (rec instanceof RecordWrapper) {
            rec = ((RecordWrapper)rec).getRecord();
        }
        this.receive(new PutMessage(senderId, rec));
    }

    @Override
    public void update(int senderId, String key, Object ... keyVals) {
        this.receive(RLUtil.get().update(senderId, key, keyVals));
    }

    @Override
    public IPromise<Record> take(int senderId, String key) {
        Record record = this.storageDriver.getStore().get(key);
        this.receive(RLUtil.get().remove(senderId, key));
        return RealLiveTableActor.resolve((Object)record);
    }

    @Override
    public void remove(int senderId, String key) {
        RemoveMessage remove = RLUtil.get().remove(senderId, key);
        this.receive(remove);
    }

    public void _removeSilent(String key) {
        this.storageDriver.getStore().remove(key);
    }

    public void _addSilent(Record rec) {
        this.storageDriver.getStore()._put(rec.getKey(), rec);
    }

    public IPromise<TableState> getTableState() {
        return RealLiveTableActor.resolve((Object)new TableState(this.mapping, this.storageDriver.getStore().size(), this.description.getName()));
    }

    static class QueryQEntry {
        Spore spore;
        Runnable onFin;

        public QueryQEntry(Spore spore, Runnable onFin) {
            this.spore = spore;
            this.onFin = onFin;
        }
    }
}

