/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.services;

import com.beust.jcommander.JCommander;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.annotations.Local;
import org.nustaq.kontraktor.remoting.tcp.TCPNIOPublisher;
import org.nustaq.kontraktor.services.ClusterCfg;
import org.nustaq.kontraktor.services.ServiceArgs;
import org.nustaq.kontraktor.services.ServiceDescription;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.reallive.api.Record;
import org.nustaq.reallive.messages.AddMessage;
import org.nustaq.reallive.messages.QueryDoneMessage;
import org.nustaq.reallive.messages.RemoveMessage;
import org.nustaq.reallive.messages.UpdateMessage;
import org.nustaq.reallive.records.MapRecord;

public class ServiceRegistry
extends Actor<ServiceRegistry> {
    public static final String CONFIGUPDATE = "configupdate";
    public static final String SERVICEDUMP = "running";
    public static final String AVAILABLE = "available";
    public static final String TIMEOUT = "timeout";
    public static Class[] JSONCLASSES = new Class[]{AddMessage.class, RemoveMessage.class, UpdateMessage.class, QueryDoneMessage.class, Record.class, MapRecord.class};
    HashMap<String, List<ServiceDescription>> services;
    List<Callback> listeners;
    ClusterCfg config;
    public static ServiceArgs options;

    @Local
    public void init() {
        this.services = new HashMap();
        this.listeners = new ArrayList<Callback>();
        this.checkTimeout();
        this.config = ClusterCfg.read();
        this.serviceDumper();
    }

    public void serviceDumper() {
        if (!this.isStopped()) {
            try {
                Log.Info((Object)((Object)this), (String)"------");
                this.services.forEach((k, sd) -> Log.Info((Object)((Object)this), (String)("" + sd)));
                Log.Info((Object)((Object)this), (String)"------");
                this.listeners.forEach(cb -> cb.pipe((Object)new Pair((Object)SERVICEDUMP, this.services)));
                if (ClusterCfg.isDirty()) {
                    this.config = ClusterCfg.read();
                    this.listeners.forEach(cb -> cb.pipe((Object)new Pair((Object)CONFIGUPDATE, (Object)this.config)));
                }
            }
            catch (Exception e) {
                Log.Error((Object)((Object)this), (Throwable)e);
            }
            this.delayed(10000L, () -> this.serviceDumper());
        }
    }

    public void registerService(ServiceDescription desc) {
        List<ServiceDescription> serviceList = this.getServiceList(desc.getName());
        serviceList.add(desc);
        desc.receiveHeartbeat();
        if (serviceList.size() == 1) {
            this.broadcastAvailable(desc);
        }
    }

    public IPromise<Map<String, ServiceDescription>> getServiceMap() {
        HashMap servMap = new HashMap();
        this.services.forEach((name, list) -> {
            if (list.size() > 0) {
                servMap.put(name, list.get(0));
            }
        });
        return ServiceRegistry.resolve(servMap);
    }

    public void subscribe(Callback<Pair<String, ServiceDescription>> cb) {
        this.listeners.add(cb);
    }

    protected void broadcastAvailable(ServiceDescription desc) {
        Pair msg = new Pair((Object)AVAILABLE, (Object)desc);
        this.listeners = this.listeners.stream().filter(cb -> !cb.isTerminated()).collect(Collectors.toList());
        this.listeners.forEach(cb -> {
            try {
                cb.pipe((Object)msg);
            }
            catch (Throwable th) {
                Log.Info((Object)((Object)this), (Throwable)th);
            }
        });
    }

    protected void broadCastTimeOut(ServiceDescription desc) {
        Pair msg = new Pair((Object)TIMEOUT, (Object)desc);
        for (int i = 0; i < this.listeners.size(); ++i) {
            Callback cb = this.listeners.get(i);
            try {
                cb.pipe((Object)msg);
                continue;
            }
            catch (Throwable th) {
                Log.Info((Object)((Object)this), (Throwable)th);
                this.listeners.remove(i);
                --i;
            }
        }
    }

    public IPromise<ClusterCfg> getConfig() {
        return ServiceRegistry.resolve((Object)this.config);
    }

    public void receiveHeartbeat(String serviceName, String uniqueKey) {
        this.getServiceList(serviceName).forEach(sdesc -> {
            if (sdesc.getUniqueKey().equals(uniqueKey)) {
                sdesc.receiveHeartbeat();
            }
        });
    }

    @Local
    public void checkTimeout() {
        this.services.values().forEach(list -> {
            int prevsiz = list.size();
            for (int i = 0; i < list.size(); ++i) {
                ServiceDescription serviceDescription = (ServiceDescription)list.get(i);
                if (!serviceDescription.hasTimedOut()) continue;
                list.remove(i);
                --i;
                this.broadCastTimeOut(serviceDescription);
            }
            if (prevsiz != list.size() && list.size() > 0) {
                this.broadcastAvailable((ServiceDescription)list.get(0));
            }
        });
        if (!this.isStopped()) {
            this.delayed(1000L, () -> this.checkTimeout());
        }
    }

    protected List<ServiceDescription> getServiceList(String serviceName) {
        List<ServiceDescription> slist = this.services.get(serviceName);
        if (slist == null) {
            slist = new ArrayList<ServiceDescription>();
            this.services.put(serviceName, slist);
        }
        return slist;
    }

    public static ServiceArgs parseCommandLine(String[] args, ServiceArgs options) {
        JCommander com = new JCommander();
        com.addObject((Object)options);
        try {
            com.parse(args);
        }
        catch (Exception ex) {
            System.out.println("command line error: '" + ex.getMessage() + "'");
            options.help = true;
        }
        if (options.help) {
            com.usage();
            System.exit(-1);
        }
        return options;
    }

    public static void main(String[] args) {
        options = ServiceRegistry.parseCommandLine(args, new ServiceArgs());
        if (!options.isSysoutlog()) {
            Log.SetSynchronous();
        }
        ServiceRegistry serviceRegistry = (ServiceRegistry)Actors.AsActor(ServiceRegistry.class);
        serviceRegistry.init();
        new TCPNIOPublisher((Actor)serviceRegistry, options.getRegistryPort()).publish(actor -> Log.Info(null, (String)(actor + " has disconnected")));
        serviceRegistry.subscribe((Callback<Pair<String, ServiceDescription>>)(Callback & Serializable)(pair, err) -> Log.Info(((Object)((Object)serviceRegistry)).getClass(), (String)((String)pair.car() + " " + pair.cdr())));
    }
}

