/*
 * Decompiled with CFR 0.152.
 */
package de.taimos.dvalin.orchestration.etcd.discovery;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import de.taimos.daemon.DaemonStarter;
import de.taimos.daemon.LifecyclePhase;
import de.taimos.daemon.spring.conditional.OnSystemProperty;
import de.taimos.dvalin.orchestration.core.discovery.ServiceDiscovery;
import de.taimos.dvalin.orchestration.core.discovery.ServiceInstance;
import de.taimos.dvalin.orchestration.core.discovery.ServiceListener;
import de.taimos.dvalin.orchestration.etcd.discovery.HostInfo;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.responses.EtcdAuthenticationException;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
@OnSystemProperty(propertyName="orchestration.etcd.peers")
public class EtcdServiceDiscovery
implements ServiceDiscovery {
    public static final Logger LOGGER = LoggerFactory.getLogger(EtcdServiceDiscovery.class);
    private static final int INSTANCE_TIMEOUT = 16;
    private static final int REFRESH_INTERVAL = 5;
    @Value(value="${orchestration.etcd.peers}")
    private String peers;
    private EtcdClient client;
    private final ScheduledExecutorService updateExecutor = Executors.newScheduledThreadPool(1);
    private final ObjectMapper mapper = new ObjectMapper();
    private Map<String, Object> properties;
    private final Multimap<String, ServiceListener> serviceListeners = ArrayListMultimap.create();
    private final ConcurrentMap<String, Long> etcdIndex = new ConcurrentHashMap<String, Long>();

    @PostConstruct
    public void init() {
        List<URI> uris = Arrays.stream(this.peers.split(",")).map(URI::create).collect(Collectors.toList());
        this.client = new EtcdClient(uris.toArray(new URI[0]));
    }

    @PreDestroy
    public void shutdown() {
        this.serviceListeners.clear();
    }

    public void registerInstance() {
        try {
            ServiceInstance instance = this.createLocalServiceInstance();
            String key = this.getServiceInstanceKey(instance);
            this.client.put(key, this.getHostInfoAsString(instance, null)).ttl(Integer.valueOf(16)).send().get();
            this.updateExecutor.scheduleAtFixedRate(() -> {
                try {
                    this.client.refresh(key, Integer.valueOf(16)).send();
                }
                catch (IOException e) {
                    LOGGER.warn("Error refreshing service state", (Throwable)e);
                }
            }, 5L, 5L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            LOGGER.warn("Error registering instance", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    public void updateInstance() {
        try {
            ServiceInstance instance = this.createLocalServiceInstance();
            this.client.put(this.getServiceInstanceKey(instance), this.getHostInfoAsString(instance, this.properties)).ttl(Integer.valueOf(16)).prevExist(true).send();
        }
        catch (IOException e) {
            LOGGER.warn("Error updating instance", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    public void unregisterInstance() {
        try {
            ServiceInstance instance = this.createLocalServiceInstance();
            this.client.delete(this.getServiceInstanceKey(instance)).send();
            this.updateExecutor.shutdown();
        }
        catch (IOException e) {
            LOGGER.warn("Error unregistering instance", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    public void setAdditionalProperties(Map<String, Object> properties) {
        this.properties = properties;
        this.updateInstance();
    }

    public Optional<Map<String, Object>> getAdditionalProperties() {
        return Optional.ofNullable(this.properties);
    }

    public List<ServiceInstance> getInstancesForService(String serviceName) {
        ArrayList<ServiceInstance> list = new ArrayList<ServiceInstance>();
        try {
            String serviceKey = this.getServiceKey(serviceName);
            EtcdKeysResponse response = (EtcdKeysResponse)this.client.get(serviceKey).timeout(5L, TimeUnit.SECONDS).send().get();
            Pattern keyPattern = Pattern.compile(serviceKey + "/([A-Fa-f0-9\\-]+)");
            response.getNode().getNodes().forEach(node -> {
                Matcher matcher = keyPattern.matcher(node.getKey());
                if (matcher.matches()) {
                    String instanceId = matcher.group(1);
                    HostInfo hostInfo = this.parseHostInfo(node.getValue());
                    list.add(new ServiceInstance(hostInfo.getHost(), serviceName, instanceId, LifecyclePhase.valueOf((String)hostInfo.getStatus())));
                }
            });
        }
        catch (Exception e) {
            LOGGER.warn("Error fetching instance data", (Throwable)e);
        }
        return list;
    }

    public Optional<Map<String, Object>> getAdditionalProperties(ServiceInstance instance) {
        try {
            EtcdKeysResponse response = (EtcdKeysResponse)this.client.get(this.getServiceInstanceKey(instance)).timeout(5L, TimeUnit.SECONDS).send().get();
            String value = response.getNode().getValue();
            if (value != null) {
                return Optional.ofNullable(this.parseHostInfo(value).getProperties());
            }
        }
        catch (Exception e) {
            LOGGER.warn("Error fetching instance data", (Throwable)e);
        }
        return Optional.empty();
    }

    public void addListenerForService(String serviceName, ServiceListener listener) {
        boolean hadListeners = this.serviceListeners.containsKey((Object)serviceName);
        this.serviceListeners.put((Object)serviceName, (Object)listener);
        if (!hadListeners) {
            new Thread(() -> {
                String serviceKey = this.getServiceKey(serviceName);
                Pattern keyPattern = Pattern.compile(serviceKey + "/([A-Fa-f0-9\\-]+)");
                while (this.serviceListeners.containsKey((Object)serviceName)) {
                    LOGGER.debug("Polling for service updates for service {}", (Object)serviceName);
                    try {
                        EtcdResponsePromise send = this.client.get(serviceKey).waitForChange(this.etcdIndex.getOrDefault(serviceName, 1L).longValue()).timeout(10L, TimeUnit.SECONDS).recursive().send();
                        this.parseWaitResponse(serviceName, keyPattern, (EtcdResponsePromise<EtcdKeysResponse>)send);
                    }
                    catch (IOException e) {
                        LOGGER.warn("Error waiting for instance updates", (Throwable)e);
                    }
                }
            }, "etcd-poller-" + serviceName).start();
        }
    }

    private void parseWaitResponse(String serviceName, Pattern keyPattern, EtcdResponsePromise<EtcdKeysResponse> send) throws IOException {
        try {
            EtcdKeysResponse response = (EtcdKeysResponse)send.get();
            this.etcdIndex.put(serviceName, response.node.getModifiedIndex() + 1L);
            Matcher matcher = keyPattern.matcher(response.node.getKey());
            if (matcher.matches()) {
                String instanceId = matcher.group(1);
                switch (response.action) {
                    case set: 
                    case create: 
                    case update: 
                    case compareAndSwap: {
                        HostInfo info = this.parseHostInfo(response.getNode().getValue());
                        ServiceInstance instance = new ServiceInstance(info.getHost(), serviceName, instanceId, LifecyclePhase.valueOf((String)info.getStatus()));
                        if (response.getPrevNode() != null) {
                            this.getListeners(serviceName).forEach(l -> l.instanceChanged(instance));
                            break;
                        }
                        this.getListeners(serviceName).forEach(l -> l.instanceRegistered(instance));
                        break;
                    }
                    case delete: 
                    case expire: 
                    case compareAndDelete: {
                        HostInfo removedInfo = this.parseHostInfo(response.getPrevNode().getValue());
                        ServiceInstance removedInstance = new ServiceInstance(removedInfo.getHost(), serviceName, instanceId, LifecyclePhase.valueOf((String)removedInfo.getStatus()));
                        this.getListeners(serviceName).forEach(l -> l.instanceUnregistered(removedInstance));
                        break;
                    }
                }
            }
        }
        catch (TimeoutException response) {
        }
        catch (EtcdAuthenticationException e) {
            LOGGER.warn("ETCD authentication error", (Throwable)e);
        }
        catch (EtcdException e) {
            if (e.getErrorCode() == 401) {
                LOGGER.info("Skipped events as index was outdated");
                this.etcdIndex.put(serviceName, e.getIndex());
            }
            LOGGER.warn("ETCD error", (Throwable)e);
        }
    }

    public void removeListenerForService(String serviceName, ServiceListener listener) {
        this.serviceListeners.remove((Object)serviceName, (Object)listener);
    }

    private String getServiceInstanceKey(ServiceInstance instance) {
        return this.getServiceKey(instance.getServiceName()) + "/" + instance.getInstanceId();
    }

    private String getServiceKey(String serviceName) {
        return "/dvalin/discovery/" + serviceName;
    }

    private ServiceInstance createLocalServiceInstance() {
        return new ServiceInstance(DaemonStarter.getHostname(), DaemonStarter.getDaemonName(), DaemonStarter.getInstanceId(), DaemonStarter.getCurrentPhase());
    }

    private String getHostInfoAsString(ServiceInstance instance, Map<String, Object> properties) {
        try {
            HostInfo info = new HostInfo();
            info.setHost(instance.getHost());
            info.setStatus(instance.getPhase().name());
            info.setProperties(properties);
            return this.mapper.writeValueAsString((Object)info);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    private HostInfo parseHostInfo(String info) {
        try {
            return (HostInfo)this.mapper.readValue(info, HostInfo.class);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Collection<ServiceListener> getListeners(String serviceName) {
        return Lists.newArrayList((Iterable)this.serviceListeners.get((Object)serviceName));
    }
}

