package org.apache.kylin.rest.discovery;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.apache.kylin.common.util.ClusterConstant;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.rest.discovery.KylinServiceDiscovery;
import org.apache.kylin.rest.response.ServerInfoResponse;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.loader.util.SystemPropertyUtils;
import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
import org.springframework.cloud.zookeeper.discovery.ZookeeperInstance;
import org.springframework.stereotype.Component;

@ConditionalOnZookeeperEnabled
@Component
/* loaded from: input_file:BOOT-INF/classes/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.class */
public class KylinServiceDiscoveryCache implements KylinServiceDiscovery {
    private static final Logger logger = LoggerFactory.getLogger(KylinServiceDiscoveryCache.class);
    private static final KylinServiceDiscovery.Callback UPDATE_ALL_EPOCHS = () -> {
        try {
            EpochManager.getInstance().updateAllEpochs();
        } catch (Exception e) {
            logger.error("UpdateAllEpochs failed", e);
        }
    };

    @Autowired
    private KylinServiceDiscoveryClient kylinServiceDiscoveryClient;

    @Autowired
    private ServiceDiscovery<ZookeeperInstance> serviceDiscovery;

    @Autowired
    private CuratorFramework curatorClient;
    private final List<ClusterConstant.ServerModeEnum> ALL_CHECK_MODE_LIST = ImmutableList.of(ClusterConstant.ServerModeEnum.ALL, ClusterConstant.ServerModeEnum.JOB, ClusterConstant.ServerModeEnum.QUERY);
    private final Map<ClusterConstant.ServerModeEnum, ServiceCache<ZookeeperInstance>> serverModeCacheMap = Maps.newHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache$2, reason: invalid class name */
    /* loaded from: input_file:BOOT-INF/classes/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kylin$common$util$ClusterConstant$ServerModeEnum = new int[ClusterConstant.ServerModeEnum.values().length];

        static {
            try {
                $SwitchMap$org$apache$kylin$common$util$ClusterConstant$ServerModeEnum[ClusterConstant.ServerModeEnum.QUERY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kylin$common$util$ClusterConstant$ServerModeEnum[ClusterConstant.ServerModeEnum.JOB.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kylin$common$util$ClusterConstant$ServerModeEnum[ClusterConstant.ServerModeEnum.ALL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    private static String instance2ServerStr(@Nonnull ServiceInstance<ZookeeperInstance> serviceInstance) {
        Preconditions.checkNotNull(serviceInstance, "service instance is null");
        return serviceInstance.getAddress() + SystemPropertyUtils.VALUE_SEPARATOR + serviceInstance.getPort();
    }

    @PostConstruct
    private void init() throws Exception {
        registerServiceCache();
        startServiceCache();
    }

    @PreDestroy
    private void close() {
        Iterator<ServiceCache<ZookeeperInstance>> it = this.serverModeCacheMap.values().iterator();
        while (it.hasNext()) {
            IOUtils.closeQuietly(it.next());
        }
        this.serverModeCacheMap.clear();
    }

    private void registerServiceCache() throws Exception {
        Iterator<ClusterConstant.ServerModeEnum> it = this.ALL_CHECK_MODE_LIST.iterator();
        while (it.hasNext()) {
            registerServiceCacheByMode(it.next());
        }
    }

    private void registerServiceCacheByMode(ClusterConstant.ServerModeEnum serverModeEnum) throws Exception {
        switch (AnonymousClass2.$SwitchMap$org$apache$kylin$common$util$ClusterConstant$ServerModeEnum[serverModeEnum.ordinal()]) {
            case 1:
                this.serverModeCacheMap.put(ClusterConstant.ServerModeEnum.QUERY, createServiceCache(this.serviceDiscovery, ClusterConstant.ServerModeEnum.QUERY, () -> {
                }));
                return;
            case 2:
                this.serverModeCacheMap.put(ClusterConstant.ServerModeEnum.JOB, createServiceCache(this.serviceDiscovery, ClusterConstant.ServerModeEnum.JOB, UPDATE_ALL_EPOCHS));
                return;
            case 3:
                this.serverModeCacheMap.put(ClusterConstant.ServerModeEnum.ALL, createServiceCache(this.serviceDiscovery, ClusterConstant.ServerModeEnum.ALL, UPDATE_ALL_EPOCHS));
                return;
            default:
                return;
        }
    }

    private void startServiceCache() throws Exception {
        Iterator<ServiceCache<ZookeeperInstance>> it = this.serverModeCacheMap.values().iterator();
        while (it.hasNext()) {
            it.next().start();
        }
    }

    private ServiceCache<ZookeeperInstance> createServiceCache(ServiceDiscovery<ZookeeperInstance> serviceDiscovery, ClusterConstant.ServerModeEnum serverModeEnum, KylinServiceDiscovery.Callback callback) throws Exception {
        createZkNodeIfNeeded(getZkPathByModeEnum(serverModeEnum));
        ServiceCache<ZookeeperInstance> build = serviceDiscovery.serviceCacheBuilder().name(serverModeEnum.getName()).threadFactory(Executors.defaultThreadFactory()).build();
        build.addListener(getListener(serverModeEnum, callback));
        return build;
    }

    private ServiceCacheListener getListener(final ClusterConstant.ServerModeEnum serverModeEnum, final KylinServiceDiscovery.Callback callback) {
        return new ServiceCacheListener() { // from class: org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache.1
            public void cacheChanged() {
                List serverStrByServerMode = KylinServiceDiscoveryCache.this.getServerStrByServerMode(serverModeEnum);
                Unsafe.setProperty("kylin.server.cluster-mode-" + serverModeEnum.getName(), StringUtils.join(serverStrByServerMode, ","));
                KylinServiceDiscoveryCache.logger.info("kylin.server.cluster-mode-{} update to {}", serverModeEnum.getName(), serverStrByServerMode);
                if (KylinServiceDiscoveryCache.this.getServerInfoByServerMode(ClusterConstant.ServerModeEnum.ALL, ClusterConstant.ServerModeEnum.JOB).stream().map((v0) -> {
                    return v0.getHost();
                }).anyMatch(str -> {
                    return Objects.equals(str, KylinServiceDiscoveryCache.this.kylinServiceDiscoveryClient.getLocalServiceServer());
                })) {
                    KylinServiceDiscoveryCache.logger.debug("Current node is active node, try to update all epochs");
                    callback.action();
                }
            }

            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                KylinServiceDiscoveryCache.logger.info("zookeeper connection state changed to {}", connectionState);
            }
        };
    }

    private ServiceCache<ZookeeperInstance> getServiceCacheByMode(@Nonnull ClusterConstant.ServerModeEnum serverModeEnum) {
        Preconditions.checkNotNull(serverModeEnum, "server mode is null");
        ServiceCache<ZookeeperInstance> serviceCache = this.serverModeCacheMap.get(serverModeEnum);
        Preconditions.checkNotNull(serviceCache, "cannot find the server cache :" + serverModeEnum.getName());
        return serviceCache;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<String> getServerStrByServerMode(@Nonnull ClusterConstant.ServerModeEnum serverModeEnum) {
        Preconditions.checkNotNull(serverModeEnum, "server mode is null!");
        return (List) getServiceCacheByMode(serverModeEnum).getInstances().stream().map(KylinServiceDiscoveryCache::instance2ServerStr).collect(Collectors.toList());
    }

    @Override // org.apache.kylin.rest.discovery.KylinServiceDiscovery
    public List<ServerInfoResponse> getServerInfoByServerMode(@Nullable ClusterConstant.ServerModeEnum... serverModeEnumArr) {
        ArrayList newArrayList = Lists.newArrayList();
        if (ArrayUtils.isEmpty(serverModeEnumArr)) {
            return newArrayList;
        }
        for (ClusterConstant.ServerModeEnum serverModeEnum : serverModeEnumArr) {
            newArrayList.addAll((Collection) getServiceCacheByMode(serverModeEnum).getInstances().stream().map(serviceInstance -> {
                return new ServerInfoResponse(instance2ServerStr(serviceInstance), serverModeEnum.getName());
            }).collect(Collectors.toList()));
        }
        return newArrayList;
    }

    private void createZkNodeIfNeeded(String str) throws Exception {
        try {
            if (this.curatorClient.checkExists().forPath(str) != null) {
                logger.warn("The znode {} is existed", str);
            } else {
                ((ACLBackgroundPathAndBytesable) this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(str);
                logger.info("create znode {} success", str);
            }
        } catch (Exception e) {
            logger.error("Fail to check or create znode for {}", str, e);
            throw e;
        } catch (KeeperException.NodeExistsException e2) {
            logger.warn("The znode {} has been created by others", str);
        }
    }
}
