package com.github.phantomthief.zookeeper.broadcast;

import com.google.common.base.Throwables;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/phantomthief/zookeeper/broadcast/ZkBroadcaster.class */
public class ZkBroadcaster {
    private static final String DEFAULT_ZK_PREFIX = "/broadcast";
    private final Logger logger;
    private final Supplier<CuratorFramework> curatorFactory;
    private final String zkPrefix;
    private final ConcurrentMap<String, Set<ZkSubscriber>> subscribeMap;
    private final ConcurrentMap<String, NodeCache> nodeCacheMap;

    public ZkBroadcaster(Supplier<CuratorFramework> supplier, String str) {
        this.logger = LoggerFactory.getLogger(getClass());
        this.subscribeMap = new ConcurrentHashMap();
        this.nodeCacheMap = new ConcurrentHashMap();
        this.curatorFactory = supplier;
        this.zkPrefix = str;
    }

    public ZkBroadcaster(Supplier<CuratorFramework> supplier) {
        this(supplier, DEFAULT_ZK_PREFIX);
    }

    public void subscribe(String str, ZkSubscriber zkSubscriber) {
        if (str == null || zkSubscriber == null) {
            throw new NullPointerException();
        }
        Set<ZkSubscriber> compute = this.subscribeMap.compute(str, (str2, set) -> {
            if (set == null) {
                set = new HashSet();
            }
            set.add(zkSubscriber);
            return set;
        });
        this.nodeCacheMap.computeIfAbsent(str, str3 -> {
            NodeCache nodeCache = new NodeCache(this.curatorFactory.get(), ZKPaths.makePath(this.zkPrefix, str3));
            try {
                nodeCache.start();
                nodeCache.rebuild();
                nodeCache.getListenable().addListener(() -> {
                    compute.parallelStream().forEach(zkSubscriber2 -> {
                        try {
                            zkSubscriber2.handle(nodeCache.getCurrentData().getData());
                        } catch (Throwable th) {
                            this.logger.error("Ops. fail to do handle for:{}->{}", new Object[]{this.zkPrefix, zkSubscriber2, th});
                        }
                    });
                });
                return nodeCache;
            } catch (Throwable th) {
                this.logger.error("Ops.", th);
                throw Throwables.propagate(th);
            }
        });
    }

    public void broadcast(String str, String str2) {
        String makePath = ZKPaths.makePath(this.zkPrefix, str);
        try {
            try {
                this.curatorFactory.get().setData().forPath(makePath, str2.getBytes());
            } catch (KeeperException.NoNodeException e) {
                this.curatorFactory.get().create().creatingParentsIfNeeded().forPath(makePath, str2.getBytes());
            }
        } catch (Throwable th) {
            throw Throwables.propagate(th);
        }
    }
}
