/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.base;

import java.io.IOError;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.RemoteConnection;
import org.nustaq.kontraktor.Spore;
import org.nustaq.kontraktor.Timeout;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.annotations.Remoted;
import org.nustaq.kontraktor.annotations.Secured;
import org.nustaq.kontraktor.impl.ActorProxyFactory;
import org.nustaq.kontraktor.impl.CallEntry;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.impl.DispatcherThread;
import org.nustaq.kontraktor.impl.InternalActorStoppedException;
import org.nustaq.kontraktor.impl.RemoteScheduler;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.base.RemotedActor;
import org.nustaq.kontraktor.remoting.base.SessionResurrector;
import org.nustaq.kontraktor.remoting.base.UnknownActorException;
import org.nustaq.kontraktor.remoting.encoding.ActorRefSerializer;
import org.nustaq.kontraktor.remoting.encoding.CallbackRefSerializer;
import org.nustaq.kontraktor.remoting.encoding.Coding;
import org.nustaq.kontraktor.remoting.encoding.RemoteCallEntry;
import org.nustaq.kontraktor.remoting.encoding.SerializerType;
import org.nustaq.kontraktor.remoting.encoding.SporeRefSerializer;
import org.nustaq.kontraktor.remoting.encoding.TimeoutSerializer;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectSerializer;

public abstract class RemoteRegistry
implements RemoteConnection {
    /** Sentinel compared by identity in {@code receiveObject}; receiving it is logged as an out-of-sequence remote call. */
    public static final Object OUT_OF_ORDER_SEQ = "OOOS";
    /** Soft limit used by {@code pollAndSend2Remote}: no further polling round starts once this many calls were written in one invocation. */
    public static int MAX_BATCH_CALLS = 500;
    /** Server assigned through {@code setServer}; {@code closeNetwork} delegates to it. */
    private ActorServer server;
    /** Set to true by {@code setFacadeActor} when the facade actor's class carries {@code @Secured}. */
    private boolean secured;
    /** Optional global hook; when non-null, {@code processRemoteCallEntry} passes (this registry, incoming entry) to it and uses the result as the entry. */
    public static BiFunction remoteCallMapper;
    /** Serialization configuration used for packing and unpacking remote calls. */
    protected FSTConfiguration conf;
    /** Scheduler handed to {@code Actors.AsActor} when a remote actor reference is created. */
    protected RemoteScheduler scheduler = new RemoteScheduler();
    /** Counter from which ids for published actors and callbacks are drawn. */
    protected AtomicLong actorIdCount = new AtomicLong(0L);
    /** Maps an id to the published object (an actor reference or a callback). */
    protected ConcurrentHashMap<Long, Object> publishedActorMapping = new ConcurrentHashMap();
    /** Reverse of {@code publishedActorMapping}: published object to id. */
    protected ConcurrentHashMap<Object, Long> publishedActorMappingReverse = new ConcurrentHashMap();
    /** Remote actor references registered with this registry; iterated by {@code pollAndSend2Remote}. */
    protected ConcurrentLinkedQueue<Actor> remoteActors = new ConcurrentLinkedQueue();
    /** Remote actor references keyed by their {@code __remoteId}. */
    protected ConcurrentHashMap<Long, Actor> remoteActorSet = new ConcurrentHashMap();
    /** Termination flag; set to true by {@code disconnect}. */
    protected volatile boolean terminated = false;
    /**
     * Default filter deciding whether a method may be invoked remotely on an actor.
     * A call is rejected when the method cannot be resolved, when it is annotated
     * {@code @Local}, or when this registry is secured and the method is not
     * annotated {@code @Remoted}. Unresolvable and non-remoted methods are logged
     * as warnings.
     */
    protected BiFunction<Actor, String, Boolean> remoteCallInterceptor = (actor, methodName) -> {
        Method resolved = actor.__getCachedMethod((String)methodName, (Actor)actor, null);
        if (resolved == null) {
            Log.Warn(null, "no such method on " + actor.getClass().getSimpleName() + "#" + methodName);
            return false;
        }
        if (ActorProxyFactory.getInheritedAnnotation(Local.class, resolved) != null) {
            return false;
        }
        boolean remotedMissing = this.secured && ActorProxyFactory.getInheritedAnnotation(Remoted.class, resolved) == null;
        if (remotedMissing) {
            Log.Warn(null, "method not @Remoted " + actor.getClass().getSimpleName() + "#" + methodName);
            return false;
        }
        return true;
    };
    /** Optional handler; {@code stopRemoteRefs} invokes it once per remote actor reference before removing that reference. */
    protected Consumer<Actor> disconnectHandler;
    /** Plain flag exposed through {@code isObsolete}/{@code setIsObsolete}; not read elsewhere in this class. */
    protected boolean isObsolete;
    /** Facade actor assigned through {@code setFacadeActor}; may be null. */
    private Actor facadeActor;

    /**
     * Registers the cross-platform short names "call" and "cbw" for
     * {@code RemoteCallEntry} and {@code CallbackWrapper} on the given configuration.
     *
     * @param conf configuration to receive the mappings
     */
    public static void registerDefaultClassMappings(FSTConfiguration conf) {
        String[][] mappings = new String[][]{
            {"call", RemoteCallEntry.class.getName()},
            {"cbw", CallbackWrapper.class.getName()}
        };
        conf.registerCrossPlatformClassMapping(mappings);
    }

    /**
     * Creates a registry on top of an existing serialization configuration.
     * Unlike the single-argument constructor, this one does not call
     * {@code registerDefaultClassMappings}.
     *
     * @param conf   configuration to use as-is
     * @param coding passed on to {@code configureSerialization}
     */
    public RemoteRegistry(FSTConfiguration conf, Coding coding) {
        this.conf = conf;
        // NOTE(review): invokes an overridable (protected) method from the constructor.
        this.configureSerialization(coding);
    }

    /**
     * Creates a registry whose configuration is derived from the given coding.
     * A null coding falls back to {@code new Coding(SerializerType.FSTSer)}.
     * The default class mappings are registered before serialization is configured.
     *
     * @param code coding to build the configuration from, or null for the default
     */
    public RemoteRegistry(Coding code) {
        Coding effective = code != null ? code : new Coding(SerializerType.FSTSer);
        this.conf = effective.createConf();
        registerDefaultClassMappings(this.conf);
        configureSerialization(effective);
    }

    /**
     * @return the filter currently used to decide whether a method may be called remotely
     */
    public BiFunction<Actor, String, Boolean> getRemoteCallInterceptor() {
        return remoteCallInterceptor;
    }

    /**
     * Installs the serializers for actor references, callback wrappers, spores and
     * timeouts on {@code conf}, then registers the remoting classes. The order of
     * the registration calls is the same as before.
     *
     * @param code not used by this implementation
     */
    protected void configureSerialization(Coding code) {
        FSTObjectSerializer actorRefs = (FSTObjectSerializer)new ActorRefSerializer(this);
        conf.registerSerializer(Actor.class, actorRefs, true);
        FSTObjectSerializer callbackRefs = (FSTObjectSerializer)new CallbackRefSerializer(this);
        conf.registerSerializer(CallbackWrapper.class, callbackRefs, true);
        FSTObjectSerializer sporeRefs = (FSTObjectSerializer)new SporeRefSerializer();
        conf.registerSerializer(Spore.class, sporeRefs, true);
        FSTObjectSerializer timeouts = (FSTObjectSerializer)new TimeoutSerializer();
        conf.registerSerializer(Timeout.class, timeouts, false);
        conf.registerClass(new Class[]{RemoteCallEntry.class});
        conf.registerClass(new Class[]{Spore.class});
        conf.registerClass(new Class[]{CallbackWrapper.class});
        conf.registerClass(new Class[]{Actor.class});
    }

    /**
     * Looks up the object published under the given id and returns it as an actor.
     *
     * @param id published id
     * @return the published entry cast to {@code Actor}, or null if nothing is published under the id
     */
    public Actor getPublishedActor(long id) {
        Object published = publishedActorMapping.get(id);
        return (Actor)published;
    }

    /**
     * Looks up the object published under the given id and returns it as a callback.
     *
     * @param id published id
     * @return the published entry cast to {@code Callback}, or null if nothing is published under the id
     */
    public Callback getPublishedCallback(long id) {
        Object published = publishedActorMapping.get(id);
        return (Callback)published;
    }

    /**
     * @return the scheduler used when creating remote actor references
     */
    public RemoteScheduler getScheduler() {
        return scheduler;
    }

    /**
     * @return the live queue of registered remote actor references (not a copy)
     */
    public ConcurrentLinkedQueue<Actor> getRemoteActors() {
        return remoteActors;
    }

    /**
     * @return the current value of the termination flag
     */
    public boolean isTerminated() {
        return terminated;
    }

    /**
     * Sets the termination flag. Only the flag is changed; cleanup is done by {@code disconnect}.
     *
     * @param terminated new flag value
     */
    public void setTerminated(boolean terminated) {
        this.terminated = terminated;
    }

    /**
     * Publishes an actor and returns its id. If the actor's reference is already
     * published, the existing id is returned and nothing else happens.
     *
     * @param act actor to publish
     * @return id under which the actor reference is published
     */
    public long publishActor(Actor act) {
        Long existing = publishedActorMappingReverse.get(act.getActorRef());
        if (existing != null) {
            return existing;
        }
        Long fresh = newActId();
        publishActorDirect(fresh, act);
        return fresh;
    }

    /**
     * @return the next id, obtained by incrementing {@code actorIdCount}
     */
    private long newActId() {
        return actorIdCount.incrementAndGet();
    }

    /**
     * Publishes the actor's reference under an explicit id, updating both mapping
     * directions and adding this registry as a remote connection of the actor.
     * An id already bound to a different object is logged as an error and then overwritten.
     *
     * @param id  id to publish under
     * @param act actor to publish
     */
    private void publishActorDirect(Long id, Actor act) {
        Object previous = publishedActorMapping.get(id);
        boolean clash = previous != null && previous != act.getActorRef();
        if (clash) {
            Log.Error((Object)this, "id already present old:" + previous + " new:" + act);
        }
        publishedActorMapping.put(id, act.getActorRef());
        publishedActorMappingReverse.put(act.getActorRef(), id);
        act.__addRemoteConnection(this);
    }

    /**
     * Removes a published actor from both mappings and detaches this registry from
     * it. Actors implementing {@code RemotedActor} are then notified through
     * {@code hasBeenUnpublished()}. Does nothing if the actor is not published.
     *
     * @param act actor to unpublish
     */
    @Override
    public void unpublishActor(Actor act) {
        Long publishedId = publishedActorMappingReverse.get(act.getActorRef());
        if (publishedId == null) {
            return;
        }
        Log.Debug((Object)this, "" + act.getClass().getSimpleName() + " unpublished");
        publishedActorMapping.remove(publishedId);
        publishedActorMappingReverse.remove(act.getActorRef());
        act.__removeRemoteConnection(this);
        if (act instanceof RemotedActor) {
            RemotedActor remoted = (RemotedActor)((Object)act);
            remoted.hasBeenUnpublished();
        }
    }

    /**
     * Publishes a callback and returns its id, reusing the existing id when the
     * callback is already published.
     *
     * @param cb callback to publish
     * @return id under which the callback is published
     */
    public long registerPublishedCallback(Callback cb) {
        Long existing = publishedActorMappingReverse.get(cb);
        if (existing != null) {
            return existing;
        }
        Long fresh = newActId();
        publishedActorMapping.put(fresh, cb);
        publishedActorMappingReverse.put(cb, fresh);
        return fresh;
    }

    /**
     * Removes whatever is published under the given id from both mappings.
     * A warning is logged when nothing is published under that id.
     *
     * @param receiverKey id of the published object to remove
     */
    public void removePublishedObject(long receiverKey) {
        Object removed = this.publishedActorMapping.remove(receiverKey);
        if (removed != null) {
            this.publishedActorMappingReverse.remove(removed);
        } else {
            Log.Warn((Object)this, "MISS REMOVE:" + receiverKey);
        }
    }

    /**
     * Registers an existing remote actor reference with this registry: records it
     * under its {@code __remoteId}, marks this registry as its client connection,
     * and installs a stop handler that calls {@code remoteRefStopped}.
     *
     * @param act actor whose reference is registered
     */
    public void registerRemoteRefDirect(Actor act) {
        Actor ref = act.getActorRef();
        remoteActorSet.put(ref.__remoteId, ref);
        remoteActors.add(ref);
        ref.__clientConnection = this;
        ref.__addStopHandler((actor, err) -> this.remoteRefStopped((Actor)actor));
    }

    /**
     * Returns the remote actor reference for the given remote id, creating and
     * registering a new one through {@code Actors.AsActor} if none exists yet.
     * A new reference gets a stop handler that calls {@code remoteRefStopped} and
     * has this registry set as its client connection.
     *
     * @param actorClazz actor class to create the reference for
     * @param remoteId   id of the actor on the remote side
     * @param client     not used by this implementation
     * @return existing or newly created reference
     */
    public Actor registerRemoteActorRef(Class actorClazz, long remoteId, Object client) {
        Actor actorRef = this.remoteActorSet.get(remoteId);
        if (actorRef != null) {
            return actorRef;
        }
        // Typed as Actor (the decompiler emitted Object) so the value can be returned from this method.
        Actor res = (Actor)Actors.AsActor(actorClazz, this.getScheduler());
        res.__remoteId = remoteId;
        this.remoteActorSet.put(remoteId, res);
        this.remoteActors.add(res);
        res.__addStopHandler((actor, err) -> this.remoteRefStopped((Actor)actor));
        res.__clientConnection = this;
        return res;
    }

    /**
     * Stop-handler target for remote references: removes the reference from this
     * registry and marks both the reference and, if present, its underlying actor
     * as stopped.
     *
     * @param actor remote reference that has stopped
     */
    protected void remoteRefStopped(Actor actor) {
        this.removeRemoteActor(actor);
        actor.getActorRef().__stopped = true;
        // stopRemoteRefs guards against a null getActor(); apply the same guard here.
        Object underlying = actor.getActor();
        if (underlying != null) {
            ((Actor)underlying).__stopped = true;
        }
    }

    /**
     * Stops every registered remote reference. Works on a snapshot of
     * {@code remoteActors}. For each reference it calls the disconnect handler
     * (if set), removes the reference (exceptions are logged as warnings), and
     * marks the reference and its underlying actor, when non-null, as stopped.
     */
    public void stopRemoteRefs() {
        ArrayList<Actor> snapshot = new ArrayList<Actor>(this.remoteActors);
        for (Actor ref : snapshot) {
            if (this.disconnectHandler != null) {
                this.disconnectHandler.accept(ref);
            }
            try {
                removeRemoteActor(ref);
            }
            catch (Exception removalFailure) {
                Log.Warn((Object)this, removalFailure);
            }
            ref.getActorRef().__stopped = true;
            Object underlying = ref.getActor();
            if (underlying != null) {
                ((Actor)underlying).__stopped = true;
            }
        }
    }

    /**
     * Drops a remote reference from both bookkeeping collections and stops it.
     *
     * @param act remote reference to remove
     */
    protected void removeRemoteActor(Actor act) {
        remoteActorSet.remove(act.__remoteId);
        remoteActors.remove(act);
        try {
            act.__stop();
        }
        catch (InternalActorStoppedException ignored) {
            // deliberately ignored: this exception from __stop() is not treated as a failure
        }
    }

    /**
     * Entry point for an object read from a remote connection.
     * <p>
     * The object is either the {@link #OUT_OF_ORDER_SEQ} sentinel (logged, returns
     * false), a batch ({@code Object[]}) of messages, or a single message. In a
     * batch a trailing {@code Number} element is not treated as a message and is
     * skipped. An empty batch is accepted and yields false.
     *
     * @param responseChannel channel used to report unknown actor ids to the sender
     * @param receiver        not used by this implementation
     * @param response        the received object
     * @param createdFutures  passed on to {@code processRemoteCallEntry}
     * @param authContext     passed on to {@code processRemoteCallEntry}
     * @return true if at least one message reported a response (see {@code receiveSingleObject})
     * @throws Exception whatever {@code processRemoteCallEntry} or the channel write throws,
     *                   except {@code UnknownActorException}, which is reported to the sender
     */
    public boolean receiveObject(ObjectSocket responseChannel, ObjectSink receiver, Object response, List<IPromise> createdFutures, Object authContext) throws Exception {
        if (response == OUT_OF_ORDER_SEQ) {
            Log.Warn((Object)this, "out of sequence remote call received");
            return false;
        }
        if (!(response instanceof Object[])) {
            return this.receiveSingleObject(responseChannel, response, createdFutures, authContext);
        }
        Object[] batch = (Object[])response;
        int messageCount = batch.length;
        // Guarding on length avoids indexing batch[-1] for an empty batch.
        if (messageCount > 0 && batch[messageCount - 1] instanceof Number) {
            --messageCount;
        }
        boolean hadResp = false;
        for (int i = 0; i < messageCount; ++i) {
            if (this.receiveSingleObject(responseChannel, batch[i], createdFutures, authContext)) {
                hadResp = true;
            }
        }
        return hadResp;
    }

    /**
     * Handles one received message. Anything that is not a {@code RemoteCallEntry}
     * counts as a response; it is logged as unexpected unless it is null or "SP".
     * An unknown actor id is reported back over the channel and also counts as a response.
     */
    private boolean receiveSingleObject(ObjectSocket responseChannel, Object message, List<IPromise> createdFutures, Object authContext) throws Exception {
        if (!(message instanceof RemoteCallEntry)) {
            if (message != null && !"SP".equals(message)) {
                Log.Lg.error(this, null, "unexpected response:" + message);
            }
            return true;
        }
        RemoteCallEntry entry = (RemoteCallEntry)message;
        try {
            return this.processRemoteCallEntry(responseChannel, entry, createdFutures, authContext);
        }
        catch (UnknownActorException uae) {
            responseChannel.writeObject("Unknown actor id " + entry.getReceiverKey());
            return true;
        }
    }

    /**
     * Processes one incoming remote call entry according to its queue value:
     * queue 0 is dispatched to a published actor, queue 1 completes a published
     * callback. Entries with any other queue value are not processed.
     *
     * @param objSocket      socket the entry arrived on
     * @param response       the incoming entry
     * @param createdFutures list handed to the actor dispatch; its content determines the return value
     * @param authContext    handed to the actor dispatch
     * @return true if {@code createdFutures} is non-null and non-empty after processing;
     *         false directly when a callback was found under the negated receiver key
     * @throws UnknownActorException if no actor is published (or can be reanimated) for a queue-0 entry
     * @throws Exception             anything thrown by dispatch, unpacking or callback completion
     */
    protected boolean processRemoteCallEntry(ObjectSocket objSocket, RemoteCallEntry response, List<IPromise> createdFutures, Object authContext) throws Exception {
        RemoteCallEntry read = response;
        if (read.getQueue() == 0) {
            Actor targetActor;
            // Optional global hook may replace the entry before it is processed.
            if (remoteCallMapper != null) {
                read = (RemoteCallEntry)remoteCallMapper.apply(this, read);
            }
            // No actor under this key: if the facade is a SessionResurrector, ask it to reanimate one.
            if ((targetActor = this.getPublishedActor(read.getReceiverKey())) == null && this.facadeActor instanceof SessionResurrector) {
                try {
                    SessionResurrector actorRef = (SessionResurrector)((Object)this.facadeActor.getActorRef());
                    targetActor = actorRef.reanimate(objSocket.getConnectionIdentifier(), read.getReceiverKey()).await();
                    if (targetActor != null) {
                        // Re-publish under the key the remote side already uses.
                        this.publishActorDirect(read.getReceiverKey(), targetActor);
                    }
                }
                catch (Throwable th) {
                    // Reanimation failures are only logged; the null check below then rejects the call.
                    Log.Info((Object)this, th);
                }
            }
            if (targetActor == null) {
                Log.Lg.error(this, null, "registry:" + System.identityHashCode(this) + " no actor found for key " + read);
                throw new UnknownActorException("unknown actor id " + read.getReceiverKey());
            }
            targetActor.__dispatchRemoteCall(objSocket, read, this, createdFutures, authContext, this.remoteCallInterceptor);
        } else if (read.getQueue() == 1) {
            Callback publishedCallback;
            if (remoteCallMapper != null) {
                read = (RemoteCallEntry)remoteCallMapper.apply(this, read);
            }
            if ((publishedCallback = this.getPublishedCallback(read.getReceiverKey())) == null) {
                // Second lookup under the negated key; such a callback receives the still-packed entry itself.
                // NOTE(review): presumably used for forwarding results unprocessed — confirm with callers.
                publishedCallback = this.getPublishedCallback(-read.getReceiverKey());
                if (publishedCallback != null) {
                    publishedCallback.complete(read, null);
                    return false;
                }
                if (read.getArgs() != null && read.getArgs().length == 2 && read.getArgs()[1] instanceof InternalActorStoppedException) {
                    Log.Warn((Object)this, "call to stopped remote actor");
                } else {
                    Log.Warn((Object)this, "Publisher already deregistered, set error to 'Actor.CONT' in order to signal more messages will be sent. " + read);
                }
            } else {
                // The continue flag is read before the arguments are unpacked.
                boolean isContinue = read.isContinue();
                read.unpackArgs(this.conf);
                if (isContinue) {
                    // Overwrite the error slot with the literal "CNT" before completing the callback.
                    read.getArgs()[1] = "CNT";
                }
                publishedCallback.complete(read.getArgs()[0], read.getArgs()[1]);
                if (!isContinue) {
                    // Final result: the callback is no longer needed.
                    this.removePublishedObject(read.getReceiverKey());
                }
            }
        }
        return createdFutures != null && createdFutures.size() > 0;
    }

    /**
     * Releases what this registry holds: clears the serialization caches, stops
     * all remote references, unpublishes every published actor (published
     * callbacks are left in place), and detaches this registry from the facade proxy.
     */
    public void cleanUp() {
        conf.clearCaches();
        stopRemoteRefs();
        for (Object published : publishedActorMappingReverse.keySet()) {
            if (published instanceof Actor) {
                unpublishActor((Actor)published);
            }
        }
        Actor facadeProxy = getFacadeProxy();
        facadeProxy.__removeRemoteConnection(this);
    }

    /**
     * Marks this registry as terminated and then runs {@code cleanUp()}.
     */
    public void disconnect() {
        setTerminated(true);
        cleanUp();
    }

    /**
     * Handles a "close" call entry. If the entry targets the facade proxy the
     * channel is closed; otherwise only the targeted remote reference is removed.
     *
     * @param ce   the call entry being closed
     * @param chan channel to close when the facade is targeted
     * @throws IOException if closing the channel fails
     */
    protected void closeRef(CallEntry ce, ObjectSocket chan) throws IOException {
        boolean targetsFacade = ce.getTargetActor().getActorRef() == getFacadeProxy().getActorRef();
        if (targetsFacade) {
            chan.close();
            return;
        }
        removeRemoteActor(ce.getTargetActor());
    }

    /**
     * Writes an entry to the channel. Any exception from the write is logged at
     * debug level and causes this registry to disconnect; it is not rethrown.
     *
     * @param chan channel to write to
     * @param rce  entry to write
     */
    protected void writeObject(ObjectSocket chan, RemoteCallEntry rce) throws Exception {
        try {
            chan.writeObject(rce);
        }
        catch (Exception writeFailure) {
            String reason = writeFailure.getMessage();
            Log.Debug((Object)this, "a connection closed '" + reason + "', terminating registry");
            disconnect();
        }
    }

    /**
     * Sends a callback result to the remote side as a queue-1 entry.
     * <p>
     * When a facade actor is set and the caller is not running on that actor's
     * current dispatcher thread, the call is re-submitted through
     * {@code facadeActor.execute} and this invocation returns immediately.
     * The entry's continue flag is set when {@code error} equals "CNT".
     *
     * @param chan   channel to write the result to
     * @param id     receiver key of the callback on the remote side
     * @param result result value, serialized together with {@code error}
     * @param error  error value; "CNT" marks the result as non-final
     * @throws Exception if serialization fails
     */
    public void receiveCBResult(ObjectSocket chan, long id, Object result, Object error) throws Exception {
        if (this.facadeActor != null) {
            DispatcherThread facadeThread = this.facadeActor.getCurrentDispatcher();
            if (Thread.currentThread() != facadeThread) {
                this.facadeActor.execute(() -> {
                    try {
                        if (Thread.currentThread() != facadeThread) {
                            Log.Warn((Object)this, "receiveCBResult re-dispatched on unexpected thread");
                        }
                        this.receiveCBResult(chan, id, result, error);
                    }
                    catch (Exception e) {
                        Log.Warn((Object)this, e);
                    }
                });
                return;
            }
        }
        RemoteCallEntry rce = new RemoteCallEntry(0L, id, null, null, this.conf.asByteArray((Object)new Object[]{result, error}));
        rce.setQueue(1);
        // Value comparison: an equal "CNT" string that is not the interned literal must also count.
        rce.setContinue("CNT".equals(error));
        this.writeObject(chan, rce);
    }

    /**
     * Flushes the write socket, logging any failure as a warning, and then
     * runs {@code cleanUp()} regardless of the flush outcome.
     */
    @Override
    public void close() {
        try {
            ObjectSocket out = getWriteObjectSocket().get();
            out.flush();
        }
        catch (Exception flushFailure) {
            Log.Warn((Object)this, flushFailure);
        }
        cleanUp();
    }

    /**
     * @return the serialization configuration used by this registry
     */
    public FSTConfiguration getConf() {
        return conf;
    }

    /**
     * Supplies the facade proxy actor. Within this class it is used by
     * {@code cleanUp()} (to detach this registry) and by {@code closeRef}
     * (to recognise calls targeting the facade).
     *
     * @return the facade proxy; callers in this class dereference it without a null check
     */
    public abstract Actor getFacadeProxy();

    /**
     * Sets the handler that {@code stopRemoteRefs()} invokes for each remote reference.
     *
     * @param disconnectHandler handler to install; null disables the notification
     */
    public void setDisconnectHandler(Consumer<Actor> disconnectHandler) {
        this.disconnectHandler = disconnectHandler;
    }

    /**
     * @return the currently installed disconnect handler, or null if none is set
     */
    public Consumer<Actor> getDisconnectHandler() {
        return disconnectHandler;
    }

    /**
     * Forwards the class loader to the serialization configuration.
     *
     * @param l class loader to install on {@code conf}
     */
    @Override
    public void setClassLoader(ClassLoader l) {
        conf.setClassLoader(l);
    }

    /**
     * Returns the id under which the actor's reference is published.
     *
     * @param act actor to look up
     * @return the published id, or -1 if the actor is not published here
     */
    @Override
    public long getRemoteId(Actor act) {
        Long publishedId = publishedActorMappingReverse.get(act.getActorRef());
        if (publishedId == null) {
            return -1L;
        }
        return publishedId;
    }

    /**
     * Drains pending calls from the registered remote references and writes them
     * to the channel.
     * <p>
     * Each round takes at most one entry per remote reference, preferring its
     * {@code __cbQueue} over its {@code __mailbox}. Rounds repeat until a round
     * sends nothing or the accumulated count reaches {@link #MAX_BATCH_CALLS}.
     * The channel is flushed once at the end.
     *
     * @param chanHolder holder of the channel to write to
     * @return false if there is no channel or it cannot be written; otherwise
     *         whether at least one entry was written
     * @throws Exception from {@code closeRef}, the removal of failed references, or the final flush
     */
    public boolean pollAndSend2Remote(AtomicReference<ObjectSocket> chanHolder) throws Exception {
        int sumQueued;
        ObjectSocket chan = chanHolder.get();
        if (chan == null || !chan.canWrite()) {
            return false;
        }
        boolean hadAnyMsg = false;
        ArrayList<Actor> toRemove = null;
        int fullqueued = 0;
        do {
            sumQueued = 0;
            for (Actor remoteActor : this.remoteActors) {
                // NOTE(review): cb is false on both paths, so every entry is sent with queue 0 — confirm this is intended.
                boolean cb = false;
                CallEntry ce = (CallEntry)remoteActor.__cbQueue.poll();
                if (ce == null) {
                    cb = false;
                    ce = (CallEntry)remoteActor.__mailbox.poll();
                }
                if (ce == null) continue;
                // "close" and "asyncstop" are handled locally and never sent over the wire.
                if (ce.getMethod().getName().equals("close")) {
                    this.closeRef(ce, chan);
                    continue;
                }
                if (ce.getMethod().getName().equals("asyncstop")) {
                    Log.Lg.error(this, null, "cannot stop remote actors");
                    continue;
                }
                // A call expecting a result gets its future callback published; 0 means no result expected.
                long futId = 0L;
                if (ce.hasFutureResult()) {
                    futId = this.registerPublishedCallback(ce.getFutureCB());
                }
                try {
                    RemoteCallEntry rce = new RemoteCallEntry(futId, remoteActor.__remoteId, ce.getMethod().getName(), ce.getArgs(), null);
                    rce.setQueue(cb ? 1 : 0);
                    rce.pack(this.conf);
                    this.writeObject(chan, rce);
                    ++sumQueued;
                    hadAnyMsg = true;
                }
                catch (Throwable ex) {
                    // Unwrap reflection wrappers so the I/O check below sees the real cause.
                    if (ex instanceof InvocationTargetException && ((InvocationTargetException)ex).getTargetException() != null) {
                        ex = ((InvocationTargetException)ex).getTargetException();
                    }
                    if (ex instanceof IOError || ex instanceof IOException) {
                        // I/O failure: record it on the channel, stop the reference and queue it for removal.
                        chan.setLastError(ex);
                        if (toRemove == null) {
                            toRemove = new ArrayList<Actor>();
                        }
                        toRemove.add(remoteActor);
                        remoteActor.__stop();
                        Log.Lg.infoLong(this, ex, "connection closed");
                        break;
                    }
                    // Any other failure is logged and also ends the current round.
                    Log.Error((Object)this, ex);
                    break;
                }
            }
            // 'continue' inside do-while jumps to the loop condition below.
            if (toRemove == null) continue;
            // toRemove is never cleared, so earlier entries are passed to removeRemoteActor again in later rounds.
            toRemove.forEach(act -> this.removeRemoteActor((Actor)act));
        } while (sumQueued > 0 && (fullqueued += sumQueued) < MAX_BATCH_CALLS);
        chan.flush();
        return hadAnyMsg;
    }

    /**
     * Writes an already-built entry to the write socket and flushes it.
     * Failures are logged as warnings through {@code Log}, as in {@code close()},
     * and are not propagated to the caller.
     *
     * @param rce entry to forward
     */
    public void forwardRemoteMessage(RemoteCallEntry rce) {
        try {
            ObjectSocket chan = this.getWriteObjectSocket().get();
            this.writeObject(chan, rce);
            chan.flush();
        }
        catch (Exception e) {
            Log.Warn((Object)this, e);
        }
    }

    /**
     * Supplies the holder of the socket used for outgoing writes. Within this
     * class it is read by {@code close()}, {@code forwardRemoteMessage} and
     * {@code getSocketRef()}.
     *
     * @return holder of the write socket; callers in this class dereference the holder without a null check
     */
    public abstract AtomicReference<ObjectSocket> getWriteObjectSocket();

    /**
     * @return the socket currently stored in the write-socket holder
     */
    @Override
    public ObjectSocket getSocketRef() {
        AtomicReference<ObjectSocket> holder = getWriteObjectSocket();
        return holder.get();
    }

    /**
     * @return the current value of the obsolete flag
     */
    public boolean isObsolete() {
        return isObsolete;
    }

    /**
     * Sets the obsolete flag. This class only stores the value.
     *
     * @param isObsolete new flag value
     */
    public void setIsObsolete(boolean isObsolete) {
        this.isObsolete = isObsolete;
    }

    /**
     * @return the number of remote references currently held in {@code remoteActorSet}
     */
    public int getRemoteActorSize() {
        return remoteActorSet.size();
    }

    /**
     * Stores the facade actor. If the class of its underlying actor is annotated
     * with {@code @Secured}, the registry is switched to secured mode; the mode
     * is never switched back off here.
     *
     * @param facadeActor facade actor; must not be null
     */
    public void setFacadeActor(Actor facadeActor) {
        this.facadeActor = facadeActor;
        boolean securedFacade = facadeActor.getActor().getClass().getAnnotation(Secured.class) != null;
        if (securedFacade) {
            this.secured = true;
        }
    }

    /**
     * @return the facade actor set through {@code setFacadeActor}, or null if none was set
     */
    public Actor getFacadeActor() {
        return facadeActor;
    }

    /**
     * Sets the server that {@code closeNetwork()} will close.
     *
     * @param server server to associate with this registry
     */
    public void setServer(ActorServer server) {
        this.server = server;
    }

    /**
     * @return the server set through {@code setServer}, or null if none was set
     */
    public ActorServer getServer() {
        return server;
    }

    /**
     * Closes the associated server. Without a server a warning is logged and a
     * promise created with a null result and the error "server is null" is returned.
     *
     * @return the promise returned by the server's {@code close()}, or the error promise
     */
    @Override
    public IPromise closeNetwork() {
        if (server == null) {
            Log.Warn(null, "failed closing underlying network connection as server is null");
            return new Promise<Object>(null, "server is null");
        }
        return server.close();
    }
}

