/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.remoting.base.ConnectableActor;
import org.nustaq.kontraktor.remoting.base.ReconnectableRemoteRef;
import org.nustaq.kontraktor.remoting.base.ServiceDescription;
import org.nustaq.kontraktor.remoting.tcp.TCPConnectable;
import org.nustaq.kontraktor.remoting.tcp.TCPNIOPublisher;
import org.nustaq.kontraktor.services.ClusterCfg;
import org.nustaq.kontraktor.services.ServiceArgs;
import org.nustaq.kontraktor.services.ServiceRegistry;
import org.nustaq.kontraktor.services.datacluster.DataShard;
import org.nustaq.kontraktor.services.datacluster.dynamic.DynDataServiceRegistry;
import org.nustaq.kontraktor.services.datacluster.dynamic.DynDataShard;
import org.nustaq.kontraktor.services.rlclient.DataClient;
import org.nustaq.kontraktor.services.rlclient.dynamic.DynDataClient;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.reallive.server.actors.DynTableSpaceActor;
import org.nustaq.reallive.server.actors.TableSpaceActor;
import org.nustaq.reallive.server.dynamic.DynClusterDistribution;
import org.nustaq.serialization.util.FSTUtil;

public abstract class ServiceActor<T extends ServiceActor>
extends Actor<T> {
    /** Event id handed to service event listeners by {@link #onRegistryDisconnected()}. */
    public static final String REGISTRY_DISCONNECTED = "registry disconnected";
    /** Event id handed to service event listeners by {@link #onServiceRegistryReconnected()}. */
    public static final String REGISTRY_RECONNECTED = "registry reconnected";
    /**
     * Timeout passed to {@code await(..)} by the RunTCP overloads without an explicit timeout
     * and by the data client connect calls. Presumably milliseconds - TODO confirm against
     * IPromise.await. Not final, so it can be reassigned by callers.
     */
    public static int DEFAULT_START_TIMEOUT = 600000;
    /**
     * Placeholder stored in {@link #requiredServices} for a service that is not connected.
     * This class compares it by identity ({@code ==}), not with equals.
     */
    public static final String UNCONNECTED = "UNCONNECTED";
    /** Reconnecting reference to the service registry; created in init. */
    protected ReconnectableRemoteRef<ServiceRegistry> serviceRegistry;
    /** Service name -> connected actor, or {@link #UNCONNECTED} while the service is missing. */
    protected Map<String, Object> requiredServices;
    /** Cluster configuration fetched from the registry; replaced on "configupdate" events. */
    protected ClusterCfg config;
    /** Lazily created by {@link #getServiceDescription()}. */
    protected ServiceDescription serviceDescription;
    /** Startup options as passed to init. */
    protected ServiceArgs cmdline;
    /** Data cluster client; assigned by initRealLiveFixed / initRealLiveDynamic, null before that. */
    protected DataClient dclient;
    /** Last distribution received for a dynamic data cluster. */
    protected DynClusterDistribution currentDistribution;
    /** Resolved once the first registry connect finished initialisation; its settled state marks later connects as reconnects. */
    IPromise initComplete;
    /** Listeners notified by fireServiceEvent; created in init. */
    List<BiConsumer<String, Object>> serviceEventListener;

    /**
     * Starts a service using the default registry class lookup of the timeout-taking
     * overload and {@link #DEFAULT_START_TIMEOUT} as timeout.
     *
     * @param args         raw command line arguments
     * @param serviceClazz actor class of the service to start
     * @param argsClazz    class used to hold the parsed arguments
     * @return the started service actor
     */
    public static ServiceActor RunTCP(String[] args, Class<? extends ServiceActor> serviceClazz, Class<? extends ServiceArgs> argsClazz) {
        // a long-typed local selects the (.., long timeout) overload
        final long startTimeout = DEFAULT_START_TIMEOUT;
        return RunTCP(args, serviceClazz, argsClazz, startTimeout);
    }

    /**
     * Starts a service against a specific registry class, using
     * {@link #DEFAULT_START_TIMEOUT} as timeout.
     *
     * @param args                 raw command line arguments
     * @param serviceClazz         actor class of the service to start
     * @param argsClazz            class used to hold the parsed arguments
     * @param serviceRegistryClass registry actor class to connect to
     * @return the started service actor
     */
    public static ServiceActor RunTCP(String[] args, Class<? extends ServiceActor> serviceClazz, Class<? extends ServiceArgs> argsClazz, Class<? extends ServiceRegistry> serviceRegistryClass) {
        final long startTimeout = DEFAULT_START_TIMEOUT;
        return RunTCP(args, serviceClazz, argsClazz, serviceRegistryClass, startTimeout);
    }

    /**
     * Starts a service against the standard {@link ServiceRegistry} class.
     *
     * @param args         raw command line arguments
     * @param serviceClazz actor class of the service to start
     * @param argsClazz    class used to hold the parsed arguments
     * @param timeout      timeout handed to the init await
     * @return the started service actor
     */
    public static ServiceActor RunTCP(String[] args, Class<? extends ServiceActor> serviceClazz, Class<? extends ServiceArgs> argsClazz, long timeout) {
        final Class<? extends ServiceRegistry> registryClass = ServiceRegistry.class;
        return RunTCP(args, serviceClazz, argsClazz, registryClass, timeout);
    }

    /**
     * Parses the command line into a fresh {@code argsClazz} instance and starts the service
     * with the resulting options.
     *
     * @param args                 raw command line arguments
     * @param serviceClazz         actor class of the service to start
     * @param argsClazz            class instantiated reflectively to hold the parsed arguments
     * @param serviceRegistryClass registry actor class to connect to
     * @param timeout              timeout handed to the init await
     * @return the started service actor
     */
    public static ServiceActor RunTCP(String[] args, Class<? extends ServiceActor> serviceClazz, Class<? extends ServiceArgs> argsClazz, Class<? extends ServiceRegistry> serviceRegistryClass, long timeout) {
        ServiceArgs parsed;
        try {
            parsed = ServiceRegistry.parseCommandLine(args, null, argsClazz.newInstance());
        } catch (Exception failure) {
            // instantiation or parse failures are handed to FSTUtil.rethrow
            FSTUtil.rethrow(failure);
            parsed = null;
        }
        return RunTCP(parsed, serviceClazz, serviceRegistryClass, timeout);
    }

    /**
     * Starts a service from already parsed options against the standard
     * {@link ServiceRegistry} class.
     *
     * @param options      parsed startup options
     * @param serviceClazz actor class of the service to start
     * @param timeout      timeout handed to the init await
     * @return the started service actor
     */
    public static ServiceActor RunTCP(ServiceArgs options, Class<? extends ServiceActor> serviceClazz, long timeout) {
        final Class<? extends ServiceRegistry> registryClass = ServiceRegistry.class;
        return RunTCP(options, serviceClazz, registryClass, timeout);
    }

    /**
     * Creates the service actor, points it at the registry host/port taken from
     * {@code options} and waits for its init promise.
     *
     * @param options              parsed startup options (registry host and port are read from it)
     * @param serviceClazz         actor class of the service to start
     * @param serviceRegistryClass registry actor class to connect to
     * @param timeout              timeout handed to {@code await} on the init promise
     * @return the service actor after init was awaited
     */
    public static ServiceActor RunTCP(ServiceArgs options, Class<? extends ServiceActor> serviceClazz, Class<? extends ServiceRegistry> serviceRegistryClass, long timeout) {
        ServiceActor service = (ServiceActor) ServiceActor.AsActor(serviceClazz);
        TCPConnectable registryAddress = new TCPConnectable(serviceRegistryClass, options.getRegistryHost(), options.getRegistryPort());
        // init is always called with autoRegister = true on this path
        IPromise started = service.init((ConnectableActor) registryAddress, options, true);
        started.await(timeout);
        Log.Info(service.getClass(), "Init finished");
        return service;
    }

    /**
     * Remembers the startup options, creates the listener list and opens a reconnecting
     * reference to the service registry. Registry connect and disconnect notifications are
     * re-dispatched through {@code execute(..)} to {@link #onRegistryConnected(boolean)} and
     * {@link #onRegistryDisconnected()}.
     *
     * @param registryConnectable connectable describing how to reach the registry
     * @param options             startup options; also decides whether logging is switched to synchronous
     * @param autoRegister        forwarded to {@link #onRegistryConnected(boolean)}
     * @return the promise stored in {@code initComplete}
     */
    public IPromise init(ConnectableActor registryConnectable, ServiceArgs options, final boolean autoRegister) {
        this.cmdline = options;
        this.initComplete = new Promise();
        this.serviceEventListener = new ArrayList<>();

        boolean asyncLog = options.isAsyncLog();
        if (!asyncLog) {
            Log.SetSynchronous();
        }
        Log.Info(this, "startup options " + options);
        Log.Info(this, "connecting to serviceRegistry ..");

        ReconnectableRemoteRef.ReconnectableListener registryListener = new ReconnectableRemoteRef.ReconnectableListener() {
            @Override
            public void remoteDisconnected(Actor disconnected) {
                ServiceActor.this.execute(() -> ServiceActor.this.onRegistryDisconnected());
            }

            @Override
            public void remoteConnected(Actor connected) {
                ServiceActor.this.execute(() -> ServiceActor.this.onRegistryConnected(autoRegister));
            }
        };
        this.serviceRegistry = new ReconnectableRemoteRef(registryConnectable, registryListener);
        return this.initComplete;
    }

    /**
     * Adds a service event listener unless the list already contains it.
     *
     * @param l listener receiving (event id, argument) pairs
     */
    public void addServiceEventListener(BiConsumer<String, Object> l) {
        if (serviceEventListener.contains(l)) {
            return;
        }
        serviceEventListener.add(l);
    }

    /**
     * Removes a previously added service event listener; does nothing if it is not in the list.
     *
     * @param l listener to remove
     */
    public void removeServiceEventListener(BiConsumer<String, Object> l) {
        serviceEventListener.remove(l);
    }

    /**
     * Returns the registry currently held by the underlying actor's reconnectable reference.
     * Reads the field through {@code getActor()} rather than through {@code this}.
     *
     * @return the current service registry reference
     */
    @CallerSideMethod
    public ServiceRegistry getServiceRegistry() {
        ServiceActor target = (ServiceActor) this.getActor();
        return (ServiceRegistry) target.serviceRegistry.get();
    }

    /**
     * Passes an event to every registered service event listener.
     *
     * @param ev  event id
     * @param arg event payload, may be null
     */
    protected void fireServiceEvent(String ev, Object arg) {
        serviceEventListener.forEach(listener -> {
            listener.accept(ev, arg);
        });
    }

    /** Notifies listeners with {@link #REGISTRY_DISCONNECTED} and a null payload. */
    protected void onRegistryDisconnected() {
        final Object noPayload = null;
        fireServiceEvent(REGISTRY_DISCONNECTED, noPayload);
    }

    /**
     * Handles a (re)established registry connection.
     * <p>
     * The cluster configuration is always re-read from the registry. If {@code initComplete}
     * is already settled this is treated as a reconnect: listeners are notified and the
     * service registers again. Otherwise all required services are marked
     * {@link #UNCONNECTED}, awaited, the data cluster client is initialised and
     * {@code initComplete} is resolved.
     *
     * @param autoRegister whether to call {@link #registerSelf()} after the first successful init
     */
    protected void onRegistryConnected(boolean autoRegister) {
        Log.Info(this, "connected serviceRegistry.");
        ServiceRegistry registry = (ServiceRegistry) this.serviceRegistry.get();
        this.config = (ClusterCfg) registry.getConfig().await();

        if (this.initComplete.isSettled()) {
            // init already ran once, so this is a reconnect
            onServiceRegistryReconnected();
            registerSelf();
            return;
        }

        Log.Info(this, "loaded cluster configuration");
        this.requiredServices = new HashMap<>();
        for (String serviceName : getAllServiceNames()) {
            this.requiredServices.put(serviceName, UNCONNECTED);
        }
        Log.Info(this, "waiting for required services ..");
        awaitRequiredServices().then((result, error) -> {
            if (error != null) {
                // initComplete is left unsettled on this path
                Log.Warn(this, "missing services " + error);
                return;
            }
            if (isFixedDataCluster()) {
                initRealLiveFixed();
            } else if (isDynamicDataCluster()) {
                initRealLiveDynamic();
            }
            Log.Info(this, "got all required services ..");
            if (autoRegister) {
                registerSelf();
            }
            this.initComplete.resolve();
        });
    }

    /** Notifies listeners with {@link #REGISTRY_RECONNECTED}, then logs the reconnect. */
    protected void onServiceRegistryReconnected() {
        final Object noPayload = null;
        fireServiceEvent(REGISTRY_RECONNECTED, noPayload);
        Log.Info(this, "service registry reconnected.");
    }

    /**
     * Starts the connect loop for the required services.
     *
     * @return promise settled by {@link #awaitRequiredServicesInternal(Promise)}
     */
    protected IPromise awaitRequiredServices() {
        Log.Info(this, "connecting required services ..");
        Promise allConnected = new Promise();
        awaitRequiredServicesInternal(allConnected);
        return allConnected;
    }

    /**
     * One round of the "wait for required services" loop.
     * <p>
     * After a connect attempt, any service still marked {@link #UNCONNECTED} is logged and the
     * round is repeated after 2 seconds. For a fixed data cluster {@code p} is resolved as soon
     * as nothing is missing. For a dynamic data cluster the active distribution is fetched from
     * the registry (retrying every 2 seconds while it is null), every service whose name starts
     * with "DynShard" is connected and stored in {@code requiredServices}, and only then the
     * distribution is stored and {@code p} resolved.
     * <p>
     * Changes compared to the previous version: the fatal "wrong registry type" path now stops
     * instead of running into a ClassCastException, and an exception while fetching the
     * distribution schedules a retry instead of leaving {@code p} unsettled forever.
     *
     * @param p promise resolved once all required services are connected; rejected if the
     *          service map cannot be obtained
     */
    protected void awaitRequiredServicesInternal(Promise p) {
        this.connectRequiredServices().then(() -> {
            long missing = this.requiredServices.values().stream().filter(serv -> serv == UNCONNECTED).count();
            if (missing > 0L) {
                Log.Warn(this, "missing: ");
                this.requiredServices.forEach((name, serv) -> {
                    if (serv == UNCONNECTED) {
                        Log.Warn(this, "    " + name);
                    }
                });
                this.delayed(2000L, () -> this.awaitRequiredServicesInternal(p));
                return;
            }
            if (!this.isDynamicDataCluster()) {
                p.resolve();
                return;
            }

            ServiceRegistry registry = (ServiceRegistry) this.serviceRegistry.get();
            if (!(registry instanceof DynDataServiceRegistry)) {
                Log.Error(this, "Fatal: need DynDataServiceRegistry to manage dynamic data cluster");
                this.delayed(1000L, () -> System.exit(2));
                // stop here: the cast below would only throw a ClassCastException
                return;
            }
            DynDataServiceRegistry reg = (DynDataServiceRegistry) registry;
            try {
                DynClusterDistribution distribution = (DynClusterDistribution) reg.getActiveDistribution().await();
                if (distribution == null) {
                    Log.Info(this, "wait for distribution map ..");
                    this.delayed(2000L, () -> this.awaitRequiredServicesInternal(p));
                    return;
                }
                Log.Info(this, "received distribution, start initializing dataclient ");
                registry.getServiceMap().then((smap, err) -> {
                    if (smap == null) {
                        p.reject((Object) "could not aquire servicemap");
                        return;
                    }
                    // raw list kept as in the original; it only carries the per-shard promises
                    ArrayList proms = new ArrayList();
                    smap.values().stream()
                        .filter(desc -> desc.getName().startsWith("DynShard"))
                        .forEach(desc -> {
                            Promise sp = new Promise();
                            proms.add(sp);
                            this.connectService(desc).then((r, e) -> {
                                if (r != null) {
                                    Log.Info(this, "dyndatacluster init connecting " + desc);
                                    this.requiredServices.put(desc.getName(), r);
                                    sp.resolve();
                                } else {
                                    Log.Error(this, "failed to connect " + desc);
                                    sp.reject(e);
                                }
                            });
                        });
                    ServiceActor.allMapped(proms).await();
                    this.setCurrentDistribution(distribution);
                    p.resolve();
                });
            } catch (Exception e) {
                Log.Error(this, e);
                // retry like the other "not ready yet" paths so that p is eventually settled
                this.delayed(2000L, () -> this.awaitRequiredServicesInternal(p));
            }
        });
    }

    /**
     * Stores the distribution to use for the dynamic data cluster.
     *
     * @param distribution new distribution, stored as given
     */
    private void setCurrentDistribution(DynClusterDistribution distribution) {
        currentDistribution = distribution;
    }

    /**
     * Creates and connects the data client for a dynamic data cluster.
     * <p>
     * The number of shards is taken from {@code currentDistribution}. Every service in the
     * registry's service map whose name starts with "DynShard" is looked up in the required
     * services and its table space is fetched. A shard that is announced but not connected
     * terminates the process with exit code 1. A mismatch between the number of announced
     * shards and the distribution is logged as fatal and an exit is scheduled.
     * <p>
     * Changes compared to the previous version: surplus announced shards no longer overflow
     * the shard arrays (they are counted and reported by the mismatch check instead), and an
     * interrupt during the pre-exit sleep restores the thread's interrupt flag.
     */
    protected void initRealLiveDynamic() {
        Log.Info(this, "init datacluster client");
        int nShards = this.currentDistribution.getNumberOfShards();
        Log.Info(this, "number of shards " + nShards);
        DynDataShard[] shards = new DynDataShard[nShards];
        DynTableSpaceActor[] tsShard = new DynTableSpaceActor[nShards];
        // only the keys (service names) are used
        Map<String, ?> serviceMap = (Map<String, ?>) ((ServiceRegistry) this.serviceRegistry.get()).getServiceMap().await();
        int i = 0;
        for (String serviceName : serviceMap.keySet()) {
            if (!serviceName.startsWith("DynShard")) {
                continue;
            }
            if (i >= nShards) {
                // more shards announced than the distribution knows: just count it,
                // the mismatch is reported after the loop
                ++i;
                continue;
            }
            shards[i] = (DynDataShard) ((Object) this.getService(serviceName));
            if (shards[i] == null) {
                Log.Error(this, "FATAL: announced shard not found/connected:" + serviceName);
                try {
                    // short pause before exiting, as in the original
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.exit(1);
            } else {
                Log.Info(this, "connect to shard " + serviceName);
                tsShard[i] = (DynTableSpaceActor) shards[i].getTableSpace().await();
                tsShard[i].__clientsideTag = serviceName;
            }
            ++i;
        }
        if (i != nShards) {
            Log.Error(this, "FATAL: number dyndatashards contradicts distribution");
            this.delayed(1000L, () -> System.exit(1));
        }
        Log.Info(this, "dc connected all shards");
        this.dclient = (DataClient) Actors.AsActor(DynDataClient.class);
        ((DynDataClient) this.dclient).setInitialMapping(this.currentDistribution);
        this.dclient.connect(this.config.getDataCluster(), (TableSpaceActor[]) tsShard, (ServiceActor) this.self()).await((long) DEFAULT_START_TIMEOUT);
        Log.Info(this, "dc init done");
        Log.Info(this, "\n" + this.currentDistribution);
    }

    /**
     * Creates and connects the data client for a fixed-size data cluster.
     * <p>
     * The shard count comes from the cluster configuration. For each index the service named
     * "DataShard" + index is taken from the required services, its table space is fetched and
     * tagged with the shard name, then the data client is connected to all table spaces.
     */
    protected void initRealLiveFixed() {
        Log.Info(this, "init datacluster client");
        final int shardCount = this.config.getDataCluster().getNumberOfShards();
        Log.Info(this, "number of shards " + shardCount);
        DataShard[] dataShards = new DataShard[shardCount];
        TableSpaceActor[] tableSpaces = new TableSpaceActor[shardCount];
        int idx = 0;
        while (idx < shardCount) {
            DataShard shard = (DataShard) ((Object) this.getService("DataShard" + idx));
            dataShards[idx] = shard;
            Log.Info(this, "connect to shard " + idx);
            TableSpaceActor tableSpace = (TableSpaceActor) shard.getTableSpace().await();
            tableSpaces[idx] = tableSpace;
            tableSpace.__clientsideTag = "DataShard" + idx;
            idx++;
        }
        Log.Info(this, "dc connected all shards");
        this.dclient = (DataClient) Actors.AsActor(DataClient.class);
        this.dclient.connect(this.config.getDataCluster(), tableSpaces, (ServiceActor) this.self()).await((long) DEFAULT_START_TIMEOUT);
        Log.Info(this, "dc init done");
    }

    /**
     * @return an already resolved promise carrying the current cluster configuration
     *         (null until it has been loaded from the registry)
     */
    public IPromise<ClusterCfg> getConfig() {
        final ClusterCfg current = this.config;
        return ServiceActor.resolve((Object) current);
    }

    /**
     * @return an already resolved promise carrying the data client
     *         (null until one of the initRealLive methods has run)
     */
    public IPromise<DataClient> getDataClient() {
        final DataClient current = this.dclient;
        return ServiceActor.resolve((Object) current);
    }

    /**
     * Publishes this service, registers its description at the registry, subscribes to
     * registry events and starts the heartbeat.
     * <p>
     * Registry events are forwarded to {@link #serviceEvent(String, Object, Object)}. A
     * callback delivery without a result (for example an error-only delivery) is logged and
     * skipped; previously it caused a NullPointerException inside the callback.
     */
    protected void registerSelf() {
        this.publishSelf();
        ((ServiceRegistry) this.serviceRegistry.get()).registerService(this.getServiceDescription());
        ((ServiceRegistry) this.serviceRegistry.get()).subscribe((pair, err) -> {
            if (pair == null) {
                // nothing to dispatch without an (event id, payload) pair
                Log.Warn(this, "service registry subscription delivered no event, error: " + err);
                return;
            }
            this.serviceEvent((String) pair.car(), pair.cdr(), err);
        });
        this.heartBeat();
        Log.Info(this, "registered at serviceRegistry.");
    }

    /**
     * Publishes this actor on the TCP port returned by {@link #getPort()}.
     * A port of zero or less means the service stays unpublished (a warning is logged).
     */
    protected void publishSelf() {
        Log.Info(this, "registering at service registry " + this.getServiceDescription().getName());
        final int port = this.getPort();
        if (port > 0) {
            Log.Info(this, "publishing self at " + port);
            TCPNIOPublisher publisher = new TCPNIOPublisher(this.self(), port);
            publisher.publish(actor -> Log.Info(null, actor + " has disconnected"));
            return;
        }
        Log.Warn(this, "Service " + this.getServiceDescription().getName() + " has no port and host configured. Unpublished.");
    }

    /**
     * Port this service is published on. The default of -1 makes
     * {@link #publishSelf()} skip publishing.
     *
     * @return -1 unless overridden
     */
    protected int getPort() {
        final int unpublished = -1;
        return unpublished;
    }

    /**
     * @return the startup options passed to init (null before init was called)
     */
    protected ServiceArgs getCmdline() {
        return cmdline;
    }

    /**
     * Names of all services this service waits for during startup.
     * For a fixed data cluster these are the required service names followed by
     * "DataShard0" .. "DataShard(n-1)", with n taken from the cluster configuration;
     * otherwise just the required service names.
     *
     * @return the service names to connect
     */
    protected String[] getAllServiceNames() {
        if (!this.isFixedDataCluster()) {
            return this.getRequiredServiceNames();
        }
        String[] required = this.getRequiredServiceNames();
        int shardCount = this.config.getDataCluster().getNumberOfShards();
        String[] all = Arrays.copyOf(required, required.length + shardCount);
        int shard = 0;
        while (shard < shardCount) {
            all[required.length + shard] = "DataShard" + shard;
            shard++;
        }
        return all;
    }

    /**
     * @return true exactly when {@link #isDynamicDataCluster()} returns false
     */
    protected boolean isFixedDataCluster() {
        boolean dynamic = this.isDynamicDataCluster();
        return !dynamic;
    }

    /**
     * Whether this service works against a dynamic data cluster.
     *
     * @return false unless overridden
     */
    protected boolean isDynamicDataCluster() {
        final boolean dynamic = false;
        return dynamic;
    }

    /**
     * Names of the services that must be connected before initialisation completes.
     * The result is consumed by {@link #getAllServiceNames()}, which calls
     * {@code Arrays.copyOf} on it for a fixed data cluster, so it must not be null there.
     *
     * @return required service names
     */
    protected abstract String[] getRequiredServiceNames();

    /**
     * Handles an event received from the registry subscription and forwards it to the
     * service event listeners.
     * <ul>
     *   <li>"timeout" for a service contained in {@code requiredServices}: calls
     *       {@link #requiredSerivceWentDown(ServiceDescription)}</li>
     *   <li>"configupdate": replaces {@code config} and calls {@link #notifyConfigChanged()}</li>
     *   <li>"distribution": stores the new distribution</li>
     * </ul>
     *
     * @param eventId event id
     * @param cdr     event payload; its expected type depends on the event id
     * @param err     error part of the callback; not used here
     */
    protected void serviceEvent(String eventId, Object cdr, Object err) {
        if ("timeout".equals(eventId) && cdr != null) {
            ServiceDescription timedOut = (ServiceDescription) cdr;
            if (this.requiredServices.containsKey(timedOut.getName())) {
                this.requiredSerivceWentDown(timedOut);
            }
        }
        if ("configupdate".equals(eventId)) {
            this.config = (ClusterCfg) cdr;
            this.notifyConfigChanged();
        }
        if ("distribution".equals(eventId)) {
            DynClusterDistribution updated = (DynClusterDistribution) cdr;
            this.setCurrentDistribution(updated);
        }
        this.fireServiceEvent(eventId, cdr);
    }

    /**
     * Hook called after {@code config} was replaced by a "configupdate" event.
     * Does nothing by default.
     */
    protected void notifyConfigChanged() {
        // intentionally empty
    }

    /**
     * Called when a required service timed out at the registry: logs the event and stops
     * this service.
     *
     * @param cdr description of the service that went down
     */
    protected void requiredSerivceWentDown(ServiceDescription cdr) {
        Log.Error(this, "required service went down. Shutting down. :" + cdr);
        ServiceActor me = (ServiceActor) this.self();
        me.stop();
    }

    /**
     * Looks up a connected required service.
     *
     * @param name service name as used in {@code requiredServices}
     * @return the connected actor, or null if the name is unknown or still marked
     *         {@link #UNCONNECTED}
     */
    protected <T extends Actor> T getService(String name) {
        Object entry = this.requiredServices.get(name);
        boolean connected = entry != null && entry != UNCONNECTED;
        if (connected) {
            return (T) ((Actor) entry);
        }
        return null;
    }

    /**
     * Fetches the service map from the registry and tries to connect every required service
     * that is not connected yet.
     *
     * @return a resolved promise if nothing is required; otherwise a promise completed once
     *         all connect attempts of this round have finished
     */
    public IPromise connectRequiredServices() {
        if (this.requiredServices.isEmpty()) {
            return ServiceActor.resolve();
        }
        Promise done = new Promise();
        ServiceRegistry registry = (ServiceRegistry) this.serviceRegistry.get();
        registry.getServiceMap().then((serviceMap, error) -> this.lambda$connectRequiredServices$571da9$1((IPromise) done, serviceMap, error));
        return done;
    }

    /**
     * Connects to the service described by {@code serviceDescription}, passing null as the
     * first argument of {@code connect} and {@link #serviceDisconnected(Actor)} as the
     * second (the per-actor disconnect handler).
     *
     * @param serviceDescription description whose connectable is used
     * @return promise of the connected actor
     */
    protected IPromise<Actor> connectService(ServiceDescription serviceDescription) {
        ConnectableActor target = serviceDescription.getConnectable();
        return target.connect(null, gone -> this.serviceDisconnected((Actor) gone));
    }

    /**
     * Called when a connected remote service disconnects; logs it and informs the data client.
     * <p>
     * The data client is only created by the initRealLive methods, which run after the
     * required services were connected. A disconnect arriving earlier therefore finds no
     * data client and is only logged (previously this threw a NullPointerException).
     *
     * @param act the actor that disconnected
     */
    protected void serviceDisconnected(Actor act) {
        Log.Warn(this, "a remote service disconnected " + act);
        if (this.dclient != null) {
            this.dclient.nodeDisconnected(act);
        }
    }

    /**
     * Sends one heartbeat (name, unique key, status) to the registry and reschedules itself
     * after a delay of 1000. The loop ends when the actor is stopped or the registry reference
     * reports offline; {@link #registerSelf()} starts it again.
     */
    @Local
    public void heartBeat() {
        if (this.isStopped() || !this.serviceRegistry.isOnline()) {
            return;
        }
        ServiceDescription description = this.getServiceDescription();
        ServiceRegistry registry = (ServiceRegistry) this.serviceRegistry.get();
        registry.receiveHeartbeatWithStatus(description.getName(), description.getUniqueKey(), this.getStatus());
        this.delayed(1000L, () -> this.heartBeat());
    }

    /**
     * Status object sent along with every heartbeat.
     *
     * @return null unless overridden
     */
    protected Serializable getStatus() {
        final Serializable noStatus = null;
        return noStatus;
    }

    /**
     * Drops the registry reference by setting {@code serviceRegistry} to null.
     * NOTE(review): nothing in this class calls this method, and other methods dereference
     * {@code serviceRegistry} without a null check - confirm callers before relying on it.
     */
    protected void gravityDisconnected() {
        serviceRegistry = null;
    }

    /**
     * Creates the description under which this service is registered. Called by
     * {@link #getServiceDescription()} while no description is cached yet.
     *
     * @return the description of this service
     */
    protected abstract ServiceDescription createServiceDescription();

    /**
     * Returns the cached service description, creating it through
     * {@link #createServiceDescription()} on first use.
     *
     * @return the service description of this service
     */
    protected ServiceDescription getServiceDescription() {
        if (this.serviceDescription != null) {
            return this.serviceDescription;
        }
        this.serviceDescription = this.createServiceDescription();
        return this.serviceDescription;
    }

    /**
     * Callback body of {@link #connectRequiredServices()} (decompiler-generated name, kept
     * because that method refers to it).
     * <p>
     * For every name from {@link #getAllServiceNames()} that is present in the service map and
     * not yet connected, a connect is started. The outcome is written to
     * {@code requiredServices} (the actor on success, {@link #UNCONNECTED} on failure).
     * {@code res} is completed once all started connects have finished.
     * <p>
     * Changes compared to the previous version: a missing service map completes {@code res}
     * with the error instead of throwing a NullPointerException; a description without a
     * connectable is skipped right after the error log instead of running into the catch-all;
     * the failure branch no longer logs a misleading "connected" message.
     *
     * @param res  promise to complete when this round of connects is done
     * @param smap service name -> ServiceDescription as delivered by the registry, may be null
     * @param err  error delivered together with (or instead of) the map
     */
    private /* synthetic */ void lambda$connectRequiredServices$571da9$1(IPromise res, Map smap, Object err) {
        if (smap == null) {
            Log.Warn(this, "could not acquire service map from registry: " + err);
            // settle res so the caller's retry loop keeps running
            res.complete(null, err);
            return;
        }
        ArrayList<Promise> servicePromis = new ArrayList<Promise>();
        for (String servName : this.getAllServiceNames()) {
            ServiceDescription serviceDescription = (ServiceDescription) smap.get(servName);
            // skip services the registry does not know yet and services already connected
            if (serviceDescription == null || this.requiredServices.get(servName) instanceof Actor) {
                continue;
            }
            if (serviceDescription.getConnectable() == null) {
                Log.Error(this, "No connecteable defined for service " + serviceDescription.getName());
                continue;
            }
            IPromise<Actor> connect;
            try {
                Log.Info(this, "connect " + serviceDescription.getConnectable());
                connect = this.connectService(serviceDescription);
            } catch (Throwable th) {
                Log.Error(this, th, "failed to connect " + serviceDescription.getName());
                continue;
            }
            Promise notify = new Promise();
            servicePromis.add(notify);
            connect.then((actor, connectionError) -> {
                if (actor != null) {
                    this.requiredServices.put(servName, actor);
                    Log.Info(this, "connected required service " + servName);
                    notify.complete();
                } else {
                    this.requiredServices.put(servName, UNCONNECTED);
                    Log.Warn(this, "failed to connect " + servName + " " + connectionError + " " + serviceDescription.getConnectable());
                    notify.reject((Object) ("failed to connect " + servName + " " + connectionError));
                }
            });
        }
        ServiceActor.all(servicePromis).then((Callback) res);
    }
}

