/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.impl.actors;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.remoting.encoding.CallbackRefSerializer;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.impl.FilterProcessor;
import org.nustaq.reallive.impl.FilterSpore;
import org.nustaq.reallive.impl.Mutator;
import org.nustaq.reallive.impl.RLUtil;
import org.nustaq.reallive.impl.StorageDriver;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.interfaces.ChangeMessage;
import org.nustaq.reallive.interfaces.ChangeReceiver;
import org.nustaq.reallive.interfaces.Mutatable;
import org.nustaq.reallive.interfaces.Mutation;
import org.nustaq.reallive.interfaces.RLConsumer;
import org.nustaq.reallive.interfaces.RLPredicate;
import org.nustaq.reallive.interfaces.RealLiveTable;
import org.nustaq.reallive.interfaces.Record;
import org.nustaq.reallive.interfaces.RecordStorage;
import org.nustaq.reallive.interfaces.Subscriber;
import org.nustaq.reallive.interfaces.TableDescription;
import org.nustaq.reallive.messages.AddMessage;

public class RealLiveTableActor<K>
extends Actor<RealLiveTableActor<K>>
implements RealLiveTable<K>,
Mutatable<K> {
    public static int MAX_QUERY_BATCH_SIZE = 10;
    public static boolean DUMP_QUERY_TIME = false;
    StorageDriver<K> storageDriver;
    FilterProcessor<K> filterProcessor;
    HashMap<String, Subscriber> receiverSideSubsMap = new HashMap();
    TableDescription description;
    ArrayList<QueryQEntry> queuedSpores = new ArrayList();
    int taCount = 0;

    @Local
    public void init(Supplier<RecordStorage<K>> storeFactory, TableDescription desc) {
        this.description = desc;
        Thread.currentThread().setName("Table " + desc.getName() + " main");
        RecordStorage<K> store = storeFactory.get();
        this.storageDriver = new StorageDriver<K>(store);
        this.filterProcessor = new FilterProcessor(this);
        this.storageDriver.setListener(this.filterProcessor);
    }

    @Override
    public void receive(ChangeMessage<K> change) {
        this.checkThread();
        try {
            this.storageDriver.receive(change);
        }
        catch (Exception th) {
            Log.Error((Object)this, (Throwable)th);
        }
    }

    public <T> void forEachDirect(Spore<Record<K>, T> spore) {
        this.checkThread();
        try {
            this.storageDriver.getStore().forEach(spore);
        }
        catch (Exception ex) {
            spore.complete(null, (Object)ex);
        }
    }

    @Override
    public <T> void forEach(Spore<Record<K>, T> spore) {
        this.forEachQueued(spore, () -> {});
    }

    protected void hasStopped() {
    }

    @Override
    @CallerSideMethod
    public void subscribe(Subscriber<K> subs) {
        Callback & Serializable callback = (Callback & Serializable)(r, e) -> {
            if (Actors.isResult((Object)e)) {
                subs.getReceiver().receive((ChangeMessage)r);
            }
        };
        this._subscribe(subs.getPrePatchFilter(), subs.getFilter(), callback, subs.getId());
    }

    public void _subscribe(RLPredicate<Record<K>> prePatchFilter, RLPredicate pred, Callback cb, int id) {
        this.checkThread();
        Subscriber localSubs = new Subscriber<K>(prePatchFilter, pred, change -> cb.stream((Object)change)).serverSideCB(cb);
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        this.receiverSideSubsMap.put(sid, localSubs);
        FilterSpore spore = new FilterSpore(localSubs.getFilter(), localSubs.getPrePatchFilter());
        spore.onFinish(() -> localSubs.getReceiver().receive(RLUtil.get().done()));
        spore.setForEach((Callback & Serializable)(r, e) -> {
            if (Actors.isResult((Object)e)) {
                localSubs.getReceiver().receive(new AddMessage((Record)r));
            } else {
                localSubs.getReceiver().receive(RLUtil.get().done());
            }
        });
        this.forEachQueued(spore, () -> this.filterProcessor.startListening(localSubs));
    }

    void forEachQueued(Spore s, Runnable r) {
        this.queuedSpores.add(new QueryQEntry(s, r));
        ((RealLiveTableActor)this.self()).execQueriesOrDelay(this.queuedSpores.size(), this.taCount);
    }

    public void execQueriesOrDelay(int size, int taCount) {
        if (this.queuedSpores.size() == size && this.taCount == taCount || this.queuedSpores.size() > MAX_QUERY_BATCH_SIZE) {
            long tim = System.currentTimeMillis();
            Consumer<Record> recordConsumer = rec -> {
                for (int i = 0; i < this.queuedSpores.size(); ++i) {
                    QueryQEntry qqentry = this.queuedSpores.get(i);
                    Spore spore = qqentry.spore;
                    if (spore.isFinished()) continue;
                    try {
                        spore.remote(rec);
                        continue;
                    }
                    catch (Throwable ex) {
                        spore.complete(null, (Object)ex);
                    }
                }
            };
            if (this.description.isParallelFiltering()) {
                ((Stream)this.storageDriver.getStore().stream().parallel()).forEach(recordConsumer);
            } else {
                this.storageDriver.getStore().stream().forEach(recordConsumer);
            }
            this.queuedSpores.forEach((? super E qqentry) -> {
                qqentry.spore.finish();
                qqentry.onFin.run();
            });
            if (DUMP_QUERY_TIME) {
                System.out.println("tim for " + this.queuedSpores.size() + " " + (System.currentTimeMillis() - tim));
            }
            this.queuedSpores.clear();
            return;
        }
        this.execQueriesOrDelay(this.queuedSpores.size(), this.taCount);
    }

    protected String addChannelIdIfPresent(Callback cb, String sid) {
        if (cb instanceof CallbackWrapper && ((CallbackWrapper)cb).isRemote()) {
            CallbackRefSerializer.MyRemotedCallback realCallback = (CallbackRefSerializer.MyRemotedCallback)((CallbackWrapper)cb).getRealCallback();
            sid = sid + "#" + realCallback.getChanId();
        }
        return sid;
    }

    @Override
    @CallerSideMethod
    public void unsubscribe(Subscriber<K> subs) {
        this._unsubscribe((Callback & Serializable)(r, e) -> {}, subs.getId());
    }

    public void _unsubscribe(Callback cb, int id) {
        this.checkThread();
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        Subscriber subs = this.receiverSideSubsMap.get(sid);
        this.filterProcessor.unsubscribe(subs);
        this.receiverSideSubsMap.remove(sid);
        cb.finish();
        subs.getServerSideCB().finish();
    }

    @Override
    public IPromise<Record<K>> get(K key) {
        ++this.taCount;
        return RealLiveTableActor.resolve(this.storageDriver.getStore().get(key));
    }

    @Override
    public IPromise<Long> size() {
        return RealLiveTableActor.resolve((Object)this.storageDriver.getStore().size());
    }

    @Override
    @CallerSideMethod
    public Mutation<K> getMutation() {
        return new Mutator((ChangeReceiver)this.self());
    }

    @Override
    public IPromise<TableDescription> getDescription() {
        return RealLiveTableActor.resolve((Object)this.description);
    }

    @Override
    public IPromise<StorageStats> getStats() {
        try {
            StorageStats stats = this.storageDriver.getStore().getStats();
            return RealLiveTableActor.resolve((Object)stats);
        }
        catch (Throwable th) {
            Log.Warn((Object)this, (Throwable)th);
            return RealLiveTableActor.reject((Object)th.getMessage());
        }
    }

    @Override
    public IPromise<Boolean> putCAS(RLPredicate<Record<K>> casCondition, K key, Object[] keyVals) {
        ++this.taCount;
        return this.storageDriver.putCAS(casCondition, key, keyVals);
    }

    @Override
    public void atomic(K key, RLConsumer<Record<K>> action) {
        ++this.taCount;
        this.storageDriver.atomic(key, action);
    }

    static class QueryQEntry {
        Spore spore;
        Runnable onFin;

        public QueryQEntry(Spore spore, Runnable onFin) {
            this.spore = spore;
            this.onFin = onFin;
        }
    }
}

