package org.apache.hw_v4_0_0.hedwig.server.topics;

import com.google.protobuf.ByteString;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import org.apache.derby.iapi.sql.compile.TypeCompiler;
import org.apache.hw_v4_0_0.commons.lang.StringUtils;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.util.Callback;
import org.apache.hw_v4_0_0.hedwig.util.ConcurrencyUtils;
import org.apache.hw_v4_0_0.hedwig.util.Either;
import org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress;
import org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncZKCallback;
import org.apache.hw_v4_0_0.hedwig.zookeeper.ZkUtils;
import org.apache.hw_v4_0_0.zookkeeper.AsyncCallback;
import org.apache.hw_v4_0_0.zookkeeper.CreateMode;
import org.apache.hw_v4_0_0.zookkeeper.KeeperException;
import org.apache.hw_v4_0_0.zookkeeper.WatchedEvent;
import org.apache.hw_v4_0_0.zookkeeper.Watcher;
import org.apache.hw_v4_0_0.zookkeeper.ZooDefs;
import org.apache.hw_v4_0_0.zookkeeper.ZooKeeper;
import org.apache.hw_v4_0_0.zookkeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/topics/ZkTopicManager.class */
public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
    /** Logger shared by this class and its nested callbacks. */
    static Logger logger = LoggerFactory.getLogger(ZkTopicManager.class);
    /** Used to break ties at random between hubs that report the same load. */
    Random rand;
    /** ZooKeeper client handle through which all ownership and load nodes are read and written. */
    private ZooKeeper zk;
    /** Path of this hub's own node under the hosts prefix; the node is created as EPHEMERAL and holds the hub's load. */
    String ephemeralNodePath;
    /** Callback for load updates; it only logs a warning when the update did not return OK. */
    SafeAsyncZKCallback.StatCallback loadReportingStatCallback;
    /** Set to true on a ZooKeeper Disconnected event and back to false on SyncConnected; while true, owner lookups fail fast. */
    protected volatile boolean isSuspended;

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/topics/ZkTopicManager$ZkGetOwnerOp.class */
    /**
     * Asynchronous operation that resolves which hub owns a single topic.
     *
     * <p>The entry point used by the enclosing class is {@link #read()}: it looks at the topic's
     * ownership node and, depending on what it finds, either reports the existing owner, claims the
     * topic for this hub ({@link #claim()}), or picks the least-loaded registered hub
     * ({@link #choose()}). Every outcome is delivered through {@link #cb} together with the
     * caller-supplied {@link #ctx}.
     */
    class ZkGetOwnerOp {
        /** Topic whose owner is being resolved. */
        ByteString topic;
        /** When true an unowned topic is claimed by this hub; otherwise the least-loaded hub is chosen. */
        boolean shouldClaim;
        /** Receives the owner's address on success or the failure otherwise. */
        Callback<HedwigSocketAddress> cb;
        /** Opaque caller context that is handed back to {@link #cb}. */
        Object ctx;
        /** ZooKeeper path of the topic's ownership node. */
        String hubPath;

        /**
         * @param byteString topic to resolve
         * @param z          whether this hub should claim the topic when nobody owns it
         * @param callback   receiver of the result
         * @param obj        caller context passed back to {@code callback}
         */
        public ZkGetOwnerOp(ByteString byteString, boolean z, Callback<HedwigSocketAddress> callback, Object obj) {
            this.topic = byteString;
            this.shouldClaim = z;
            this.cb = callback;
            this.ctx = obj;
            this.hubPath = ZkTopicManager.this.hubPath(byteString);
        }

        /**
         * Lists the registered hubs and hands the list to {@link #chooseLeastLoadedNode(List)}.
         * A failure to list the hubs is reported as a {@code ServiceDownException}.
         */
        public void choose() {
            String hostsPath = ZkTopicManager.this.cfg.getZkHostsPrefix(new StringBuilder()).toString();
            ZkTopicManager.this.zk.getChildren(hostsPath, false, (AsyncCallback.ChildrenCallback) new SafeAsyncZKCallback.ChildrenCallback() {
                @Override
                public void safeProcessResult(int i, String str, Object obj, List<String> list) {
                    if (i == KeeperException.Code.OK.intValue()) {
                        ZkGetOwnerOp.this.chooseLeastLoadedNode(list);
                    } else {
                        // The ZooKeeper context of this call is always null, so the caller's own
                        // context must be used when reporting the failure.
                        ZkGetOwnerOp.this.cb.operationFailed(ZkGetOwnerOp.this.ctx, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not get list of available hubs", str, i)));
                    }
                }
            }, (Object) null);
        }

        /**
         * Reads the load published by every hub in {@code list} and resolves the topic to the hub
         * with the smallest load (ties are broken at random). If the winner is this hub the topic
         * is claimed; otherwise the winner's address is reported to the callback.
         *
         * <p>Fails with "No hub available" when the list is empty or no hub yields a usable load.
         *
         * @param list names of the hub nodes found under the hosts prefix
         */
        public void chooseLeastLoadedNode(final List<String> list) {
            // Without any hub no read would be issued below and the callback would never fire.
            if (list == null || list.isEmpty()) {
                this.cb.operationFailed(this.ctx, new PubSubException.ServiceDownException("No hub available"));
                return;
            }
            SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
                int numResponses = 0;
                int minLoad = Integer.MAX_VALUE;
                String leastLoaded = null;

                @Override
                public void safeProcessResult(int i, String str, Object obj, byte[] bArr, Stat stat) {
                    // One callback instance is shared by all reads, so its counters are guarded.
                    synchronized (this) {
                        if (i == KeeperException.Code.OK.intValue()) {
                            if (bArr == null) {
                                // A node without data must still be counted as a response below.
                                ZkTopicManager.logger.warn("Corrupted load information from hub:" + obj);
                            } else {
                                try {
                                    int parseInt = Integer.parseInt(new String(bArr));
                                    if (ZkTopicManager.logger.isDebugEnabled()) {
                                        ZkTopicManager.logger.debug("Found server: " + obj + " with load: " + parseInt);
                                    }
                                    if (parseInt < this.minLoad || (parseInt == this.minLoad && ZkTopicManager.this.rand.nextBoolean())) {
                                        this.minLoad = parseInt;
                                        this.leastLoaded = (String) obj;
                                    }
                                } catch (NumberFormatException e) {
                                    ZkTopicManager.logger.warn("Corrupted load information from hub:" + obj);
                                }
                            }
                        }
                        this.numResponses++;
                        // Decide only once every hub has answered, successfully or not.
                        if (this.numResponses == list.size()) {
                            if (this.leastLoaded == null) {
                                ZkGetOwnerOp.this.cb.operationFailed(ZkGetOwnerOp.this.ctx, new PubSubException.ServiceDownException("No hub available"));
                                return;
                            }
                            HedwigSocketAddress hedwigSocketAddress = new HedwigSocketAddress(this.leastLoaded);
                            if (hedwigSocketAddress.equals(ZkTopicManager.this.addr)) {
                                ZkGetOwnerOp.this.claim();
                            } else {
                                ZkGetOwnerOp.this.cb.operationFinished(ZkGetOwnerOp.this.ctx, hedwigSocketAddress);
                            }
                        }
                    }
                }
            };
            for (String str : list) {
                // The hub name doubles as the ZooKeeper context so the callback knows whose load it got.
                // NOTE(review): TypeCompiler.DIVIDE_OP is presumably the "/" path separator resolved by the decompiler - confirm.
                ZkTopicManager.this.zk.getData(ZkTopicManager.this.cfg.getZkHostsPrefix(new StringBuilder()).append(TypeCompiler.DIVIDE_OP).append(str).toString(), false, (AsyncCallback.DataCallback) dataCallback, (Object) str);
            }
        }

        /** Claims the topic when {@link #shouldClaim} is set, otherwise chooses the least-loaded hub. */
        public void claimOrChoose() {
            if (this.shouldClaim) {
                claim();
            } else {
                choose();
            }
        }

        /**
         * Reads the topic's ownership node.
         * <ul>
         *   <li>No node: continue with {@link #claimOrChoose()}.</li>
         *   <li>Node names this hub: the node is treated as stale, deleted, and then
         *       {@link #claimOrChoose()} runs.</li>
         *   <li>Node names another hub: that hub is reported as the owner.</li>
         *   <li>Any other ZooKeeper result: the callback fails with a {@code ServiceDownException}.</li>
         * </ul>
         */
        public void read() {
            ZkTopicManager.this.zk.getData(this.hubPath, false, (AsyncCallback.DataCallback) new SafeAsyncZKCallback.DataCallback() {
                @Override
                public void safeProcessResult(int i, String str, Object obj, byte[] bArr, Stat stat) {
                    if (i == KeeperException.Code.NONODE.intValue()) {
                        ZkGetOwnerOp.this.claimOrChoose();
                        return;
                    }
                    if (i != KeeperException.Code.OK.intValue()) {
                        ZkGetOwnerOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: " + ZkGetOwnerOp.this.topic.toStringUtf8(), str, i)));
                        return;
                    }
                    HedwigSocketAddress hedwigSocketAddress = new HedwigSocketAddress(new String(bArr));
                    if (hedwigSocketAddress.equals(ZkTopicManager.this.addr)) {
                        ZkTopicManager.logger.info("Discovered stale self-node for topic: " + ZkGetOwnerOp.this.topic.toStringUtf8() + ", will delete it");
                        // Uses the SafeAsyncZKCallback variant like every other ZooKeeper callback in this class.
                        // The versioned delete only removes the node that was just read.
                        ZkTopicManager.this.zk.delete(ZkGetOwnerOp.this.hubPath, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() {
                            @Override
                            public void safeProcessResult(int i2, String str2, Object obj2) {
                                if (KeeperException.Code.OK.intValue() == i2 || KeeperException.Code.NONODE.intValue() == i2) {
                                    ZkGetOwnerOp.this.claimOrChoose();
                                } else {
                                    ZkGetOwnerOp.this.cb.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not delete self node for topic: " + ZkGetOwnerOp.this.topic.toStringUtf8(), str2, i2)));
                                }
                            }
                        }, obj);
                    } else {
                        if (ZkTopicManager.logger.isDebugEnabled()) {
                            ZkTopicManager.logger.debug("topic: " + ZkGetOwnerOp.this.topic.toStringUtf8() + " belongs to someone else: " + hedwigSocketAddress);
                        }
                        ZkGetOwnerOp.this.cb.operationFinished(obj, hedwigSocketAddress);
                    }
                }
            }, this.ctx);
        }

        /**
         * Tries to create the topic's ownership node as an EPHEMERAL node holding this hub's address.
         * On success the topic is added to the owned topics and the published load is refreshed.
         * If the node already exists the operation falls back to {@link #read()}; any other result
         * fails the callback with a {@code ServiceDownException}.
         */
        public void claim() {
            if (ZkTopicManager.logger.isDebugEnabled()) {
                ZkTopicManager.logger.debug("claiming topic: " + this.topic.toStringUtf8());
            }
            ZkUtils.createFullPathOptimistic(ZkTopicManager.this.zk, this.hubPath, ZkTopicManager.this.addr.toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
                @Override
                public void safeProcessResult(int i, String str, Object obj, String str2) {
                    if (i == KeeperException.Code.OK.intValue()) {
                        if (ZkTopicManager.logger.isDebugEnabled()) {
                            ZkTopicManager.logger.debug("claimed topic: " + ZkGetOwnerOp.this.topic.toStringUtf8());
                        }
                        ZkTopicManager.this.notifyListenersAndAddToOwnedTopics(ZkGetOwnerOp.this.topic, ZkGetOwnerOp.this.cb, obj);
                        ZkTopicManager.this.updateLoadInformation();
                        return;
                    }
                    if (i == KeeperException.Code.NODEEXISTS.intValue()) {
                        // Someone (possibly a stale session of this hub) got there first: re-read the owner.
                        ZkGetOwnerOp.this.read();
                    } else {
                        ZkGetOwnerOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to create ephemeral node to claim ownership of topic: " + ZkGetOwnerOp.this.topic.toStringUtf8(), str, i)));
                    }
                }
            }, this.ctx);
        }
    }

    /**
     * Creates the topic manager, installs a ZooKeeper session watcher and registers this hub in
     * ZooKeeper before returning.
     *
     * <p>The watcher marks the manager as suspended while the client is disconnected, clears the
     * flag on reconnection, and terminates the JVM with exit code 1 when the session expires.
     *
     * @param zooKeeper                ZooKeeper client used for all ownership and load nodes
     * @param serverConfiguration      server configuration; supplies the hosts prefix
     * @param scheduledExecutorService executor forwarded to the superclass
     * @throws UnknownHostException declared by the superclass constructor
     * @throws PubSubException      if registering the hub with ZooKeeper fails
     */
    public ZkTopicManager(ZooKeeper zooKeeper, ServerConfiguration serverConfiguration, ScheduledExecutorService scheduledExecutorService) throws UnknownHostException, PubSubException {
        super(serverConfiguration, scheduledExecutorService);
        this.rand = new Random();
        this.loadReportingStatCallback = new SafeAsyncZKCallback.StatCallback() {
            @Override
            public void safeProcessResult(int i, String str, Object obj, Stat stat) {
                // Load reporting is best effort: a failed update is only logged.
                if (i != KeeperException.Code.OK.intValue()) {
                    ZkTopicManager.logger.warn("Failed to update load information in zk");
                }
            }
        };
        this.isSuspended = false;
        this.zk = zooKeeper;
        this.ephemeralNodePath = serverConfiguration.getZkHostsPrefix(new StringBuilder()).append(TypeCompiler.DIVIDE_OP).append(this.addr).toString();
        zooKeeper.register(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // Events of type None carry connection state changes rather than node changes.
                if (watchedEvent.getType().equals(Watcher.Event.EventType.None)) {
                    if (watchedEvent.getState().equals(Watcher.Event.KeeperState.Disconnected)) {
                        ZkTopicManager.logger.warn("ZK client has been disconnected to the ZK server!");
                        ZkTopicManager.this.isSuspended = true;
                    } else if (watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                        if (ZkTopicManager.this.isSuspended) {
                            ZkTopicManager.logger.info("ZK client has been reconnected to the ZK server!");
                        }
                        ZkTopicManager.this.isSuspended = false;
                    }
                }
                if (watchedEvent.getState().equals(Watcher.Event.KeeperState.Expired)) {
                    ZkTopicManager.logger.error("ZK client connection to the ZK server has expired!");
                    System.exit(1);
                }
            }
        });
        // Hand-off point between the asynchronous registration callback and this constructor.
        // Fully parameterized so no raw types or casts are needed when the result is read back.
        final SynchronousQueue<Either<Void, PubSubException>> registrationResult = new SynchronousQueue<Either<Void, PubSubException>>();
        registerWithZookeeper(new Callback<Void>() {
            @Override
            public void operationFailed(Object obj, PubSubException pubSubException) {
                ZkTopicManager.logger.error("Failed to register hub with zookeeper", pubSubException);
                ConcurrencyUtils.put(registrationResult, Either.of((Void) null, pubSubException));
            }

            @Override
            public void operationFinished(Object obj, Void r6) {
                ZkTopicManager.logger.info("Successfully registered hub with zookeeper");
                ConcurrencyUtils.put(registrationResult, Either.of(r6, (PubSubException) null));
            }
        }, null);
        // Presumably blocks until one of the callbacks above has published its result.
        PubSubException registrationFailure = ConcurrencyUtils.take(registrationResult).right();
        if (registrationFailure != null) {
            throw registrationFailure;
        }
    }

    /**
     * Registers this hub by creating its EPHEMERAL node, initialised with the current load.
     *
     * <p>If the node already exists it is treated as a stale leftover: it is deleted and the
     * registration is attempted again. Any other ZooKeeper failure is reported to the callback as
     * a {@code ServiceDownException}.
     *
     * @param callback notified once registration has succeeded or failed
     * @param obj      caller context; it is passed through ZooKeeper and handed back to {@code callback}
     */
    void registerWithZookeeper(final Callback<Void> callback, Object obj) {
        ZkUtils.createFullPathOptimistic(this.zk, this.ephemeralNodePath, getCurrentLoadData(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
            @Override
            public void safeProcessResult(int i, String str, Object obj2, String str2) {
                if (i == KeeperException.Code.OK.intValue()) {
                    callback.operationFinished(obj2, null);
                } else if (i != KeeperException.Code.NODEEXISTS.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not create ephemeral node to register hub", ZkTopicManager.this.ephemeralNodePath, i)));
                } else {
                    ZkTopicManager.logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
                    // Version -1 deletes the node regardless of its current version.
                    ZkTopicManager.this.zk.delete(ZkTopicManager.this.ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
                        @Override
                        public void safeProcessResult(int i2, String str3, Object obj3) {
                            if (i2 == KeeperException.Code.OK.intValue() || i2 == KeeperException.Code.NONODE.intValue()) {
                                // The stale node is gone (or was already gone): retry the registration.
                                ZkTopicManager.this.registerWithZookeeper(callback, obj3);
                            } else {
                                callback.operationFailed(obj3, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not delete stale ephemeral node to register hub", ZkTopicManager.this.ephemeralNodePath, i2)));
                            }
                        }
                    }, obj2);
                }
            }
        }, obj); // forward the caller's context instead of dropping it
    }

    /**
     * Builds the path of the ownership node for a topic: the configured topic path followed by
     * {@code "/hub"}.
     *
     * @param byteString the topic
     * @return the ZooKeeper path of the topic's ownership node
     */
    String hubPath(ByteString byteString) {
        final StringBuilder buffer = new StringBuilder();
        return this.cfg.getZkTopicPath(buffer, byteString).append("/hub").toString();
    }

    /**
     * Resolves the owner of a topic.
     *
     * <p>Fails immediately while the manager is suspended, answers with this hub's own address
     * when the topic is already in {@code topics}, and otherwise starts a {@link ZkGetOwnerOp}
     * that consults ZooKeeper.
     *
     * @param byteString the topic
     * @param z          whether this hub should claim the topic if it has no owner
     * @param callback   receives the owner's address or the failure
     * @param obj        caller context handed back to {@code callback}
     */
    @Override
    protected void realGetOwner(ByteString byteString, boolean z, Callback<HedwigSocketAddress> callback, Object obj) {
        if (this.isSuspended) {
            callback.operationFailed(obj, new PubSubException.ServiceDownException("ZKTopicManager service is temporarily suspended!"));
            return;
        }
        if (this.topics.contains(byteString)) {
            callback.operationFinished(obj, this.addr);
            return;
        }
        ZkGetOwnerOp lookup = new ZkGetOwnerOp(byteString, z, callback, obj);
        lookup.read();
    }

    /**
     * Encodes this hub's load, i.e. the size of {@code topics}, as the bytes of its decimal string.
     *
     * @return the load in the form stored in this hub's ZooKeeper node
     */
    byte[] getCurrentLoadData() {
        final String load = String.valueOf(this.topics.size());
        return load.getBytes();
    }

    /**
     * Writes the current load into this hub's node. The write is unconditional (version -1) and
     * its outcome is handled by {@code loadReportingStatCallback}.
     */
    void updateLoadInformation() {
        final byte[] load = getCurrentLoadData();
        if (logger.isDebugEnabled()) {
            final String readable = new String(load);
            logger.debug("Reporting load of " + readable);
        }
        this.zk.setData(this.ephemeralNodePath, load, -1, this.loadReportingStatCallback, null);
    }

    /**
     * Removes this hub's ownership node for a released topic.
     *
     * <p>The node is read first. It is deleted (at the version that was read) only when it names
     * this hub; a missing node or a node naming another hub is logged and counted as success.
     * Any other ZooKeeper result fails the callback with a {@code ServiceDownException}.
     *
     * @param byteString the released topic
     * @param callback   notified when the cleanup has finished or failed
     * @param obj        caller context handed back to {@code callback}
     */
    @Override
    protected void postReleaseCleanup(final ByteString byteString, final Callback<Void> callback, Object obj) {
        final String ownerNode = hubPath(byteString);
        final AsyncCallback.DataCallback ownershipReader = new SafeAsyncZKCallback.DataCallback() {
            @Override
            public void safeProcessResult(int rc, String path, Object readCtx, byte[] data, Stat stat) {
                if (rc == KeeperException.Code.NONODE.intValue()) {
                    ZkTopicManager.logger.warn("While deleting self-node for topic: " + byteString.toStringUtf8() + ", node not found");
                    callback.operationFinished(readCtx, null);
                    return;
                }
                if (rc != KeeperException.Code.OK.intValue()) {
                    callback.operationFailed(readCtx, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to delete self-ownership node for topic: " + byteString.toStringUtf8(), path, rc)));
                    return;
                }
                HedwigSocketAddress recordedOwner = new HedwigSocketAddress(new String(data));
                if (!recordedOwner.equals(ZkTopicManager.this.addr)) {
                    // The node belongs to another hub, so it must not be removed.
                    ZkTopicManager.logger.warn("Wanted to delete self-node for topic: " + byteString.toStringUtf8() + " but node for " + recordedOwner + " found, leaving untouched");
                    callback.operationFinished(readCtx, null);
                    return;
                }
                ZkTopicManager.this.zk.delete(path, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() {
                    @Override
                    public void safeProcessResult(int deleteRc, String deletedPath, Object deleteCtx) {
                        boolean nodeGone = deleteRc == KeeperException.Code.OK.intValue() || deleteRc == KeeperException.Code.NONODE.intValue();
                        if (!nodeGone) {
                            callback.operationFailed(deleteCtx, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to delete self-ownership node for topic: " + byteString.toStringUtf8(), deletedPath, deleteRc)));
                            return;
                        }
                        callback.operationFinished(deleteCtx, null);
                    }
                }, readCtx);
            }
        };
        this.zk.getData(ownerNode, false, ownershipReader, obj);
    }
}
