package com.fasterxml.mama;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.mama.balancing.BalancingPolicy;
import com.fasterxml.mama.balancing.CountBalancingPolicy;
import com.fasterxml.mama.balancing.MeteredBalancingPolicy;
import com.fasterxml.mama.listeners.ClusterNodesChangedListener;
import com.fasterxml.mama.listeners.HandoffResultsListener;
import com.fasterxml.mama.listeners.VerifyIntegrityListener;
import com.fasterxml.mama.util.JsonUtil;
import com.fasterxml.mama.util.NamedThreadFactory;
import com.fasterxml.mama.util.Strings;
import com.fasterxml.mama.util.ZKDeserializers;
import com.fasterxml.mama.util.ZKException;
import com.fasterxml.mama.util.ZKUtils;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.zookeeper.ZooKeeperClient;
import com.twitter.common.zookeeper.ZooKeeperMap;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.InstanceAlreadyExistsException;
import javax.management.ObjectName;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/fasterxml/mama/Cluster.class */
public class Cluster implements ClusterMBean {
    public static final String PROJECT_NAME = "SlaveDriver";
    protected final Logger LOG;
    public final String name;
    public final String myNodeID;
    private final String shortName;
    private final SimpleListener listener;
    private final ClusterConfig config;
    protected final AtomicReference<NodeState> state;
    protected final HandoffResultsListener handoffResultsListener;
    private final BalancingPolicy balancingPolicy;
    private final AtomicBoolean watchesRegistered;
    private final AtomicBoolean initialized;
    private final CountDownLatch initializedLatch;
    protected final AtomicBoolean connected;
    public final Set<String> myWorkUnits;
    protected Map<String, String> handoffRequests;
    protected Map<String, String> handoffResults;
    public Set<String> claimedForHandoff;
    private Map<String, Double> loadMap;
    public Set<String> workUnitsPeggedToMe;
    private final Claimer claimer;
    public Map<String, ObjectNode> allWorkUnits;
    public Map<String, NodeInfo> nodes;
    public Map<String, String> workUnitMap;
    private final AtomicReference<ScheduledThreadPoolExecutor> pool;
    private ScheduledFuture<?> autoRebalanceFuture;
    private final Gauge<String> listGauge;
    private final Gauge<Integer> countGauge;
    private final Gauge<String> connStateGauge;
    private final Gauge<String> nodeStateGauge;
    public ZooKeeperClient zk;
    private final Watcher connectionWatcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.fasterxml.mama.Cluster$8, reason: invalid class name */
    /* loaded from: input_file:com/fasterxml/mama/Cluster$8.class */
    public static /* synthetic */ class AnonymousClass8 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState;

        static {
            try {
                $SwitchMap$com$fasterxml$mama$NodeState[NodeState.Fresh.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$fasterxml$mama$NodeState[NodeState.Shutdown.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$fasterxml$mama$NodeState[NodeState.Draining.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$fasterxml$mama$NodeState[NodeState.Started.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState = new int[Watcher.Event.KeeperState.values().length];
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.SyncConnected.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.Expired.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[Watcher.Event.KeeperState.Disconnected.ordinal()] = 3;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    public Cluster(String str, SimpleListener simpleListener, ClusterConfig clusterConfig) {
        this(str, simpleListener, clusterConfig, new MetricRegistry());
    }

    public Cluster(String str, SimpleListener simpleListener, ClusterConfig clusterConfig, MetricRegistry metricRegistry) {
        this.LOG = LoggerFactory.getLogger(getClass());
        this.state = new AtomicReference<>(NodeState.Fresh);
        this.watchesRegistered = new AtomicBoolean(false);
        this.initialized = new AtomicBoolean(false);
        this.initializedLatch = new CountDownLatch(1);
        this.connected = new AtomicBoolean(false);
        this.myWorkUnits = new NonBlockingHashSet();
        this.claimedForHandoff = new NonBlockingHashSet();
        this.loadMap = Collections.emptyMap();
        this.workUnitsPeggedToMe = new NonBlockingHashSet();
        this.connectionWatcher = new Watcher() { // from class: com.fasterxml.mama.Cluster.1
            public void process(WatchedEvent watchedEvent) {
                Watcher.Event.KeeperState state = watchedEvent.getState();
                switch (AnonymousClass8.$SwitchMap$org$apache$zookeeper$Watcher$Event$KeeperState[state.ordinal()]) {
                    case 1:
                        Cluster.this.LOG.info("ZooKeeper session established.");
                        Cluster.this.connected.set(true);
                        try {
                            if (Cluster.this.state.get() != NodeState.Shutdown) {
                                Cluster.this.onConnect();
                            } else {
                                Cluster.this.LOG.info("This node is shut down. ZK connection re-established, but not relaunching.");
                            }
                            return;
                        } catch (Exception e) {
                            Cluster.this.LOG.error("Exception during zookeeper connection established callback", e);
                            return;
                        }
                    case 2:
                        Cluster.this.LOG.info("ZooKeeper session expired.");
                        Cluster.this.connected.set(false);
                        Cluster.this.forceShutdown();
                        Cluster.this.awaitReconnect();
                        break;
                    case 3:
                        break;
                    default:
                        Cluster.this.LOG.info("ZooKeeper session interrupted. Shutting down due to event " + state);
                        Cluster.this.connected.set(false);
                        Cluster.this.awaitReconnect();
                }
                Cluster.this.LOG.info("ZooKeeper session disconnected. Awaiting reconnect...");
                Cluster.this.connected.set(false);
                Cluster.this.awaitReconnect();
                Cluster.this.LOG.info("ZooKeeper session interrupted. Shutting down due to event " + state);
                Cluster.this.connected.set(false);
                Cluster.this.awaitReconnect();
            }
        };
        this.name = str;
        this.listener = simpleListener;
        this.config = clusterConfig;
        this.myNodeID = clusterConfig.nodeId;
        this.shortName = clusterConfig.workUnitShortName;
        this.claimer = new Claimer(metricRegistry, this, "ordasity-claimer-" + this.name);
        this.handoffResultsListener = new HandoffResultsListener(this);
        this.balancingPolicy = clusterConfig.useSmartBalancing ? new MeteredBalancingPolicy(this, this.handoffResultsListener, metricRegistry, simpleListener) : new CountBalancingPolicy(this, this.handoffResultsListener);
        this.pool = new AtomicReference<>(createScheduledThreadExecutor());
        this.listGauge = new Gauge<String>() { // from class: com.fasterxml.mama.Cluster.2
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public String m1getValue() {
                return Strings.mkstring(Cluster.this.myWorkUnits, ", ");
            }
        };
        metricRegistry.register("my_" + this.shortName, this.listGauge);
        this.countGauge = new Gauge<Integer>() { // from class: com.fasterxml.mama.Cluster.3
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Integer m2getValue() {
                return Integer.valueOf(Cluster.this.myWorkUnits.size());
            }
        };
        metricRegistry.register("my_" + this.shortName + "_count", this.countGauge);
        this.connStateGauge = new Gauge<String>() { // from class: com.fasterxml.mama.Cluster.4
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public String m3getValue() {
                return Cluster.this.isConnected() ? "true" : "false";
            }
        };
        metricRegistry.register("zk_connection_state", this.connStateGauge);
        this.nodeStateGauge = new Gauge<String>() { // from class: com.fasterxml.mama.Cluster.5
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public String m4getValue() {
                return Cluster.this.getState().toString();
            }
        };
        metricRegistry.register("node_state", this.nodeStateGauge);
        try {
            ManagementFactory.getPlatformMBeanServer().registerMBean(this, new ObjectName(this.name + ":name=Cluster"));
        } catch (InstanceAlreadyExistsException e) {
            this.LOG.warn("JMX bean already registered; ignoring");
        } catch (Exception e2) {
            this.LOG.error("Problems registering JMX info: " + e2.getMessage(), e2);
        }
    }

    private ScheduledThreadPoolExecutor createScheduledThreadExecutor() {
        return new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("SlaveDriver-scheduler"));
    }

    public boolean isInitialized() {
        return this.initialized.get();
    }

    public boolean isConnected() {
        return this.connected.get();
    }

    public boolean isMe(String str) {
        return this.myNodeID.equals(str);
    }

    public boolean znodeIsMe(String str) {
        String str2 = ZKUtils.get(this.zk, str);
        return str2 != null && str2 == this.myNodeID;
    }

    public ClusterConfig getConfig() {
        return this.config;
    }

    public NodeState getState() {
        return this.state.get();
    }

    public boolean hasState(NodeState nodeState) {
        return this.state.get() == nodeState;
    }

    public double getWorkUnitLoad(String str) {
        if (this.loadMap == null) {
            return 0.0d;
        }
        synchronized (this.loadMap) {
            Double d = this.loadMap.get(str);
            if (d == null) {
                return 0.0d;
            }
            return d.doubleValue();
        }
    }

    public double getTotalWorkUnitLoad() {
        double d;
        if (this.loadMap == null) {
            return 0.0d;
        }
        synchronized (this.loadMap) {
            double d2 = 0.0d;
            for (Double d3 : this.loadMap.values()) {
                if (d3 != null) {
                    d2 += d3.doubleValue();
                }
            }
            d = d2;
        }
        return d;
    }

    public String descForHandoffRequests() {
        return Strings.mkstring(this.handoffRequests, ", ");
    }

    public String descForHandoffResults() {
        return Strings.mkstring(this.handoffResults, ", ");
    }

    public boolean containsHandoffRequest(String str) {
        return this.handoffRequests.containsKey(str);
    }

    public boolean containsHandoffResult(String str) {
        return this.handoffResults.containsKey(str);
    }

    public Set<String> getHandoffWorkUnits() {
        return this.handoffRequests.keySet();
    }

    public Set<String> getHandoffResultWorkUnits() {
        return this.handoffResults.keySet();
    }

    public String getHandoffResult(String str) {
        return this.handoffResults.get(str);
    }

    public void schedule(Runnable runnable, long j, TimeUnit timeUnit) {
        this.pool.get().schedule(runnable, j, timeUnit);
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
        return this.pool.get().scheduleAtFixedRate(runnable, j, j2, timeUnit);
    }

    @Override // com.fasterxml.mama.ClusterMBean
    public String join() throws InterruptedException {
        return join(null);
    }

    public String join(ZooKeeperClient zooKeeperClient) throws InterruptedException {
        switch (this.state.get()) {
            case Fresh:
                connect(zooKeeperClient);
                break;
            case Shutdown:
                connect(zooKeeperClient);
                break;
            case Draining:
                this.LOG.warn("'join' called while draining; ignoring.");
                break;
            case Started:
                this.LOG.warn("'join' called after started; ignoring.");
                break;
        }
        return this.state.get().toString();
    }

    protected void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.fasterxml.mama.Cluster.6
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Cluster.this.LOG.info("Cleaning up ephemeral ZooKeeper state");
                Cluster.this.completeShutdown();
            }
        });
    }

    void awaitReconnect() {
        while (true) {
            try {
                this.LOG.info("Awaiting reconnection to ZooKeeper...");
                this.zk.get(Amount.of(1L, Time.SECONDS));
            } catch (TimeoutException e) {
                this.LOG.warn("Timed out reconnecting to ZooKeeper.", e);
            } catch (Exception e2) {
                this.LOG.error("Error reconnecting to ZooKeeper", e2);
            }
        }
    }

    private void connect(ZooKeeperClient zooKeeperClient) throws InterruptedException {
        if (!this.initialized.get()) {
            if (zooKeeperClient == null) {
                ArrayList arrayList = new ArrayList();
                for (String str : this.config.hosts.split(",")) {
                    String[] split = str.split(":");
                    try {
                        arrayList.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
                    } catch (Exception e) {
                        this.LOG.error("Invalid ZK host '" + str + "', need to skip, problem: " + e.getMessage());
                    }
                }
                this.LOG.info("Connecting to hosts: {}", arrayList.toString());
                zooKeeperClient = new ZooKeeperClient(Amount.of((int) this.config.zkTimeout, Time.MILLISECONDS), arrayList);
            }
            this.zk = zooKeeperClient;
            this.claimer.start();
            this.LOG.info("Registering connection watcher.");
            this.zk.register(this.connectionWatcher);
        }
        try {
            this.zk.get();
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            throw ZKException.from(e2);
        }
    }

    @Override // com.fasterxml.mama.ClusterMBean
    public void shutdown() {
        if (this.state.get() == NodeState.Shutdown) {
            return;
        }
        this.balancingPolicy.shutdown();
        if (this.autoRebalanceFuture != null) {
            this.autoRebalanceFuture.cancel(true);
        }
        this.LOG.info("Shutdown initiated; beginning drain...");
        setState(NodeState.Draining);
        this.balancingPolicy.drainToCount(0, true);
    }

    void forceShutdown() {
        this.balancingPolicy.shutdown();
        if (this.autoRebalanceFuture != null) {
            this.autoRebalanceFuture.cancel(true);
        }
        this.LOG.warn("Forcible shutdown initiated due to connection loss...");
        shutdownAllWorkUnits();
        this.listener.onLeave();
    }

    public void completeShutdown() {
        setState(NodeState.Shutdown);
        shutdownAllWorkUnits();
        deleteFromZk();
        if (this.claimer != null) {
            this.claimer.interrupt();
            try {
                this.claimer.join();
            } catch (InterruptedException e) {
                this.LOG.warn("Shutdown of Claimer interrupted");
            }
        }
        if (this.connectionWatcher != null) {
            this.zk.unregister(this.connectionWatcher);
        }
        try {
            this.zk.close();
        } catch (Exception e2) {
            this.LOG.warn("Zookeeper reported exception on shutdown.", e2);
        }
        this.listener.onLeave();
    }

    private void deleteFromZk() {
        ZKUtils.delete(this.zk, "/" + this.name + "/nodes/" + this.myNodeID);
    }

    void onConnect() throws InterruptedException, IOException {
        if (this.state.get() != NodeState.Fresh) {
            if (previousZKSessionStillActive()) {
                this.LOG.info("ZooKeeper session re-established before timeout.");
                return;
            } else {
                this.LOG.warn("Rejoined after session timeout. Forcing shutdown and clean startup.");
                ensureCleanStartup();
            }
        }
        this.LOG.info("Connected to Zookeeper (ID: {}).", this.myNodeID);
        ZKUtils.ensureOrdasityPaths(this.zk, this.name, this.config.workUnitName, this.config.workUnitShortName);
        joinCluster();
        this.listener.onJoin(this.zk);
        if (this.watchesRegistered.compareAndSet(false, true)) {
            registerWatchers();
        }
        this.initialized.set(true);
        this.initializedLatch.countDown();
        setState(NodeState.Started);
        this.claimer.requestClaim();
        verifyIntegrity();
        this.balancingPolicy.onConnect();
        if (this.config.enableAutoRebalance) {
            scheduleRebalancing();
        }
    }

    private void ensureCleanStartup() {
        forceShutdown();
        this.pool.getAndSet(createScheduledThreadExecutor()).shutdownNow();
        this.claimedForHandoff.clear();
        this.workUnitsPeggedToMe.clear();
        this.state.set(NodeState.Fresh);
    }

    private void scheduleRebalancing() {
        int i = this.config.autoRebalanceInterval;
        this.autoRebalanceFuture = this.pool.get().scheduleAtFixedRate(new Runnable() { // from class: com.fasterxml.mama.Cluster.7
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Cluster.this.rebalance();
                } catch (Exception e) {
                    Cluster.this.LOG.error("Error running auto-rebalance.", e);
                }
            }
        }, i, i, TimeUnit.SECONDS);
    }

    private void joinCluster() throws InterruptedException, IOException {
        while (true) {
            try {
                if (ZKUtils.createEphemeral(this.zk, "/" + this.name + "/nodes/" + this.myNodeID, JsonUtil.asJSONBytes(new NodeInfo(NodeState.Fresh.toString(), this.zk.get().getSessionId())))) {
                    return;
                }
                this.LOG.warn("Unable to register with Zookeeper on launch. Is {} already running on this host? Retrying in 1 second...", this.name);
                Thread.sleep(1000L);
            } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
                throw ZKException.from(e);
            }
        }
    }

    private void registerWatchers() throws InterruptedException {
        ClusterNodesChangedListener clusterNodesChangedListener = new ClusterNodesChangedListener(this);
        VerifyIntegrityListener verifyIntegrityListener = new VerifyIntegrityListener(this);
        ZKDeserializers.StringDeserializer stringDeserializer = new ZKDeserializers.StringDeserializer();
        try {
            this.nodes = ZooKeeperMap.create(this.zk, String.format("/%s/nodes", this.name), new ZKDeserializers.NodeInfoDeserializer(), clusterNodesChangedListener);
            this.allWorkUnits = ZooKeeperMap.create(this.zk, String.format("/%s", this.config.workUnitName), new ZKDeserializers.ObjectNodeDeserializer(), new VerifyIntegrityListener(this));
            this.workUnitMap = ZooKeeperMap.create(this.zk, String.format("/%s/claimed-%s", this.name, this.config.workUnitShortName), stringDeserializer, verifyIntegrityListener);
            if (this.config.useSoftHandoff) {
                this.handoffRequests = ZooKeeperMap.create(this.zk, String.format("/%s/handoff-requests", this.name), stringDeserializer, verifyIntegrityListener);
                this.handoffResults = ZooKeeperMap.create(this.zk, String.format("/%s/handoff-result", this.name), stringDeserializer, this.handoffResultsListener);
            } else {
                this.handoffRequests = new HashMap();
                this.handoffResults = new HashMap();
            }
            if (this.config.useSmartBalancing) {
                this.loadMap = ZooKeeperMap.create(this.zk, String.format("/%s/meta/workload", this.name), new ZKDeserializers.DoubleDeserializer());
            }
        } catch (KeeperException e) {
            throw ZKException.from(e);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            throw ZKException.from(e2);
        }
    }

    public void claimWork() throws InterruptedException {
        if (this.state.get() == NodeState.Started && this.connected.get()) {
            this.balancingPolicy.claimWork();
        }
    }

    public void requestClaim() {
        this.claimer.requestClaim();
    }

    public void requestHandoff(String str) throws InterruptedException {
        this.LOG.info("Requesting handoff for {}.", str);
        ZKUtils.createEphemeral(this.zk, "/" + this.name + "/handoff-requests/" + str);
    }

    public void verifyIntegrity() {
        LinkedHashSet linkedHashSet = new LinkedHashSet(this.myWorkUnits);
        linkedHashSet.removeAll(this.allWorkUnits.keySet());
        Iterator it = linkedHashSet.iterator();
        while (it.hasNext()) {
            shutdownWork((String) it.next(), true);
        }
        for (String str : this.myWorkUnits) {
            String workUnitClaimPath = workUnitClaimPath(str);
            if (!this.balancingPolicy.isFairGame(str) && !this.balancingPolicy.isPeggedToMe(str)) {
                this.LOG.info("Discovered I'm serving a work unit that's now pegged to someone else. Shutting down {}", str);
                shutdownWork(str, true);
            } else if (this.workUnitMap.containsKey(str) && !this.workUnitMap.get(str).equals(this.myNodeID) && !this.claimedForHandoff.contains(str) && !znodeIsMe(workUnitClaimPath)) {
                this.LOG.info("Discovered I'm serving a work unit that's now claimed by {} according to ZooKeeper. Shutting down {}", this.workUnitMap.get(str), str);
                shutdownWork(str, true);
            }
        }
    }

    public String workUnitClaimPath(String str) {
        return String.format("/%s/claimed-%s/%s", this.name, this.config.workUnitShortName, str);
    }

    public void startWork(String str) throws InterruptedException {
        this.LOG.info("Successfully claimed {}: {}. Starting...", this.config.workUnitName, str);
        if (!this.myWorkUnits.add(str)) {
            this.LOG.warn("Detected that %s is already a member of my work units; not starting twice!", str);
        } else if (!(this.balancingPolicy instanceof MeteredBalancingPolicy)) {
            ((ClusterListener) this.listener).startWork(str);
        } else {
            ((SmartListener) this.listener).startWork(str, ((MeteredBalancingPolicy) this.balancingPolicy).findOrCreateMetrics(str));
        }
    }

    public void shutdownWork(String str, boolean z) {
        if (z) {
            this.LOG.info("Shutting down {}: {}...", this.config.workUnitName, str);
        }
        this.myWorkUnits.remove(str);
        this.claimedForHandoff.remove(str);
        this.balancingPolicy.onShutdownWork(str);
        try {
            this.listener.shutdownWork(str);
            ZKUtils.deleteAtomic(this.zk, workUnitClaimPath(str), this.myNodeID);
        } catch (Throwable th) {
            ZKUtils.deleteAtomic(this.zk, workUnitClaimPath(str), this.myNodeID);
            throw th;
        }
    }

    @Override // com.fasterxml.mama.ClusterMBean
    public void rebalance() throws InterruptedException {
        if (this.state.get() != NodeState.Fresh) {
            this.balancingPolicy.rebalance();
        }
    }

    private boolean setState(NodeState nodeState) {
        try {
            ZKUtils.set(this.zk, "/" + this.name + "/nodes/" + this.myNodeID, JsonUtil.asJSONBytes(new NodeInfo(nodeState.toString(), this.zk.get().getSessionId())));
            this.state.set(nodeState);
            return true;
        } catch (Exception e) {
            this.LOG.warn("Problem trying to setState(" + nodeState + "): " + e.getMessage(), e);
            return false;
        }
    }

    private boolean previousZKSessionStillActive() {
        try {
            return ((NodeInfo) JsonUtil.fromJSON(this.zk.get().getData(String.format("/%s/nodes/%s", this.name, this.myNodeID), false, (Stat) null), NodeInfo.class)).connectionID == this.zk.get().getSessionId();
        } catch (KeeperException.NoNodeException e) {
            return false;
        } catch (Exception e2) {
            this.LOG.error("Encountered unexpected error in checking ZK session status.", e2);
            return false;
        }
    }

    private void shutdownAllWorkUnits() {
        Iterator<String> it = this.myWorkUnits.iterator();
        while (it.hasNext()) {
            shutdownWork(it.next(), true);
        }
        this.myWorkUnits.clear();
    }
}
