package org.apache.storm.nimbus;

import com.codahale.metrics.Meter;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import javax.security.auth.Subject;
import org.apache.commons.io.IOUtils;
import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.nimbus.TopoCache;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.shade.com.google.common.base.Joiner;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/nimbus/LeaderListenerCallback.class */
public class LeaderListenerCallback {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
    private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
    private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
    private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
    private final Meter numGainedLeader;
    private final Meter numLostLeader;
    private final BlobStore blobStore;
    private final TopoCache tc;
    private final IStormClusterState clusterState;
    private final CuratorFramework zk;
    private final ILeaderElector leaderElector;
    private final Map conf;
    private final List<ACL> acls;
    private final int requeueDelayMs;

    public LeaderListenerCallback(Map map, CuratorFramework curatorFramework, BlobStore blobStore, ILeaderElector iLeaderElector, TopoCache topoCache, IStormClusterState iStormClusterState, List<ACL> list, StormMetricsRegistry stormMetricsRegistry) {
        this.blobStore = blobStore;
        this.tc = topoCache;
        this.clusterState = iStormClusterState;
        this.zk = curatorFramework;
        this.leaderElector = iLeaderElector;
        this.conf = map;
        this.acls = list;
        this.numGainedLeader = stormMetricsRegistry.registerMeter("nimbus:num-gained-leadership");
        this.numLostLeader = stormMetricsRegistry.registerMeter("nimbus:num-lost-leadership");
        this.requeueDelayMs = ObjectReader.getInt(map.get(DaemonConfig.NIMBUS_CODE_SYNC_FREQ_SECS)).intValue() * 1000;
    }

    public void leaderCallBack(Object obj) {
        this.numGainedLeader.mark();
        setUpNimbusInfo(this.acls);
        LOG.info("Locking sync remote assignments and id-info to local");
        synchronized (obj) {
            this.clusterState.syncRemoteAssignments((Map) null);
            this.clusterState.syncRemoteIds((Map) null);
            this.clusterState.setAssignmentsBackendSynchronized();
        }
        TreeSet treeSet = new TreeSet(ClientZookeeper.getChildren(this.zk, "/storms", false));
        Set<String> populateTopologyBlobKeys = populateTopologyBlobKeys(treeSet);
        Set<String> filterTopologyCodeKeys = filterTopologyCodeKeys(populateTopologyBlobKeys);
        HashSet newHashSet = Sets.newHashSet(this.blobStore.listKeys());
        Set<String> filterTopologyBlobKeys = filterTopologyBlobKeys(newHashSet);
        Sets.SetView difference = Sets.difference(populateTopologyBlobKeys, filterTopologyBlobKeys);
        LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]", new Object[]{generateJoinedString(treeSet), generateJoinedString(filterTopologyBlobKeys), generateJoinedString(difference)});
        if (!difference.isEmpty()) {
            LOG.info("code for all active topologies not available locally, giving up leadership.");
            surrenderLeadership();
            return;
        }
        Set<String> topologyDependencyKeys = getTopologyDependencyKeys(filterTopologyCodeKeys);
        Sets.SetView difference2 = Sets.difference(topologyDependencyKeys, newHashSet);
        LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]", new Object[]{generateJoinedString(topologyDependencyKeys), generateJoinedString(newHashSet), generateJoinedString(difference2)});
        if (difference2.isEmpty()) {
            LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
            this.tc.clear();
        } else {
            LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
            surrenderLeadership();
        }
    }

    public void notLeaderCallback() {
        this.numLostLeader.mark();
        this.tc.clear();
    }

    private void setUpNimbusInfo(List<ACL> list) {
        String str = this.conf.get("storm.zookeeper.root") + "/leader-info";
        NimbusInfo fromConf = NimbusInfo.fromConf(this.conf);
        if (ClientZookeeper.existsNode(this.zk, str, false)) {
            ClientZookeeper.setData(this.zk, str, Utils.javaSerialize(fromConf));
        } else {
            ClientZookeeper.createNode(this.zk, str, Utils.javaSerialize(fromConf), CreateMode.PERSISTENT, list);
        }
    }

    private String generateJoinedString(Set<String> set) {
        return Joiner.on(",").join(set);
    }

    private Set<String> populateTopologyBlobKeys(Set<String> set) {
        TreeSet treeSet = new TreeSet();
        for (String str : set) {
            treeSet.add(str + "-stormjar.jar");
            treeSet.add(str + "-stormcode.ser");
            treeSet.add(str + "-stormconf.ser");
        }
        return treeSet;
    }

    private Set<String> filterTopologyBlobKeys(Set<String> set) {
        HashSet hashSet = new HashSet();
        for (String str : set) {
            if (str.endsWith(STORM_JAR_SUFFIX) || str.endsWith(STORM_CODE_SUFFIX) || str.endsWith(STORM_CONF_SUFFIX)) {
                hashSet.add(str);
            }
        }
        return hashSet;
    }

    private Set<String> filterTopologyCodeKeys(Set<String> set) {
        HashSet hashSet = new HashSet();
        for (String str : set) {
            if (str.endsWith(STORM_CODE_SUFFIX)) {
                hashSet.add(str);
            }
        }
        return hashSet;
    }

    private Set<String> getTopologyDependencyKeys(Set<String> set) {
        TreeSet treeSet = new TreeSet();
        Subject subject = ReqContext.context().subject();
        for (String str : set) {
            try {
                InputStreamWithMeta blob = this.blobStore.getBlob(str, subject);
                try {
                    StormTopology stormTopology = (StormTopology) Utils.deserialize(IOUtils.readFully(blob, new Long(blob.getFileLength()).intValue()), StormTopology.class);
                    if (stormTopology.is_set_dependency_jars()) {
                        treeSet.addAll(stormTopology.get_dependency_jars());
                    }
                    if (stormTopology.is_set_dependency_artifacts()) {
                        treeSet.addAll(stormTopology.get_dependency_artifacts());
                    }
                    if (blob != null) {
                        blob.close();
                    }
                } finally {
                }
            } catch (AuthorizationException | KeyNotFoundException | IOException e) {
                LOG.error("Exception occurs while reading blob for key: " + str + ", exception: " + e, e);
                throw new RuntimeException("Exception occurs while reading blob for key: " + str + ", exception: " + e, e);
            }
        }
        return treeSet;
    }

    private void surrenderLeadership() {
        try {
            this.leaderElector.quitElectionFor(this.requeueDelayMs);
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}
