package org.apache.kylin.common.zookeeper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.tool.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.kylin.tool.shaded.com.fasterxml.jackson.databind.JavaType;
import org.apache.kylin.tool.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kylin.tool.shaded.org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.SystemPropertyUtils;

/* loaded from: input_file:org/apache/kylin/common/zookeeper/KylinServerDiscovery.class */
public class KylinServerDiscovery implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) KylinServerDiscovery.class);
    public static final String SERVICE_PATH = "/service";
    public static final String SERVICE_NAME = "cluster_servers";
    public static final String SERVICE_PAYLOAD_DESCRIPTION = "description";
    private final KylinConfig kylinConfig;
    private final CuratorFramework curator;
    private final ServiceDiscovery<LinkedHashMap> serviceDiscovery;
    private final ServiceCache<LinkedHashMap> serviceCache;

    /* loaded from: input_file:org/apache/kylin/common/zookeeper/KylinServerDiscovery$JsonInstanceSerializer.class */
    static class JsonInstanceSerializer<T> implements InstanceSerializer<T> {
        private final ObjectMapper mapper = new ObjectMapper();
        private final Class<T> payloadClass;
        private final JavaType type;

        JsonInstanceSerializer(Class<T> cls) {
            this.payloadClass = cls;
            this.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            this.type = this.mapper.getTypeFactory().constructType(ServiceInstance.class);
        }

        public ServiceInstance<T> deserialize(byte[] bArr) throws Exception {
            ServiceInstance<T> serviceInstance = (ServiceInstance) this.mapper.readValue(bArr, this.type);
            this.payloadClass.cast(serviceInstance.getPayload());
            return serviceInstance;
        }

        public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            this.mapper.convertValue(serviceInstance.getPayload(), this.payloadClass);
            this.mapper.writeValue(byteArrayOutputStream, serviceInstance);
            return byteArrayOutputStream.toByteArray();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/common/zookeeper/KylinServerDiscovery$SingletonHolder.class */
    public static class SingletonHolder {
        private static final KylinServerDiscovery INSTANCE = new KylinServerDiscovery();

        private SingletonHolder() {
        }
    }

    public static KylinServerDiscovery getInstance() {
        return SingletonHolder.INSTANCE;
    }

    private KylinServerDiscovery() {
        this(KylinConfig.getInstanceFromEnv());
    }

    @VisibleForTesting
    protected KylinServerDiscovery(KylinConfig kylinConfig) {
        this.kylinConfig = kylinConfig;
        this.curator = ZKUtil.getZookeeperClient(kylinConfig);
        try {
            this.serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(this.curator).basePath(SERVICE_PATH).serializer(new JsonInstanceSerializer(LinkedHashMap.class)).build();
            this.serviceDiscovery.start();
            this.serviceCache = this.serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME).threadFactory(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("KylinServerTracker-%d").build()).build();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            this.serviceCache.addListener(new ServiceCacheListener() { // from class: org.apache.kylin.common.zookeeper.KylinServerDiscovery.1
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                }

                public void cacheChanged() {
                    KylinServerDiscovery.logger.info("Service discovery get cacheChanged notification");
                    List<ServiceInstance> instances = KylinServerDiscovery.this.serviceCache.getInstances();
                    HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(instances.size());
                    for (ServiceInstance serviceInstance : instances) {
                        newHashMapWithExpectedSize.put(serviceInstance.getAddress() + SystemPropertyUtils.VALUE_SEPARATOR + serviceInstance.getPort(), (String) ((LinkedHashMap) serviceInstance.getPayload()).get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION));
                    }
                    KylinServerDiscovery.logger.info("kylin.server.cluster-servers update to " + newHashMapWithExpectedSize);
                    System.setProperty("kylin.server.cluster-servers", StringUtil.join(newHashMapWithExpectedSize.keySet(), ","));
                    String join = StringUtil.join((Iterable) newHashMapWithExpectedSize.entrySet().stream().map(entry -> {
                        return ((String) entry.getKey()) + SystemPropertyUtils.VALUE_SEPARATOR + ((String) entry.getValue());
                    }).collect(Collectors.toList()), ",");
                    KylinServerDiscovery.logger.info("kylin.server.cluster-servers-with-mode update to " + join);
                    System.setProperty("kylin.server.cluster-servers-with-mode", join);
                    atomicBoolean.set(true);
                }
            });
            this.serviceCache.start();
            registerSelf();
            int i = 1;
            while (!atomicBoolean.get()) {
                logger.info("Haven't registered, waiting ...");
                long j = 100 * i * i;
                if (j > 60000) {
                    j = 60000;
                } else {
                    i++;
                }
                Thread.sleep(j);
            }
        } catch (Exception e) {
            throw new RuntimeException("Fail to initialize due to ", e);
        }
    }

    private void registerSelf() throws Exception {
        String serverRestAddress = this.kylinConfig.getServerRestAddress();
        String[] split = serverRestAddress.split(SystemPropertyUtils.VALUE_SEPARATOR);
        if (split.length < 2) {
            logger.error("kylin.server.host-address {} is not qualified ", serverRestAddress);
            throw new RuntimeException("kylin.server.host-address " + serverRestAddress + " is not qualified");
        }
        registerServer(split[0], Integer.parseInt(split[1]), this.kylinConfig.getServerMode());
    }

    private void registerServer(String str, int i, String str2) throws Exception {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(SERVICE_PAYLOAD_DESCRIPTION, str2);
        ServiceInstance build = ServiceInstance.builder().name(SERVICE_NAME).payload(linkedHashMap).port(i).address(str).build();
        for (ServiceInstance serviceInstance : this.serviceCache.getInstances()) {
            if (serviceInstance.getAddress().equals(build.getAddress()) && serviceInstance.getPort().equals(build.getPort())) {
                this.serviceDiscovery.unregisterService(serviceInstance);
            }
        }
        this.serviceDiscovery.registerService(build);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOUtils.closeQuietly((Closeable) this.serviceCache);
        IOUtils.closeQuietly((Closeable) this.serviceDiscovery);
    }
}
