/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.reallive.impl.actors;

import java.io.Serializable;
import java.util.HashMap;
import java.util.function.Supplier;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.remoting.encoding.CallbackRefSerializer;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.reallive.impl.FilterProcessorImpl;
import org.nustaq.reallive.impl.Mutator;
import org.nustaq.reallive.impl.StorageDriver;
import org.nustaq.reallive.impl.storage.StorageStats;
import org.nustaq.reallive.interfaces.ChangeMessage;
import org.nustaq.reallive.interfaces.ChangeReceiver;
import org.nustaq.reallive.interfaces.FilterProcessor;
import org.nustaq.reallive.interfaces.Mutatable;
import org.nustaq.reallive.interfaces.Mutation;
import org.nustaq.reallive.interfaces.RLPredicate;
import org.nustaq.reallive.interfaces.RealLiveTable;
import org.nustaq.reallive.interfaces.Record;
import org.nustaq.reallive.interfaces.RecordStorage;
import org.nustaq.reallive.interfaces.Subscriber;
import org.nustaq.reallive.interfaces.TableDescription;

public class RealLiveStreamActor<K>
extends Actor<RealLiveStreamActor<K>>
implements RealLiveTable<K>,
Mutatable<K> {
    StorageDriver<K> storageDriver;
    FilterProcessor<K> filterProcessor;
    HashMap<String, Subscriber> receiverSideSubsMap = new HashMap();
    TableDescription description;

    @Local
    public void init(Supplier<RecordStorage<K>> storeFactory, TableDescription desc) {
        this.description = desc;
        RecordStorage<K> store = storeFactory.get();
        this.storageDriver = new StorageDriver<K>(store);
        this.filterProcessor = new FilterProcessorImpl(this);
        this.storageDriver.setListener(this.filterProcessor);
    }

    @Override
    public void receive(ChangeMessage<K> change) {
        this.checkThread();
        try {
            this.storageDriver.receive(change);
        }
        catch (Exception th) {
            Log.Error((Object)this, (Throwable)th);
        }
    }

    @Override
    public <T> void forEach(Spore<Record<K>, T> spore) {
        this.checkThread();
        try {
            this.storageDriver.getStore().forEach(spore);
        }
        catch (Exception ex) {
            spore.complete(null, (Object)ex);
        }
    }

    protected void hasStopped() {
    }

    @Override
    @CallerSideMethod
    public void subscribe(Subscriber<K> subs) {
        Callback & Serializable callback = (Callback & Serializable)(r, e) -> {
            if (Actors.isResult((Object)e)) {
                subs.getReceiver().receive((ChangeMessage)r);
            }
        };
        this._subscribe(subs.getFilter(), callback, subs.getId());
    }

    public void _subscribe(RLPredicate pred, Callback cb, int id) {
        this.checkThread();
        Subscriber localSubs = new Subscriber(pred, change -> cb.stream((Object)change));
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        this.receiverSideSubsMap.put(sid, localSubs);
        this.filterProcessor.subscribe(localSubs);
    }

    protected String addChannelIdIfPresent(Callback cb, String sid) {
        if (cb instanceof CallbackWrapper && ((CallbackWrapper)cb).isRemote()) {
            CallbackRefSerializer.MyRemotedCallback realCallback = (CallbackRefSerializer.MyRemotedCallback)((CallbackWrapper)cb).getRealCallback();
            sid = sid + "#" + realCallback.getChanId();
        }
        return sid;
    }

    @Override
    @CallerSideMethod
    public void unsubscribe(Subscriber<K> subs) {
        this._unsubscribe((Callback & Serializable)(r, e) -> {}, subs.getId());
    }

    public void _unsubscribe(Callback cb, int id) {
        this.checkThread();
        String sid = this.addChannelIdIfPresent(cb, "" + id);
        this.filterProcessor.unsubscribe(this.receiverSideSubsMap.get(sid));
        this.receiverSideSubsMap.remove(sid);
    }

    @Override
    public IPromise<Record<K>> get(K key) {
        return RealLiveStreamActor.resolve(this.storageDriver.getStore().get(key));
    }

    @Override
    public IPromise<Long> size() {
        return RealLiveStreamActor.resolve((Object)this.storageDriver.getStore().size());
    }

    @Override
    @CallerSideMethod
    public Mutation<K> getMutation() {
        return new Mutator((ChangeReceiver)this.self());
    }

    @Override
    public IPromise<TableDescription> getDescription() {
        return RealLiveStreamActor.resolve((Object)this.description);
    }

    @Override
    public IPromise<StorageStats> getStats() {
        return RealLiveStreamActor.resolve((Object)this.storageDriver.getStore().getStats());
    }

    @Override
    public IPromise<Boolean> putCAS(RLPredicate<Record<K>> casCondition, K key, Object[] keyVals) {
        return this.storageDriver.putCAS(casCondition, key, keyVals);
    }
}

