package org.apache.druid.curator.discovery;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.discovery.BaseNodeRoleWatcher;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.utils.CloseableUtils;

@ManageLifecycle
/* loaded from: input_file:org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.class */
public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider {
    // Logger shared by this provider and by the nested NodeDiscoverer.
    private static final Logger log = new Logger(CuratorDruidNodeDiscoveryProvider.class);
    // Curator client handed to every NodeDiscoverer and NodeRoleWatcher created by this provider.
    private final CuratorFramework curatorFramework;
    // ZooKeeper path configuration; supplies the internal discovery path and node announcement paths.
    private final ZkPathsConfig config;
    // JSON mapper used to deserialize DiscoveryDruidNode payloads read from ZooKeeper.
    private final ObjectMapper jsonMapper;
    // Single-threaded executor created in start(), passed to each NodeRoleWatcher and shut down in stop().
    private ExecutorService listenerExecutor;
    // One lazily created watcher per node role; populated by getForNodeRole() and closed in stop().
    private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeRoleWatchers = new ConcurrentHashMap<>();
    // Every NodeDiscoverer handed out by getForNode(); retained so that stop() can close them.
    private final ConcurrentLinkedQueue<NodeDiscoverer> nodeDiscoverers = new ConcurrentLinkedQueue<>();
    // Guards the start/stop transitions; getForNode() and getForNodeRole() require the started state.
    private final LifecycleLock lifecycleLock = new LifecycleLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider$NodeDiscoverer.class */
    public static class NodeDiscoverer implements Closeable {
        private final ObjectMapper jsonMapper;
        private final NodeCache nodeCache;
        private final NodeRole nodeRole;

        private NodeDiscoverer(ZkPathsConfig zkPathsConfig, ObjectMapper objectMapper, CuratorFramework curatorFramework, DruidNode druidNode, NodeRole nodeRole) {
            this.jsonMapper = objectMapper;
            this.nodeCache = new NodeCache(curatorFramework, CuratorDruidNodeAnnouncer.makeNodeAnnouncementPath(zkPathsConfig, nodeRole, druidNode), true);
            this.nodeRole = nodeRole;
            try {
                this.nodeCache.start(true);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean nodeDiscovered() {
            ChildData currentData = this.nodeCache.getCurrentData();
            if (currentData == null) {
                return false;
            }
            try {
                DiscoveryDruidNode discoveryDruidNode = (DiscoveryDruidNode) this.jsonMapper.readValue(currentData.getData(), DiscoveryDruidNode.class);
                if (this.nodeRole.equals(discoveryDruidNode.getNodeRole())) {
                    CuratorDruidNodeDiscoveryProvider.log.info("Node[%s] of role[%s] appeared.", discoveryDruidNode.getDruidNode().getUriToUse(), discoveryDruidNode.getNodeRole().getJsonName());
                    return true;
                }
                CuratorDruidNodeDiscoveryProvider.log.error("Node[%s] of role[%s] add is discovered by node watcher of different node role. Ignored.", discoveryDruidNode.getDruidNode().getUriToUse(), discoveryDruidNode.getNodeRole().getJsonName());
                return false;
            } catch (IOException e) {
                CuratorDruidNodeDiscoveryProvider.log.error(e, "Exception occurred when reading node's value", new Object[0]);
                return false;
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.nodeCache.close();
        }
    }

    /* loaded from: input_file:org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider$NodeRoleWatcher.class */
    /**
     * Watches the ZooKeeper children under the discovery path of a single {@link NodeRole} and
     * forwards additions and removals to a {@link BaseNodeRoleWatcher}, which backs the
     * {@link DruidNodeDiscovery} view exposed by this class.
     *
     * <p>Each watcher owns a dedicated single-threaded executor for its {@link PathChildrenCache};
     * both are released by {@link #close()}.
     */
    private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable {
        private static final Logger log = new Logger(NodeRoleWatcher.class);
        private final CuratorFramework curatorFramework;
        private final NodeRole nodeRole;
        private final ObjectMapper jsonMapper;
        private final BaseNodeRoleWatcher baseNodeRoleWatcher;
        private final PathChildrenCache cache;
        private final ExecutorService cacheExecutor;
        // Serializes handling of cache events.
        private final Object lock = new Object();

        /**
         * Creates the watcher, registers the cache listener and starts the cache.
         *
         * @param executorService  executor handed to the {@link BaseNodeRoleWatcher}
         * @param curatorFramework Curator client backing the cache and direct data reads
         * @param str              base discovery path; the role name is appended to form the watched path
         * @param objectMapper     mapper used to deserialize announced node data
         * @param nodeRole         role whose nodes are watched
         * @throws RuntimeException wrapping any failure raised while registering the listener or
         *                          starting the cache
         */
        NodeRoleWatcher(ExecutorService executorService, CuratorFramework curatorFramework, String str, ObjectMapper objectMapper, NodeRole nodeRole) {
            this.curatorFramework = curatorFramework;
            this.nodeRole = nodeRole;
            this.jsonMapper = objectMapper;
            this.baseNodeRoleWatcher = new BaseNodeRoleWatcher(executorService, nodeRole);
            this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeRoleWatcher[%s]", StringUtils.encodeForFormat(nodeRole.toString())));
            this.cache = new PathChildrenCacheFactory.Builder().withCacheData(true).withCompressed(true).withExecutorService(this.cacheExecutor).build().make(curatorFramework, ZKPaths.makePath(str, nodeRole.toString()));
            try {
                this.cache.getListenable().addListener((client, event) -> handleChildEvent(event));
                // POST_INITIALIZED_EVENT: an INITIALIZED event is delivered to the listener once
                // the cache has been primed, see handleChildEvent().
                this.cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            } catch (Exception e) {
                // Release the cache and its dedicated executor so a failed construction does not
                // leak them; keep the original failure as the primary exception.
                try {
                    close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw new RuntimeException(e);
            }
        }

        /**
         * Closes the children cache and shuts down the executor dedicated to it.
         *
         * @throws IOException if closing fails
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            CloseableUtils.closeAll(this.cache, this.cacheExecutor::shutdownNow);
        }

        /** Returns the nodes currently known to the underlying {@link BaseNodeRoleWatcher}. */
        @Override // org.apache.druid.discovery.DruidNodeDiscovery
        public Collection<DiscoveryDruidNode> getAllNodes() {
            return this.baseNodeRoleWatcher.getAllNodes();
        }

        /** Registers {@code listener} with the underlying {@link BaseNodeRoleWatcher}. */
        @Override // org.apache.druid.discovery.DruidNodeDiscovery
        public void registerListener(DruidNodeDiscovery.Listener listener) {
            this.baseNodeRoleWatcher.registerListener(listener);
        }

        /**
         * Dispatches a cache event to the matching handler while holding {@code lock}.
         *
         * <p>Any exception raised while handling the event is logged and not propagated, so one
         * bad event does not escape into the cache's listener machinery.
         *
         * @param pathChildrenCacheEvent event delivered by the children cache
         */
        void handleChildEvent(PathChildrenCacheEvent pathChildrenCacheEvent) {
            synchronized (this.lock) {
                try {
                    switch (pathChildrenCacheEvent.getType()) {
                        case CHILD_ADDED:
                            childAdded(pathChildrenCacheEvent);
                            break;
                        case CHILD_REMOVED:
                            childRemoved(pathChildrenCacheEvent);
                            break;
                        case INITIALIZED:
                            // Tell the base watcher that the cache has been initialized.
                            this.baseNodeRoleWatcher.cacheInitialized();
                            break;
                        default:
                            log.warn("Ignored event type [%s] for node watcher of role [%s].", pathChildrenCacheEvent.getType(), this.nodeRole.getJsonName());
                            break;
                    }
                } catch (Exception e) {
                    log.error(e, "Unknown error in node watcher of role [%s].", this.nodeRole.getJsonName());
                }
            }
        }

        /**
         * Handles a child addition: reads the node data directly from ZooKeeper (not from the
         * event) and reports the deserialized node to the base watcher.
         *
         * @throws IOException if the node data cannot be deserialized
         */
        @GuardedBy("lock")
        private void childAdded(PathChildrenCacheEvent pathChildrenCacheEvent) throws IOException {
            final byte[] nodeBytes = getZkDataForNode(pathChildrenCacheEvent.getData());
            if (nodeBytes == null) {
                log.error("Failed to get data for path [%s]. Ignoring a child addition event.", pathChildrenCacheEvent.getData().getPath());
                return;
            }
            this.baseNodeRoleWatcher.childAdded(this.jsonMapper.readValue(nodeBytes, DiscoveryDruidNode.class));
        }

        /**
         * Handles a child removal: uses the data carried by the event and reports the deserialized
         * node to the base watcher.
         *
         * @throws IOException if the node data cannot be deserialized
         */
        @GuardedBy("lock")
        private void childRemoved(PathChildrenCacheEvent pathChildrenCacheEvent) throws IOException {
            final byte[] nodeBytes = pathChildrenCacheEvent.getData().getData();
            if (nodeBytes == null) {
                log.error("Failed to get data for path [%s]. Ignoring a child removal event.", pathChildrenCacheEvent.getData().getPath());
                return;
            }
            this.baseNodeRoleWatcher.childRemoved(this.jsonMapper.readValue(nodeBytes, DiscoveryDruidNode.class));
        }

        /**
         * Fetches and decompresses the current data of the given child straight from ZooKeeper.
         *
         * @return the node data, or {@code null} if reading it failed (the failure is logged)
         */
        @Nullable
        private byte[] getZkDataForNode(ChildData childData) {
            try {
                return this.curatorFramework.getData().decompressed().forPath(childData.getPath());
            } catch (Exception e) {
                log.error(e, "Exception while getting data for node %s", childData.getPath());
                return null;
            }
        }
    }

    /**
     * Creates the provider; no ZooKeeper activity happens until {@link #start()} is called and a
     * discovery object is requested.
     *
     * @param curatorFramework Curator client used by all discoverers and watchers
     * @param zkPathsConfig    ZooKeeper path configuration
     * @param objectMapper     JSON mapper used to read announced node data
     */
    @Inject
    public CuratorDruidNodeDiscoveryProvider(CuratorFramework curatorFramework, ZkPathsConfig zkPathsConfig, @Json ObjectMapper objectMapper) {
        this.jsonMapper = objectMapper;
        this.config = zkPathsConfig;
        this.curatorFramework = curatorFramework;
    }

    /**
     * Creates a {@link NodeDiscoverer} for the given node and role and returns a supplier that
     * reports whether that node is currently discovered.
     *
     * <p>The discoverer is remembered so that {@link #stop()} can close it.
     *
     * @param druidNode node to look for
     * @param nodeRole  role the node is expected to be announced with
     * @return supplier delegating to {@code NodeDiscoverer.nodeDiscovered()}
     * @throws IllegalStateException if this provider has not been started
     */
    @Override // org.apache.druid.discovery.DruidNodeDiscoveryProvider
    public BooleanSupplier getForNode(DruidNode druidNode, NodeRole nodeRole) {
        Preconditions.checkState(lifecycleLock.isStarted());
        log.debug("Creating a NodeDiscoverer for node [%s] and role [%s]", druidNode, nodeRole);

        final NodeDiscoverer discoverer = new NodeDiscoverer(config, jsonMapper, curatorFramework, druidNode, nodeRole);
        nodeDiscoverers.add(discoverer);
        return discoverer::nodeDiscovered;
    }

    /**
     * Returns the discovery view for the given role, creating its watcher on first request and
     * reusing it afterwards.
     *
     * @param nodeRole role whose nodes should be discovered
     * @return the watcher registered for {@code nodeRole}
     * @throws IllegalStateException if this provider has not been started
     */
    @Override // org.apache.druid.discovery.DruidNodeDiscoveryProvider
    public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole) {
        Preconditions.checkState(lifecycleLock.isStarted());
        return nodeRoleWatchers.computeIfAbsent(nodeRole, this::createNodeRoleWatcher);
    }

    /** Builds a new watcher for {@code role}, logging before and after construction. */
    private NodeRoleWatcher createNodeRoleWatcher(NodeRole role) {
        log.debug("Creating NodeRoleWatcher for nodeRole [%s].", role);
        final NodeRoleWatcher watcher = new NodeRoleWatcher(listenerExecutor, curatorFramework, config.getInternalDiscoveryPath(), jsonMapper, role);
        log.debug("Created NodeRoleWatcher for nodeRole [%s].", role);
        return watcher;
    }

    /**
     * Starts the provider by creating the listener executor and marking the lifecycle as started.
     *
     * @throws ISE if the lifecycle lock does not permit starting
     */
    @LifecycleStart
    public void start() {
        if (!lifecycleLock.canStart()) {
            throw new ISE("can't start.");
        }

        try {
            listenerExecutor = Execs.singleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
            log.debug("Started.");
            lifecycleLock.started();
        } finally {
            // Always leave the "starting" phase, whether or not startup succeeded.
            lifecycleLock.exitStart();
        }
    }

    /**
     * Stops the provider: all role watchers and node discoverers created so far are registered
     * with a {@link Closer}, which is then closed together with the listener executor.
     *
     * @throws ISE         if the lifecycle lock does not permit stopping
     * @throws IOException if closing any of the resources fails
     */
    @LifecycleStop
    public void stop() throws IOException {
        if (!lifecycleLock.canStop()) {
            throw new ISE("can't stop.");
        }
        log.debug("Stopping.");

        final Closer closer = Closer.create();
        closer.registerAll(nodeRoleWatchers.values());
        closer.registerAll(nodeDiscoverers);

        CloseableUtils.closeAll(closer, listenerExecutor::shutdownNow);
    }
}
