/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services;

import com.beust.jcommander.JCommander;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.remoting.encoding.Coding;
import org.nustaq.kontraktor.remoting.encoding.SerializerType;
import org.nustaq.kontraktor.remoting.http.undertow.Http4K;
import org.nustaq.kontraktor.remoting.tcp.TCPConnectable;
import org.nustaq.kontraktor.remoting.tcp.TCPNIOPublisher;
import org.nustaq.kontraktor.remoting.websockets.WebSocketConnectable;
import org.nustaq.kontraktor.services.ClusterCfg;
import org.nustaq.kontraktor.services.RegistryArgs;
import org.nustaq.kontraktor.services.ServiceArgs;
import org.nustaq.kontraktor.services.ServiceDescription;
import org.nustaq.kontraktor.services.rlclient.DataShard;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.serialization.FSTConfiguration;

public class ServiceRegistry
extends Actor<ServiceRegistry> {
    public static final String CONFIGUPDATE = "configupdate";
    public static final String SERVICEDUMP = "running";
    public static final String AVAILABLE = "available";
    public static final String TIMEOUT = "timeout";
    HashMap<String, List<ServiceDescription>> services;
    Map<String, StatusEntry> statusMap;
    List<Callback> listeners;
    ClusterCfg config;
    public static RegistryArgs options;

    @Local
    public void init() {
        this.services = new HashMap();
        this.statusMap = new HashMap<String, StatusEntry>();
        this.listeners = new ArrayList<Callback>();
        this.checkTimeout();
        this.config = ClusterCfg.read();
        this.serviceDumper();
    }

    public void serviceDumper() {
        if (!this.isStopped()) {
            try {
                Log.Info((Object)((Object)this), (String)"------");
                this.services.forEach((k, sd) -> Log.Info((Object)((Object)this), (String)("" + sd)));
                Log.Info((Object)((Object)this), (String)"------");
                this.listeners.forEach(cb -> cb.pipe((Object)new Pair((Object)SERVICEDUMP, this.services)));
                if (ClusterCfg.isDirty()) {
                    this.config = ClusterCfg.read();
                    this.listeners.forEach(cb -> cb.pipe((Object)new Pair((Object)CONFIGUPDATE, (Object)this.config)));
                }
            }
            catch (Exception e) {
                Log.Error((Object)((Object)this), (Throwable)e);
            }
            if (options.dumpServices()) {
                this.delayed(10000L, () -> this.serviceDumper());
            }
        }
    }

    public void registerService(ServiceDescription desc) {
        List<ServiceDescription> serviceList = this.getServiceList(desc.getName());
        serviceList.add(desc);
        desc.receiveHeartbeat();
        if (serviceList.size() == 1) {
            this.broadcastAvailable(desc);
        }
    }

    public IPromise<Map<String, ServiceDescription>> getServiceMap() {
        HashMap servMap = new HashMap();
        this.services.forEach((name, list) -> {
            if (list.size() > 0) {
                servMap.put(name, list.get(0));
            }
        });
        return ServiceRegistry.resolve(servMap);
    }

    public void subscribe(Callback<Pair<String, ServiceDescription>> cb) {
        this.listeners.add(cb);
    }

    protected void broadcastAvailable(ServiceDescription desc) {
        Pair msg = new Pair((Object)AVAILABLE, (Object)desc);
        this.listeners = this.listeners.stream().filter(cb -> !cb.isTerminated()).collect(Collectors.toList());
        this.listeners.forEach(cb -> {
            try {
                cb.pipe((Object)msg);
            }
            catch (Throwable th) {
                Log.Info((Object)((Object)this), (Throwable)th);
            }
        });
    }

    protected void broadCastTimeOut(ServiceDescription desc) {
        Pair msg = new Pair((Object)TIMEOUT, (Object)desc);
        for (int i = 0; i < this.listeners.size(); ++i) {
            Callback cb = this.listeners.get(i);
            try {
                cb.pipe((Object)msg);
                continue;
            }
            catch (Throwable th) {
                Log.Info((Object)((Object)this), (Throwable)th);
                this.listeners.remove(i);
                --i;
            }
        }
    }

    public IPromise<ClusterCfg> getConfig() {
        return ServiceRegistry.resolve((Object)this.config);
    }

    public void receiveHeartbeat(String serviceName, String uniqueKey) {
        this.receiveHeartbeatWithStatus(serviceName, uniqueKey, null);
    }

    public void receiveHeartbeatWithStatus(String serviceName, String uniqueKey, Serializable status) {
        long now = System.currentTimeMillis();
        this.getServiceList(serviceName).forEach(sdesc -> {
            if (sdesc.getUniqueKey().equals(uniqueKey)) {
                sdesc.receiveHeartbeat();
                if (status != null) {
                    this.updateStatus(now, (ServiceDescription)sdesc, uniqueKey + "#" + sdesc.getName(), status);
                }
            }
        });
    }

    protected void removeStatus(String key) {
        this.statusMap.remove(key);
    }

    protected void updateStatus(long now, ServiceDescription td, String key, Serializable status) {
        this.statusMap.put(key, new StatusEntry(now, status, key, td.getName()));
    }

    @Local
    public void checkTimeout() {
        this.services.values().forEach(list -> {
            int prevsiz = list.size();
            for (int i = 0; i < list.size(); ++i) {
                ServiceDescription serviceDescription = (ServiceDescription)list.get(i);
                if (!serviceDescription.hasTimedOut()) continue;
                list.remove(i);
                this.removeStatus(serviceDescription.getUniqueKey() + "#" + serviceDescription.getName());
                --i;
                this.broadCastTimeOut(serviceDescription);
            }
            if (prevsiz != list.size() && list.size() > 0) {
                this.broadcastAvailable((ServiceDescription)list.get(0));
            }
        });
        if (!this.isStopped()) {
            this.delayed(1000L, () -> this.checkTimeout());
        }
    }

    protected List<ServiceDescription> getServiceList(String serviceName) {
        List<ServiceDescription> slist = this.services.get(serviceName);
        if (slist == null) {
            slist = new ArrayList<ServiceDescription>();
            this.services.put(serviceName, slist);
        }
        return slist;
    }

    public static ServiceArgs parseCommandLine(String[] args, ServiceArgs options) {
        JCommander com = new JCommander();
        com.addObject((Object)options);
        try {
            com.parse(args);
        }
        catch (Exception ex) {
            System.out.println("command line error: '" + ex.getMessage() + "'");
            options.help = true;
        }
        if (options.help) {
            com.usage();
            System.exit(-1);
        }
        return options;
    }

    private IPromise<RestApi> getRest() {
        RestApi restApi = (RestApi)ServiceRegistry.AsActor(RestApi.class, (Scheduler)this.getScheduler());
        restApi.init((ServiceRegistry)this.self());
        return ServiceRegistry.resolve((Object)((Object)restApi));
    }

    public IPromise<List<StatusEntry>> getStati() {
        ArrayList res = new ArrayList();
        this.statusMap.forEach((k, v) -> res.add(v));
        res.sort((a, b) -> a.getName().compareTo(b.getName()));
        return ServiceRegistry.resolve(res);
    }

    public static void main(String[] args) {
        ServiceRegistry.start(args);
    }

    public static ServiceRegistry start(String[] args) {
        options = (RegistryArgs)ServiceRegistry.parseCommandLine(args, new RegistryArgs());
        if (!options.isAsyncLog()) {
            Log.SetSynchronous();
        }
        ServiceRegistry serviceRegistry = (ServiceRegistry)Actors.AsActor(ServiceRegistry.class);
        serviceRegistry.init();
        new TCPNIOPublisher((Actor)serviceRegistry, options.getRegistryPort()).publish(actor -> Log.Info(null, (String)(actor + " has disconnected")));
        Http4K.Build((String)options.getMonhost(), (int)options.getMonport()).restAPI("/mon", (Actor)serviceRegistry.getRest().await()).build();
        serviceRegistry.subscribe((Callback<Pair<String, ServiceDescription>>)(Callback & Serializable)(pair, err) -> Log.Info(((Object)((Object)serviceRegistry)).getClass(), (String)((String)pair.car() + " " + pair.cdr())));
        return serviceRegistry;
    }

    public static class RestApi
    extends Actor<RestApi> {
        private ServiceRegistry reg;
        FSTConfiguration jsonConfiguration;

        public void init(ServiceRegistry reg) {
            this.reg = reg;
            this.jsonConfiguration = FSTConfiguration.createJsonConfiguration((boolean)true, (boolean)false);
            this.jsonConfiguration.registerCrossPlatformClassMappingUseSimpleName(new Class[]{TableDescription.class, ServiceDescription.class, TCPConnectable.class, DataShard.class, SerializerType.class, Coding.class, WebSocketConnectable.class, Class.class, StatusEntry.class});
        }

        public IPromise getServices() {
            Promise p = new Promise();
            this.reg.getServiceMap().then((Callback & Serializable)(r, e) -> {
                if (r != null) {
                    try {
                        p.resolve((Object)new String(this.jsonConfiguration.asByteArray(r), "UTF-8"));
                    }
                    catch (UnsupportedEncodingException e1) {
                        Log.Error((Object)((Object)this), (Throwable)e1);
                        p.reject((Object)500);
                    }
                } else {
                    p.reject((Object)500);
                }
            });
            return p;
        }

        public IPromise getStati() {
            Promise p = new Promise();
            this.reg.getStati().then((Callback & Serializable)(r, e) -> {
                if (r != null) {
                    try {
                        p.resolve((Object)new String(this.jsonConfiguration.asByteArray(r), "UTF-8"));
                    }
                    catch (UnsupportedEncodingException e1) {
                        Log.Error((Object)((Object)this), (Throwable)e1);
                        p.reject((Object)500);
                    }
                } else {
                    p.reject((Object)500);
                }
            });
            return p;
        }
    }

    public static class StatusEntry
    implements Serializable {
        long time;
        Object status;
        String key;
        String name;

        public StatusEntry(long time, Object status, String key, String name) {
            this.time = time;
            this.status = status;
            this.key = key;
            this.name = name;
        }

        public long getTime() {
            return this.time;
        }

        public Object getStatus() {
            return this.status;
        }

        public String getKey() {
            return this.key;
        }

        public String getName() {
            return this.name == null ? "unnamed" : this.name;
        }
    }
}

