/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services;

import com.beust.jcommander.JCommander;
import com.eclipsesource.json.WriterConfig;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.remoting.base.ServiceDescription;
import org.nustaq.kontraktor.remoting.encoding.Coding;
import org.nustaq.kontraktor.remoting.encoding.SerializerType;
import org.nustaq.kontraktor.remoting.http.undertow.Http4K;
import org.nustaq.kontraktor.remoting.tcp.TCPConnectable;
import org.nustaq.kontraktor.remoting.tcp.TCPNIOPublisher;
import org.nustaq.kontraktor.remoting.websockets.WebSocketConnectable;
import org.nustaq.kontraktor.rest.FromQuery;
import org.nustaq.kontraktor.services.ClusterCfg;
import org.nustaq.kontraktor.services.RegistryArgs;
import org.nustaq.kontraktor.services.ServiceArgs;
import org.nustaq.kontraktor.services.datacluster.DataShard;
import org.nustaq.kontraktor.services.datacluster.dynamic.DynDataShard;
import org.nustaq.kontraktor.services.rlserver.SingleProcessRLClusterArgs;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.reallive.api.TableDescription;
import org.nustaq.reallive.server.dynamic.DynClusterDistribution;
import org.nustaq.serialization.FSTConfiguration;

public class ServiceRegistry
extends Actor<ServiceRegistry> {
    public static final String CONFIGUPDATE = "configupdate";
    public static final String SERVICEDUMP = "running";
    public static final String AVAILABLE = "available";
    public static final String TIMEOUT = "timeout";
    protected HashMap<String, List<ServiceDescription>> services;
    protected Map<String, StatusEntry> statusMap;
    protected List<Callback> listeners;
    protected ClusterCfg config;
    public static RegistryArgs options;

    @Local
    public void init(ClusterCfg cfg) {
        this.services = new HashMap();
        this.statusMap = new HashMap<String, StatusEntry>();
        this.listeners = new ArrayList<Callback>();
        this.checkTimeout();
        this.config = cfg == null ? ClusterCfg.read() : cfg;
        this.serviceDumper();
    }

    public void serviceDumper() {
        if (!this.isStopped()) {
            try {
                Log.Info((Object)((Object)this), (String)"------");
                this.services.forEach((k, sd) -> Log.Info((Object)((Object)this), (String)("" + sd)));
                Log.Info((Object)((Object)this), (String)"------");
                this.listeners.forEach(cb -> cb.pipe((Object)new Pair((Object)SERVICEDUMP, this.services)));
                if (ClusterCfg.isDirty()) {
                    this.config = ClusterCfg.read();
                    this.listeners.forEach(cb -> cb.pipe((Object)new Pair((Object)CONFIGUPDATE, (Object)this.config)));
                }
            }
            catch (Exception e) {
                Log.Error((Object)((Object)this), (Throwable)e);
            }
            if (options.dumpServices()) {
                this.delayed(10000L, () -> this.serviceDumper());
            }
        }
    }

    public void registerService(ServiceDescription desc) {
        Log.Info((Object)((Object)this), (String)("registering service " + desc));
        List<ServiceDescription> serviceList = this.getServiceList(desc.getName());
        serviceList.add(desc);
        desc.receiveHeartbeat();
        if (serviceList.size() == 1) {
            this.broadcastAvailable(desc);
        }
    }

    public IPromise<Map<String, ServiceDescription>> getServiceMap() {
        HashMap servMap = new HashMap();
        this.services.forEach((name, list) -> {
            if (list.size() > 0) {
                servMap.put(name, (ServiceDescription)list.get(0));
            }
        });
        return ServiceRegistry.resolve(servMap);
    }

    protected ServiceDescription getService(String name) {
        List<ServiceDescription> serviceList = this.getServiceList(name);
        if (serviceList.size() > 0) {
            return serviceList.get(0);
        }
        return null;
    }

    public void subscribe(Callback<Pair<String, ServiceDescription>> cb) {
        this.listeners.add(cb);
    }

    protected void broadcastAvailable(ServiceDescription desc) {
        Pair msg = new Pair((Object)AVAILABLE, (Object)desc);
        this.listeners = this.listeners.stream().filter(cb -> !cb.isTerminated()).collect(Collectors.toList());
        this.listeners.forEach(cb -> {
            try {
                cb.pipe((Object)msg);
            }
            catch (Throwable th) {
                Log.Info((Object)((Object)this), (Throwable)th);
            }
        });
    }

    protected void broadCastTimeOut(ServiceDescription desc) {
        Pair msg = new Pair((Object)TIMEOUT, (Object)desc);
        for (int i = 0; i < this.listeners.size(); ++i) {
            Callback cb = this.listeners.get(i);
            try {
                cb.pipe((Object)msg);
                continue;
            }
            catch (Throwable th) {
                Log.Info((Object)((Object)this), (Throwable)th);
                this.listeners.remove(i);
                --i;
            }
        }
    }

    public IPromise<ClusterCfg> getConfig() {
        return ServiceRegistry.resolve((Object)this.config);
    }

    public void receiveHeartbeat(String serviceName, String uniqueKey) {
        this.receiveHeartbeatWithStatus(serviceName, uniqueKey, null);
    }

    public void receiveHeartbeatWithStatus(String serviceName, String uniqueKey, Serializable status) {
        long now = System.currentTimeMillis();
        this.getServiceList(serviceName).forEach(sdesc -> {
            if (sdesc.getUniqueKey().equals(uniqueKey)) {
                sdesc.receiveHeartbeat();
                if (status != null) {
                    this.updateStatus(now, (ServiceDescription)sdesc, uniqueKey + "#" + sdesc.getName(), status);
                }
            }
        });
    }

    protected void removeStatus(String key) {
        this.statusMap.remove(key);
    }

    protected void updateStatus(long now, ServiceDescription td, String key, Serializable status) {
        StatusEntry prevStatusEntry = this.statusMap.get(key);
        StatusEntry newEntry = new StatusEntry(now, status, key, td.getName());
        newEntry.startUpTime = prevStatusEntry != null ? prevStatusEntry.startUpTime : System.currentTimeMillis();
        this.statusMap.put(key, newEntry);
    }

    @Local
    public void checkTimeout() {
        this.services.values().forEach(list -> {
            int prevsiz = list.size();
            for (int i = 0; i < list.size(); ++i) {
                ServiceDescription serviceDescription = (ServiceDescription)list.get(i);
                if (!serviceDescription.hasTimedOut()) continue;
                list.remove(i);
                this.removeStatus(serviceDescription.getUniqueKey() + "#" + serviceDescription.getName());
                --i;
                this.broadCastTimeOut(serviceDescription);
            }
            if (prevsiz != list.size() && list.size() > 0) {
                this.broadcastAvailable((ServiceDescription)list.get(0));
            }
        });
        if (!this.isStopped()) {
            this.delayed(1000L, () -> this.checkTimeout());
        }
    }

    protected List<ServiceDescription> getServiceList(String serviceName) {
        List<ServiceDescription> slist = this.services.get(serviceName);
        if (slist == null) {
            slist = new ArrayList<ServiceDescription>();
            this.services.put(serviceName, slist);
        }
        return slist;
    }

    public static ServiceArgs parseCommandLine(String[] args, String[] concatArgs, ServiceArgs options) {
        JCommander com = new JCommander();
        com.setAcceptUnknownOptions(true);
        com.addObject((Object)options);
        try {
            com.parse(args);
            if (concatArgs != null) {
                ServiceRegistry.parseCommandLine(concatArgs, null, options);
            }
        }
        catch (Exception ex) {
            System.out.println("command line error: '" + ex.getMessage() + "'");
            options.help = true;
        }
        if (options.help) {
            com.usage();
            System.exit(-1);
        }
        return options;
    }

    public static ServiceArgs parseCommandLine(String[] args, ServiceArgs options) {
        JCommander com = new JCommander();
        com.setAcceptUnknownOptions(true);
        com.addObject((Object)options);
        try {
            com.parse(args);
        }
        catch (Exception ex) {
            System.out.println("command line error: '" + ex.getMessage() + "'");
            options.help = true;
        }
        if (options.help) {
            com.usage();
            System.exit(-1);
        }
        return options;
    }

    private IPromise<RestApi> getRest() {
        RestApi restApi = (RestApi)ServiceRegistry.AsActor(RestApi.class, (Scheduler)this.getScheduler());
        restApi.init((ServiceRegistry)this.self());
        return ServiceRegistry.resolve((Object)((Object)restApi));
    }

    public IPromise<DynClusterDistribution> getDynDataDistribution() {
        return ServiceRegistry.resolve(null);
    }

    public IPromise<DynClusterDistribution> getActiveDynDataDistribution() {
        return ServiceRegistry.resolve(null);
    }

    public IPromise balanceDynShards() {
        return ServiceRegistry.resolve(null);
    }

    public IPromise releaseDynShard(String name) {
        return ServiceRegistry.resolve(null);
    }

    public IPromise<List<StatusEntry>> getStati() {
        ArrayList res = new ArrayList();
        this.statusMap.forEach((k, v) -> res.add(v));
        res.sort((a, b) -> a.getName().compareTo(b.getName()));
        return ServiceRegistry.resolve(res);
    }

    public static void main(String[] args) {
        ServiceRegistry.start(args);
    }

    public static ServiceRegistry start(String[] args) {
        options = (RegistryArgs)ServiceRegistry.parseCommandLine(args, null, RegistryArgs.New());
        return ServiceRegistry.start(options);
    }

    public static ServiceRegistry start(RegistryArgs options) {
        return ServiceRegistry.start(options, null, ServiceRegistry.class);
    }

    public static void start(SingleProcessRLClusterArgs options, ClusterCfg cfg) {
        ServiceRegistry.start(options, cfg, ServiceRegistry.class);
    }

    public static ServiceRegistry start(RegistryArgs _options, ClusterCfg cfg, Class<? extends ServiceRegistry> clazz) {
        options = _options;
        if (!_options.isAsyncLog()) {
            Log.SetSynchronous();
        }
        if (_options.getClustercfg() != null) {
            ClusterCfg.pathname = _options.getClustercfg();
        }
        ServiceRegistry serviceRegistry = (ServiceRegistry)Actors.AsActor(clazz);
        serviceRegistry.init(cfg);
        Log.Info(ServiceRegistry.class, (String)("listening on " + _options.getRegistryHost() + " " + _options.getRegistryPort()));
        new TCPNIOPublisher((Actor)serviceRegistry, _options.getRegistryPort()).publish(actor -> Log.Info(null, (String)(actor + " has disconnected")));
        Log.Info(ServiceRegistry.class, (String)("monport on http://" + _options.getMonhost() + ":" + _options.getMonport() + "/mon"));
        Http4K.Build((String)_options.getMonhost(), (int)_options.getMonport()).restAPI("/mon", (Actor)serviceRegistry.getRest().await(), serviceRegistry.getReqAuth(), serviceRegistry.getPrepareResponse()).build();
        serviceRegistry.subscribe((Callback<Pair<String, ServiceDescription>>)(Callback & Serializable)(pair, err) -> Log.Info(((Object)((Object)serviceRegistry)).getClass(), (String)((String)pair.car() + " " + pair.cdr())));
        return serviceRegistry;
    }

    @CallerSideMethod
    public Consumer<HttpServerExchange> getPrepareResponse() {
        return null;
    }

    @CallerSideMethod
    public Function<HeaderMap, IPromise> getReqAuth() {
        return null;
    }

    public static class StatusEntry
    implements Serializable {
        long time;
        long startUpTime;
        Object status;
        String key;
        String name;

        public StatusEntry(long time, Object status, String key, String name) {
            this.time = time;
            this.status = status;
            this.key = key;
            this.name = name;
        }

        public long getStartUpTime() {
            return this.startUpTime;
        }

        public long getTime() {
            return this.time;
        }

        public Object getStatus() {
            return this.status;
        }

        public String getKey() {
            return this.key;
        }

        public String getName() {
            return this.name == null ? "unnamed" : this.name;
        }
    }

    public static class RestApi
    extends Actor<RestApi> {
        private ServiceRegistry reg;
        FSTConfiguration jsonConfiguration;

        public void init(ServiceRegistry reg) {
            this.reg = reg;
            this.jsonConfiguration = FSTConfiguration.createJsonConfiguration((boolean)true, (boolean)false);
            this.jsonConfiguration.registerCrossPlatformClassMappingUseSimpleName(new Class[]{TableDescription.class, ServiceDescription.class, TCPConnectable.class, DataShard.class, SerializerType.class, Coding.class, WebSocketConnectable.class, Class.class, StatusEntry.class, DynDataShard.class});
        }

        public IPromise getBalance() {
            Promise p = new Promise();
            this.reg.balanceDynShards().then((Callback & Serializable)(r, e) -> {
                if (e == null) {
                    p.resolve((Object)"<html>balancing done</html>");
                } else {
                    p.reject(e);
                }
            });
            return p;
        }

        public IPromise getRelease(@FromQuery(value="shard") String shard) {
            if (((Map)this.reg.getServiceMap().await()).get(shard) == null) {
                return RestApi.resolve((Object)("<html>unknown shard '" + shard + "' </html>"));
            }
            this.reg.releaseDynShard(shard);
            return RestApi.resolve((Object)("<html>released " + shard + " </html>"));
        }

        public IPromise get() {
            return RestApi.resolve((Object)"<html>try <a href='/mon/services'>/mon/services</a> or <a href='/mon/stati'>/mon/stati</a> or <a href='/mon/distribution'>/mon/distribution</a> or <a href='/mon/activeDistribution'>/mon/activeDistribution</a> or <a href='/mon/balance'>/mon/balance</a></html>");
        }

        public IPromise getServices() {
            Promise p = new Promise();
            this.reg.getServiceMap().then((Callback & Serializable)(r, e) -> {
                if (r != null) {
                    try {
                        p.resolve((Object)new String(this.jsonConfiguration.asByteArray(r), "UTF-8"));
                    }
                    catch (UnsupportedEncodingException e1) {
                        Log.Error((Object)((Object)this), (Throwable)e1);
                        p.reject((Object)500);
                    }
                } else {
                    p.reject((Object)500);
                }
            });
            return p;
        }

        public IPromise getDistribution() {
            return RestApi.resolve((Object)((DynClusterDistribution)this.reg.getDynDataDistribution().await()).toJsonObj().toString(WriterConfig.PRETTY_PRINT));
        }

        public IPromise getActiveDistribution() {
            return RestApi.resolve((Object)((DynClusterDistribution)this.reg.getActiveDynDataDistribution().await()).toJsonObj().toString(WriterConfig.PRETTY_PRINT));
        }

        public IPromise getStati() {
            Promise p = new Promise();
            this.reg.getStati().then((Callback & Serializable)(r, e) -> {
                if (r != null) {
                    try {
                        p.resolve((Object)new String(this.jsonConfiguration.asByteArray(r), "UTF-8"));
                    }
                    catch (UnsupportedEncodingException e1) {
                        Log.Error((Object)((Object)this), (Throwable)e1);
                        p.reject((Object)500);
                    }
                } else {
                    p.reject((Object)500);
                }
            });
            return p;
        }
    }
}

