package org.apache.storm.zookeeper;

import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.shade.com.google.common.base.Joiner;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.shade.org.apache.commons.io.IOUtils;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.storm.shade.org.apache.curator.framework.api.CuratorEvent;
import org.apache.storm.shade.org.apache.curator.framework.api.CuratorEventType;
import org.apache.storm.shade.org.apache.curator.framework.api.CuratorListener;
import org.apache.storm.shade.org.apache.curator.framework.api.Pathable;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.Participant;
import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
import org.apache.storm.shade.org.apache.zookeeper.KeeperException;
import org.apache.storm.shade.org.apache.zookeeper.WatchedEvent;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.shade.org.apache.zookeeper.data.Stat;
import org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.storm.shade.org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/zookeeper/Zookeeper.class */
public class Zookeeper {
    private static Logger LOG = LoggerFactory.getLogger((Class<?>) Zookeeper.class);
    private static final Zookeeper INSTANCE = new Zookeeper();
    private static Zookeeper _instance = INSTANCE;

    public static void setInstance(Zookeeper zookeeper) {
        _instance = zookeeper;
    }

    public static void resetInstance() {
        _instance = INSTANCE;
    }

    public CuratorFramework mkClientImpl(Map map, List<String> list, Object obj, String str, List<ACL> list2) {
        return mkClientImpl(map, list, obj, str, new DefaultWatcherCallBack(), null, list2);
    }

    public CuratorFramework mkClientImpl(Map map, List<String> list, Object obj, Map map2, List<ACL> list2) {
        return mkClientImpl(map, list, obj, "", new DefaultWatcherCallBack(), map2, list2);
    }

    public CuratorFramework mkClientImpl(Map map, List<String> list, Object obj, String str, Map map2, List<ACL> list2) {
        return mkClientImpl(map, list, obj, str, new DefaultWatcherCallBack(), map2, list2);
    }

    public static CuratorFramework mkClient(Map map, List<String> list, Object obj, String str, WatcherCallBack watcherCallBack, Map map2, List<ACL> list2) {
        return _instance.mkClientImpl(map, list, obj, str, watcherCallBack, map2, list2);
    }

    public CuratorFramework mkClientImpl(Map map, List<String> list, Object obj, String str, final WatcherCallBack watcherCallBack, Map map2, List<ACL> list2) {
        CuratorFramework newCurator = map2 != null ? Utils.newCurator(map, list, obj, str, new ZookeeperAuthInfo(map2), list2) : Utils.newCurator(map, list, obj, str, list2);
        newCurator.getCuratorListenable().addListener(new CuratorListener() { // from class: org.apache.storm.zookeeper.Zookeeper.1
            @Override // org.apache.storm.shade.org.apache.curator.framework.api.CuratorListener
            public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                if (curatorEvent.getType().equals(CuratorEventType.WATCHED)) {
                    WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
                    watcherCallBack.execute(watchedEvent.getState(), watchedEvent.getType(), watchedEvent.getPath());
                }
            }
        });
        LOG.info("Staring ZK Curator");
        newCurator.start();
        return newCurator;
    }

    public CuratorFramework mkClientImpl(Map map, List<String> list, Object obj, String str, WatcherCallBack watcherCallBack, List<ACL> list2) {
        return mkClientImpl(map, list, obj, str, watcherCallBack, null, list2);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public static String createNode(CuratorFramework curatorFramework, String str, byte[] bArr, CreateMode createMode, List<ACL> list) {
        try {
            return (String) ((ACLBackgroundPathAndBytesable) curatorFramework.create().creatingParentsIfNeeded().withMode(createMode)).withACL(list).forPath(normalizePath(str), bArr);
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static String createNode(CuratorFramework curatorFramework, String str, byte[] bArr, List<ACL> list) {
        return createNode(curatorFramework, str, bArr, CreateMode.PERSISTENT, list);
    }

    public static boolean existsNode(CuratorFramework curatorFramework, String str, boolean z) {
        try {
            return (z ? curatorFramework.checkExists().watched().forPath(normalizePath(str)) : curatorFramework.checkExists().forPath(normalizePath(str))) != null;
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static void deleteNode(CuratorFramework curatorFramework, String str) {
        try {
            if (existsNode(curatorFramework, normalizePath(str), false)) {
                curatorFramework.delete().deletingChildrenIfNeeded().forPath(normalizePath(str));
            }
        } catch (Exception e) {
            if (!Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
                throw Utils.wrapInRuntime(e);
            }
            LOG.info("delete {} failed.", str, e);
        }
    }

    public static void mkdirs(CuratorFramework curatorFramework, String str, List<ACL> list) {
        _instance.mkdirsImpl(curatorFramework, str, list);
    }

    public void mkdirsImpl(CuratorFramework curatorFramework, String str, List<ACL> list) {
        String normalizePath = normalizePath(str);
        if (normalizePath.equals("/") || existsNode(curatorFramework, normalizePath, false)) {
            return;
        }
        try {
            createNode(curatorFramework, normalizePath, new byte[]{7}, CreateMode.PERSISTENT, list);
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
            }
        }
    }

    public static void syncPath(CuratorFramework curatorFramework, String str) {
        try {
            curatorFramework.sync().forPath(normalizePath(str));
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static void addListener(CuratorFramework curatorFramework, ConnectionStateListener connectionStateListener) {
        curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
    }

    public static byte[] getData(CuratorFramework curatorFramework, String str, boolean z) {
        try {
            String normalizePath = normalizePath(str);
            if (existsNode(curatorFramework, normalizePath, z)) {
                return z ? curatorFramework.getData().watched().forPath(normalizePath) : curatorFramework.getData().forPath(normalizePath);
            }
            return null;
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                return null;
            }
            throw Utils.wrapInRuntime(e);
        }
    }

    public static Integer getVersion(CuratorFramework curatorFramework, String str, boolean z) throws Exception {
        String normalizePath = normalizePath(str);
        Stat stat = null;
        if (existsNode(curatorFramework, normalizePath, z)) {
            stat = z ? curatorFramework.checkExists().watched().forPath(normalizePath) : curatorFramework.checkExists().forPath(normalizePath);
        }
        if (stat == null) {
            return null;
        }
        return Integer.valueOf(stat.getVersion());
    }

    public static List<String> getChildren(CuratorFramework curatorFramework, String str, boolean z) {
        try {
            String normalizePath = normalizePath(str);
            return z ? curatorFramework.getChildren().watched().forPath(normalizePath) : curatorFramework.getChildren().forPath(normalizePath);
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static void deleteNodeBlobstore(CuratorFramework curatorFramework, String str, String str2) {
        String normalizePath = normalizePath(str);
        if (existsNode(curatorFramework, normalizePath, false)) {
            for (String str3 : getChildren(curatorFramework, normalizePath, false)) {
                if (str3.startsWith(str2)) {
                    LOG.debug("deleteNode child {}", str3);
                    deleteNode(curatorFramework, normalizePath + "/" + str3);
                }
            }
        }
    }

    public static Stat setData(CuratorFramework curatorFramework, String str, byte[] bArr) {
        try {
            return curatorFramework.setData().forPath(normalizePath(str), bArr);
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static boolean exists(CuratorFramework curatorFramework, String str, boolean z) {
        return existsNode(curatorFramework, str, z);
    }

    public static List mkInprocessZookeeper(String str, Integer num) throws Exception {
        File file = new File(str);
        ZooKeeperServer zooKeeperServer = new ZooKeeperServer(file, file, 2000);
        int i = 2000;
        int i2 = 65535;
        if (num != null) {
            i = num.intValue();
            i2 = num.intValue();
        }
        do {
            try {
                NIOServerCnxnFactory nIOServerCnxnFactory = new NIOServerCnxnFactory();
                nIOServerCnxnFactory.configure(new InetSocketAddress(i), 0);
                LOG.info("Starting inprocess zookeeper at port {} and dir {}", Integer.valueOf(i), str);
                nIOServerCnxnFactory.startup(zooKeeperServer);
                return Arrays.asList(new Long(i), nIOServerCnxnFactory);
            } catch (BindException e) {
                i++;
            }
        } while (i <= i2);
        throw new RuntimeException("No port is available to launch an inprocess zookeeper");
    }

    public static void shutdownInprocessZookeeper(NIOServerCnxnFactory nIOServerCnxnFactory) {
        nIOServerCnxnFactory.shutdown();
    }

    public static NimbusInfo toNimbusInfo(Participant participant) {
        String id = participant.getId();
        if (StringUtils.isBlank(id)) {
            throw new RuntimeException("No nimbus leader participant host found, have you started your nimbus hosts?");
        }
        NimbusInfo parse = NimbusInfo.parse(id);
        parse.setLeader(participant.isLeader());
        return parse;
    }

    public static LeaderLatchListener leaderLatchListenerImpl(Map map, final CuratorFramework curatorFramework, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
        final String canonicalHostName = InetAddress.getLocalHost().getCanonicalHostName();
        return new LeaderLatchListener() { // from class: org.apache.storm.zookeeper.Zookeeper.2
            final String STORM_JAR_SUFFIX = "-stormjar.jar";
            final String STORM_CODE_SUFFIX = "-stormcode.ser";
            final String STORM_CONF_SUFFIX = "-stormconf.ser";

            @Override // org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener
            public void isLeader() {
                Set<String> treeSet = new TreeSet<>(Zookeeper.getChildren(CuratorFramework.this, ClusterUtils.STORMS_SUBTREE, false));
                Set<String> populateTopologyBlobKeys = populateTopologyBlobKeys(treeSet);
                Set<String> filterTopologyCodeKeys = filterTopologyCodeKeys(populateTopologyBlobKeys);
                Set<String> newHashSet = Sets.newHashSet(blobStore.listKeys());
                Set<String> filterTopologyBlobKeys = filterTopologyBlobKeys(newHashSet);
                Sets.SetView difference = Sets.difference(populateTopologyBlobKeys, filterTopologyBlobKeys);
                Zookeeper.LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]", generateJoinedString(treeSet), generateJoinedString(filterTopologyBlobKeys), generateJoinedString(difference));
                if (!difference.isEmpty()) {
                    Zookeeper.LOG.info("code for all active topologies not available locally, giving up leadership.");
                    closeLatch();
                    return;
                }
                Set<String> topologyDependencyKeys = getTopologyDependencyKeys(filterTopologyCodeKeys);
                Sets.SetView difference2 = Sets.difference(topologyDependencyKeys, newHashSet);
                Zookeeper.LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]", generateJoinedString(topologyDependencyKeys), generateJoinedString(newHashSet), generateJoinedString(difference2));
                if (difference2.isEmpty()) {
                    Zookeeper.LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                } else {
                    Zookeeper.LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                    closeLatch();
                }
            }

            @Override // org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener
            public void notLeader() {
                Zookeeper.LOG.info("{} lost leadership.", canonicalHostName);
            }

            private String generateJoinedString(Set<String> set) {
                return Joiner.on(",").join(set);
            }

            private Set<String> populateTopologyBlobKeys(Set<String> set) {
                TreeSet treeSet = new TreeSet();
                for (String str : set) {
                    treeSet.add(str + "-stormjar.jar");
                    treeSet.add(str + "-stormcode.ser");
                    treeSet.add(str + "-stormconf.ser");
                }
                return treeSet;
            }

            private Set<String> filterTopologyBlobKeys(Set<String> set) {
                HashSet hashSet = new HashSet();
                for (String str : set) {
                    if (str.endsWith("-stormjar.jar") || str.endsWith("-stormcode.ser") || str.endsWith("-stormconf.ser")) {
                        hashSet.add(str);
                    }
                }
                return hashSet;
            }

            private Set<String> filterTopologyCodeKeys(Set<String> set) {
                HashSet hashSet = new HashSet();
                for (String str : set) {
                    if (str.endsWith("-stormcode.ser")) {
                        hashSet.add(str);
                    }
                }
                return hashSet;
            }

            private Set<String> getTopologyDependencyKeys(Set<String> set) {
                TreeSet treeSet = new TreeSet();
                Subject subject = ReqContext.context().subject();
                for (String str : set) {
                    try {
                        InputStreamWithMeta blob = blobStore.getBlob(str, subject);
                        StormTopology stormTopology = (StormTopology) Utils.deserialize(IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue()), StormTopology.class);
                        if (stormTopology.is_set_dependency_jars()) {
                            treeSet.addAll(stormTopology.get_dependency_jars());
                        }
                        if (stormTopology.is_set_dependency_artifacts()) {
                            treeSet.addAll(stormTopology.get_dependency_artifacts());
                        }
                    } catch (IOException | AuthorizationException | KeyNotFoundException e) {
                        Zookeeper.LOG.error("Exception occurs while reading blob for key: " + str + ", exception: " + e, e);
                        throw new RuntimeException("Exception occurs while reading blob for key: " + str + ", exception: " + e, e);
                    }
                }
                return treeSet;
            }

            private void closeLatch() {
                try {
                    leaderLatch.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    public static ILeaderElector zkLeaderElector(Map map, CuratorFramework curatorFramework, BlobStore blobStore) throws UnknownHostException {
        return _instance.zkLeaderElectorImpl(map, curatorFramework, blobStore);
    }

    protected ILeaderElector zkLeaderElectorImpl(Map map, CuratorFramework curatorFramework, BlobStore blobStore) throws UnknownHostException {
        List list = (List) map.get(Config.STORM_ZOOKEEPER_SERVERS);
        String hostPortString = NimbusInfo.fromConf(map).toHostPortString();
        AtomicReference atomicReference = new AtomicReference(new LeaderLatch(curatorFramework, "/leader-lock", hostPortString));
        return new LeaderElectorImp(map, list, curatorFramework, "/leader-lock", hostPortString, atomicReference, new AtomicReference(leaderLatchListenerImpl(map, curatorFramework, blobStore, (LeaderLatch) atomicReference.get())), blobStore);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public static VersionedData<byte[]> getDataWithVersion(CuratorFramework curatorFramework, String str, boolean z) {
        VersionedData<byte[]> versionedData = null;
        try {
            Stat stat = new Stat();
            String normalizePath = normalizePath(str);
            if (existsNode(curatorFramework, normalizePath, z)) {
                byte[] forPath = z ? (byte[]) ((Pathable) curatorFramework.getData().storingStatIn(stat).watched()).forPath(normalizePath) : curatorFramework.getData().storingStatIn(stat).forPath(normalizePath);
                if (forPath != null) {
                    versionedData = new VersionedData<>(stat.getVersion(), forPath);
                }
            }
        } catch (Exception e) {
            if (!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                Utils.wrapInRuntime(e);
            }
        }
        return versionedData;
    }

    public static List<String> tokenizePath(String str) {
        String[] split = str.split("/");
        ArrayList arrayList = new ArrayList();
        for (String str2 : split) {
            if (!str2.isEmpty()) {
                arrayList.add(str2);
            }
        }
        return arrayList;
    }

    public static String parentPath(String str) {
        List<String> list = tokenizePath(str);
        int size = list.size();
        if (size > 0) {
            list.remove(size - 1);
        }
        return toksToPath(list);
    }

    public static String toksToPath(List<String> list) {
        StringBuffer stringBuffer = new StringBuffer();
        stringBuffer.append("/");
        int size = list.size();
        for (int i = 0; i < size; i++) {
            stringBuffer.append(list.get(i));
            if (i < size - 1) {
                stringBuffer.append("/");
            }
        }
        return stringBuffer.toString();
    }

    public static String normalizePath(String str) {
        return toksToPath(tokenizePath(str));
    }
}
