/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.base;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.RemoteConnection;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.Timeout;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.impl.DispatcherThread;
import org.nustaq.kontraktor.impl.InternalActorStoppedException;
import org.nustaq.kontraktor.impl.RemoteScheduler;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.base.RemotedActor;
import org.nustaq.kontraktor.remoting.encoding.ActorRefSerializer;
import org.nustaq.kontraktor.remoting.encoding.CallbackRefSerializer;
import org.nustaq.kontraktor.remoting.encoding.Coding;
import org.nustaq.kontraktor.remoting.encoding.RemoteCallEntry;
import org.nustaq.kontraktor.remoting.encoding.SerializerType;
import org.nustaq.kontraktor.remoting.encoding.SporeRefSerializer;
import org.nustaq.kontraktor.remoting.encoding.TimeoutSerializer;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectSerializer;
import org.nustaq.serialization.util.FSTUtil;

public abstract class RemoteRegistry
implements RemoteConnection {
    public static final Object OUT_OF_ORDER_SEQ = "OOOS";
    public static int MAX_BATCH_CALLS = 500;
    private ActorServer server;
    protected FSTConfiguration conf;
    protected RemoteScheduler scheduler = new RemoteScheduler();
    protected AtomicLong actorIdCount = new AtomicLong(0L);
    protected ConcurrentHashMap<Long, Object> publishedActorMapping = new ConcurrentHashMap();
    protected ConcurrentHashMap<Object, Long> publishedActorMappingReverse = new ConcurrentHashMap();
    protected ConcurrentLinkedQueue<Actor> remoteActors = new ConcurrentLinkedQueue();
    protected ConcurrentHashMap<Long, Actor> remoteActorSet = new ConcurrentHashMap();
    protected volatile boolean terminated = false;
    protected BiFunction<Actor, String, Boolean> remoteCallInterceptor = (actor, methodName) -> {
        Method method = actor.__getCachedMethod((String)methodName, (Actor)actor);
        if (method == null) {
            Log.Warn(null, "no such method on " + actor.getClass().getSimpleName() + "#" + methodName);
        }
        if (method == null || method.getAnnotation(Local.class) != null) {
            return false;
        }
        return true;
    };
    protected Consumer<Actor> disconnectHandler;
    protected boolean isObsolete;
    private Actor facadeActor;

    public static void registerDefaultClassMappings(FSTConfiguration conf) {
        conf.registerCrossPlatformClassMapping((String[][])new String[][]{{"call", RemoteCallEntry.class.getName()}, {"cbw", CallbackWrapper.class.getName()}});
    }

    public RemoteRegistry(FSTConfiguration conf, Coding coding) {
        this.conf = conf;
        this.configureSerialization(coding);
    }

    public RemoteRegistry(Coding code) {
        if (code == null) {
            code = new Coding(SerializerType.FSTSer);
        }
        this.conf = code.createConf();
        RemoteRegistry.registerDefaultClassMappings(this.conf);
        this.configureSerialization(code);
    }

    public BiFunction<Actor, String, Boolean> getRemoteCallInterceptor() {
        return this.remoteCallInterceptor;
    }

    public void setRemoteCallInterceptor(BiFunction<Actor, String, Boolean> remoteCallInterceptor) {
        this.remoteCallInterceptor = remoteCallInterceptor;
    }

    protected void configureSerialization(Coding code) {
        this.conf.registerSerializer(Actor.class, (FSTObjectSerializer)new ActorRefSerializer(this), true);
        this.conf.registerSerializer(CallbackWrapper.class, (FSTObjectSerializer)new CallbackRefSerializer(this), true);
        this.conf.registerSerializer(Spore.class, (FSTObjectSerializer)new SporeRefSerializer(), true);
        this.conf.registerClass(new Class[]{RemoteCallEntry.class});
        this.conf.registerSerializer(Timeout.class, (FSTObjectSerializer)new TimeoutSerializer(), false);
    }

    public Actor getPublishedActor(long id) {
        return (Actor)this.publishedActorMapping.get(id);
    }

    public Callback getPublishedCallback(long id) {
        return (Callback)this.publishedActorMapping.get(id);
    }

    public RemoteScheduler getScheduler() {
        return this.scheduler;
    }

    public ConcurrentLinkedQueue<Actor> getRemoteActors() {
        return this.remoteActors;
    }

    public boolean isTerminated() {
        return this.terminated;
    }

    public void setTerminated(boolean terminated) {
        this.terminated = terminated;
    }

    public long publishActor(Actor act) {
        Long integer = this.publishedActorMappingReverse.get(act.getActorRef());
        if (integer == null) {
            integer = this.newActId();
            this.publishActorDirect(integer, act);
        }
        return integer;
    }

    private long newActId() {
        long id = this.actorIdCount.incrementAndGet();
        return id;
    }

    private void publishActorDirect(Long integer, Actor act) {
        this.publishedActorMapping.put(integer, act.getActorRef());
        this.publishedActorMappingReverse.put(act.getActorRef(), integer);
        act.__addRemoteConnection(this);
    }

    @Override
    public void unpublishActor(Actor act) {
        Long integer = this.publishedActorMappingReverse.get(act.getActorRef());
        if (integer != null) {
            Log.Debug((Object)this, "" + act.getClass().getSimpleName() + " unpublished");
            this.publishedActorMapping.remove(integer);
            this.publishedActorMappingReverse.remove(act.getActorRef());
            act.__removeRemoteConnection(this);
            if (act instanceof RemotedActor) {
                ((RemotedActor)((Object)act)).hasBeenUnpublished();
            }
        }
    }

    public long registerPublishedCallback(Callback cb) {
        Long integer = this.publishedActorMappingReverse.get(cb);
        if (integer == null) {
            integer = this.newActId();
            this.publishedActorMapping.put(integer, cb);
            this.publishedActorMappingReverse.put(cb, integer);
        }
        return integer;
    }

    public void removePublishedObject(long receiverKey) {
        Object remove = this.publishedActorMapping.remove(receiverKey);
        if (remove != null) {
            this.publishedActorMappingReverse.remove(remove);
        }
    }

    public void registerRemoteRefDirect(Actor act) {
        act = act.getActorRef();
        this.remoteActorSet.put(act.__remoteId, act);
        this.remoteActors.add(act);
        act.__clientConnection = this;
        act.__addStopHandler((actor, err) -> this.remoteRefStopped((Actor)actor));
    }

    public Actor registerRemoteActorRef(Class actorClazz, long remoteId, Object client) {
        Actor actorRef = this.remoteActorSet.get(remoteId);
        if (actorRef == null) {
            Object res = Actors.AsActor(actorClazz, this.getScheduler());
            ((Actor)res).__remoteId = remoteId;
            this.remoteActorSet.put(remoteId, (Actor)res);
            this.remoteActors.add((Actor)res);
            ((Actor)res).__addStopHandler((actor, err) -> this.remoteRefStopped((Actor)actor));
            ((Actor)res).__clientConnection = this;
            return res;
        }
        return actorRef;
    }

    protected void remoteRefStopped(Actor actor) {
        this.removeRemoteActor(actor);
        actor.getActorRef().__stopped = true;
        ((Actor)actor.getActor()).__stopped = true;
    }

    public void stopRemoteRefs() {
        new ArrayList<Actor>(this.remoteActors).forEach(actor -> {
            if (this.disconnectHandler != null) {
                this.disconnectHandler.accept((Actor)actor);
            }
            try {
                this.removeRemoteActor((Actor)actor);
            }
            catch (Exception e) {
                Log.Warn((Object)this, e);
            }
            actor.getActorRef().__stopped = true;
            if (actor.getActor() != null) {
                ((Actor)actor.getActor()).__stopped = true;
            }
        });
    }

    protected void removeRemoteActor(Actor act) {
        this.remoteActorSet.remove(act.__remoteId);
        this.remoteActors.remove(act);
        try {
            act.__stop();
        }
        catch (InternalActorStoppedException internalActorStoppedException) {
            // empty catch block
        }
    }

    public boolean receiveObject(ObjectSocket responseChannel, ObjectSink receiver, Object response, List<IPromise> createdFutures) throws Exception {
        if (response == OUT_OF_ORDER_SEQ) {
            return false;
        }
        if (response instanceof Object[]) {
            Object[] arr = (Object[])response;
            boolean hadResp = false;
            int max = arr.length - 1;
            int inSequence = 0;
            if (!(arr[max] instanceof Number)) {
                ++max;
            } else {
                inSequence = ((Number)arr[max]).intValue();
            }
            for (int i = 0; i < max; ++i) {
                Object resp = arr[i];
                if (!(resp instanceof RemoteCallEntry)) {
                    if (resp != null && !"SP".equals(resp)) {
                        Log.Lg.error(this, null, "unexpected response:" + resp);
                    }
                    hadResp = true;
                    continue;
                }
                if (!this.processRemoteCallEntry(responseChannel, (RemoteCallEntry)resp, createdFutures)) continue;
                hadResp = true;
            }
            return hadResp;
        }
        if (!(response instanceof RemoteCallEntry)) {
            if (response != null && !"SP".equals(response)) {
                Log.Lg.error(this, null, "unexpected response:" + response);
            }
            return true;
        }
        return this.processRemoteCallEntry(responseChannel, (RemoteCallEntry)response, createdFutures);
    }

    protected boolean processRemoteCallEntry(ObjectSocket objSocket, RemoteCallEntry response, List<IPromise> createdFutures) throws Exception {
        block17: {
            boolean isContinue;
            RemoteCallEntry read = response;
            boolean bl = isContinue = read.getArgs().length > 1 && "CNT".equals(read.getArgs()[1]);
            if (isContinue) {
                read.getArgs()[1] = "CNT";
            }
            if (read.getQueue() == 0) {
                Actor targetActor = this.getPublishedActor(read.getReceiverKey());
                if (targetActor == null) {
                    Log.Lg.error(this, null, "registry:" + System.identityHashCode(this) + " no actor found for key " + read);
                    return true;
                }
                if (targetActor.isStopped() || targetActor.getScheduler() == null) {
                    Log.Lg.error(this, null, "actor found for key " + read + " is stopped and/or has no scheduler set");
                    this.receiveCBResult(objSocket, read.getFutureKey(), null, InternalActorStoppedException.Instance);
                    return true;
                }
                if (this.remoteCallInterceptor != null && !this.remoteCallInterceptor.apply(targetActor, read.getMethod()).booleanValue()) {
                    Log.Warn((Object)this, "remote message blocked by securityinterceptor " + targetActor.getClass().getName() + " " + read.getMethod());
                    return false;
                }
                try {
                    Object future = targetActor.getScheduler().enqueueCallFromRemote(this, null, targetActor, read.getMethod(), read.getArgs(), false);
                    if (future instanceof IPromise) {
                        Promise p = null;
                        if (createdFutures != null) {
                            p = new Promise();
                            createdFutures.add(p);
                        }
                        Promise finalP = p;
                        Thread debug = Thread.currentThread();
                        ((IPromise)future).then((r, e) -> {
                            try {
                                Thread debug1 = Thread.currentThread();
                                Thread debug2 = debug;
                                this.receiveCBResult(objSocket, read.getFutureKey(), r, e);
                                if (finalP != null) {
                                    finalP.complete();
                                }
                            }
                            catch (Exception ex) {
                                Log.Warn(this, ex, "");
                            }
                        });
                    }
                }
                catch (Throwable th) {
                    Log.Warn((Object)this, th);
                    if (read.getFutureKey() > 0L) {
                        this.receiveCBResult(objSocket, read.getFutureKey(), null, FSTUtil.toString((Throwable)th));
                        break block17;
                    }
                    FSTUtil.rethrow((Throwable)th);
                }
            } else if (read.getQueue() == 1) {
                Callback publishedCallback = this.getPublishedCallback(read.getReceiverKey());
                if (publishedCallback == null) {
                    if (read.getArgs() != null && read.getArgs().length == 2 && read.getArgs()[1] instanceof InternalActorStoppedException) {
                        Log.Warn((Object)this, "call to stopped remote actor");
                    } else {
                        Log.Warn((Object)this, "Publisher already deregistered, set error to 'Actor.CONT' in order to signal more messages will be sent. " + read);
                    }
                } else {
                    publishedCallback.complete(read.getArgs()[0], read.getArgs()[1]);
                    if (!isContinue) {
                        this.removePublishedObject(read.getReceiverKey());
                    }
                }
            }
        }
        return createdFutures != null && createdFutures.size() > 0;
    }

    public void cleanUp() {
        this.conf.clearCaches();
        this.stopRemoteRefs();
        ((ConcurrentHashMap.KeySetView)this.publishedActorMappingReverse.keySet()).forEach(act -> {
            if (act instanceof Actor) {
                this.unpublishActor((Actor)act);
            }
        });
        this.getFacadeProxy().__removeRemoteConnection(this);
    }

    protected void closeRef(CallEntry ce, ObjectSocket chan) throws IOException {
        if (ce.getTargetActor().getActorRef() == this.getFacadeProxy().getActorRef()) {
            chan.close();
        } else {
            this.removeRemoteActor(ce.getTargetActor());
        }
    }

    protected void writeObject(ObjectSocket chan, RemoteCallEntry rce) throws Exception {
        try {
            chan.writeObject(rce);
        }
        catch (Exception e) {
            Log.Debug((Object)this, "a connection closed '" + e.getMessage() + "', terminating registry");
            this.setTerminated(true);
            this.cleanUp();
        }
    }

    public void receiveCBResult(ObjectSocket chan, long id, Object result, Object error) throws Exception {
        if (this.facadeActor != null) {
            DispatcherThread debug = this.facadeActor.getCurrentDispatcher();
            if (Thread.currentThread() != this.facadeActor.getCurrentDispatcher()) {
                this.facadeActor.execute(() -> {
                    try {
                        if (Thread.currentThread() != debug) {
                            System.out.println("??");
                        }
                        this.receiveCBResult(chan, id, result, error);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                return;
            }
        }
        RemoteCallEntry rce = new RemoteCallEntry(0L, id, null, new Object[]{result, error});
        rce.setQueue(1);
        this.writeObject(chan, rce);
    }

    @Override
    public void close() {
        try {
            this.getWriteObjectSocket().get().flush();
        }
        catch (Exception e) {
            Log.Warn((Object)this, e);
        }
        this.cleanUp();
    }

    public FSTConfiguration getConf() {
        return this.conf;
    }

    public abstract Actor getFacadeProxy();

    public void setDisconnectHandler(Consumer<Actor> disconnectHandler) {
        this.disconnectHandler = disconnectHandler;
    }

    public Consumer<Actor> getDisconnectHandler() {
        return this.disconnectHandler;
    }

    @Override
    public void setClassLoader(ClassLoader l) {
        this.conf.setClassLoader(l);
    }

    @Override
    public long getRemoteId(Actor act) {
        Long integer = this.publishedActorMappingReverse.get(act.getActorRef());
        return integer == null ? -1L : integer;
    }

    public boolean pollAndSend2Remote(AtomicReference<ObjectSocket> chanHolder) throws Exception {
        int sumQueued;
        ObjectSocket chan = chanHolder.get();
        if (chan == null || !chan.canWrite()) {
            return false;
        }
        boolean hadAnyMsg = false;
        ArrayList<Actor> toRemove = null;
        int fullqueued = 0;
        do {
            sumQueued = 0;
            for (Actor remoteActor : this.remoteActors) {
                boolean cb = false;
                CallEntry ce = (CallEntry)remoteActor.__cbQueue.poll();
                if (ce == null) {
                    cb = false;
                    ce = (CallEntry)remoteActor.__mailbox.poll();
                }
                if (ce == null) continue;
                if (ce.getMethod().getName().equals("close")) {
                    this.closeRef(ce, chan);
                    continue;
                }
                if (ce.getMethod().getName().equals("asyncstop")) {
                    Log.Lg.error(this, null, "cannot stop remote actors");
                    continue;
                }
                long futId = 0L;
                if (ce.hasFutureResult()) {
                    futId = this.registerPublishedCallback(ce.getFutureCB());
                }
                try {
                    RemoteCallEntry rce = new RemoteCallEntry(futId, remoteActor.__remoteId, ce.getMethod().getName(), ce.getArgs());
                    rce.setQueue(cb ? 1 : 0);
                    this.writeObject(chan, rce);
                    ++sumQueued;
                    hadAnyMsg = true;
                }
                catch (Exception ex) {
                    chan.setLastError(ex);
                    if (toRemove == null) {
                        toRemove = new ArrayList<Actor>();
                    }
                    toRemove.add(remoteActor);
                    remoteActor.__stop();
                    Log.Lg.infoLong(this, ex, "connection closed");
                    break;
                }
            }
            if (toRemove == null) continue;
            toRemove.forEach(act -> this.removeRemoteActor((Actor)act));
        } while (sumQueued > 0 && (fullqueued += sumQueued) < MAX_BATCH_CALLS);
        chan.flush();
        return hadAnyMsg;
    }

    public abstract AtomicReference<ObjectSocket> getWriteObjectSocket();

    public boolean isObsolete() {
        return this.isObsolete;
    }

    public void setIsObsolete(boolean isObsolete) {
        this.isObsolete = isObsolete;
    }

    public int getRemoteActorSize() {
        return this.remoteActorSet.size();
    }

    public void setFacadeActor(Actor facadeActor) {
        this.facadeActor = facadeActor;
    }

    public Actor getFacadeActor() {
        return this.facadeActor;
    }

    public void setServer(ActorServer server) {
        this.server = server;
    }

    public ActorServer getServer() {
        return this.server;
    }

    @Override
    public IPromise closeNetwork() {
        if (this.server != null) {
            return this.server.close();
        }
        Log.Warn(null, "failed closing underlying network connection as server is null");
        return new Promise<Object>(null, "server is null");
    }
}

