package org.apache.hw_v4_0_0.hedwig.server.subscriptions;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.derby.iapi.sql.compile.TypeCompiler;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.topics.TopicManager;
import org.apache.hw_v4_0_0.hedwig.util.Callback;
import org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncZKCallback;
import org.apache.hw_v4_0_0.hedwig.zookeeper.ZkUtils;
import org.apache.hw_v4_0_0.zookkeeper.AsyncCallback;
import org.apache.hw_v4_0_0.zookkeeper.CreateMode;
import org.apache.hw_v4_0_0.zookkeeper.KeeperException;
import org.apache.hw_v4_0_0.zookkeeper.ZooDefs;
import org.apache.hw_v4_0_0.zookkeeper.ZooKeeper;
import org.apache.hw_v4_0_0.zookkeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/subscriptions/ZkSubscriptionManager.class */
public class ZkSubscriptionManager extends AbstractSubscriptionManager {
    ZooKeeper zk;
    protected static final Logger logger = LoggerFactory.getLogger(ZkSubscriptionManager.class);

    public ZkSubscriptionManager(ZooKeeper zooKeeper, TopicManager topicManager, PersistenceManager persistenceManager, ServerConfiguration serverConfiguration, ScheduledExecutorService scheduledExecutorService) {
        super(serverConfiguration, topicManager, persistenceManager, scheduledExecutorService);
        this.zk = zooKeeper;
    }

    private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString byteString) {
        return this.cfg.getZkTopicPath(sb, byteString).append("/subscribers");
    }

    private String topicSubscriberPath(ByteString byteString, ByteString byteString2) {
        return topicSubscribersPath(new StringBuilder(), byteString).append(TypeCompiler.DIVIDE_OP).append(byteString2.toStringUtf8()).toString();
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager
    protected void readSubscriptions(final ByteString byteString, final Callback<Map<ByteString, InMemorySubscriptionState>> callback, Object obj) {
        this.zk.getChildren(topicSubscribersPath(new StringBuilder(), byteString).toString(), false, (AsyncCallback.ChildrenCallback) new SafeAsyncZKCallback.ChildrenCallback() { // from class: org.apache.hw_v4_0_0.hedwig.server.subscriptions.ZkSubscriptionManager.1
            @Override // org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncZKCallback.ChildrenCallback
            public void safeProcessResult(int i, String str, final Object obj2, final List<String> list) {
                if (i != KeeperException.Code.OK.intValue() && i != KeeperException.Code.NONODE.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic " + byteString.toStringUtf8(), str, i)));
                    return;
                }
                final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                if (i == KeeperException.Code.NONODE.intValue() || list.size() == 0) {
                    if (ZkSubscriptionManager.logger.isDebugEnabled()) {
                        ZkSubscriptionManager.logger.debug("No subscriptions found while acquiring topic: " + byteString.toStringUtf8());
                    }
                    callback.operationFinished(obj2, concurrentHashMap);
                    return;
                }
                final AtomicBoolean atomicBoolean = new AtomicBoolean();
                final AtomicInteger atomicInteger = new AtomicInteger();
                for (final String str2 : list) {
                    final ByteString copyFromUtf8 = ByteString.copyFromUtf8(str2);
                    ZkSubscriptionManager.this.zk.getData(str + TypeCompiler.DIVIDE_OP + str2, false, (AsyncCallback.DataCallback) new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hw_v4_0_0.hedwig.server.subscriptions.ZkSubscriptionManager.1.1
                        static final /* synthetic */ boolean $assertionsDisabled;

                        @Override // org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
                        public void safeProcessResult(int i2, String str3, Object obj3, byte[] bArr, Stat stat) {
                            if (i2 != KeeperException.Code.OK.intValue()) {
                                reportFailure(new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read subscription data for topic: " + byteString.toStringUtf8() + ", subscriberId: " + copyFromUtf8.toStringUtf8(), str3, i2)));
                                return;
                            }
                            if (atomicBoolean.get()) {
                                return;
                            }
                            try {
                                PubSubProtocol.SubscriptionState parseFrom = PubSubProtocol.SubscriptionState.parseFrom(bArr);
                                if (ZkSubscriptionManager.logger.isDebugEnabled()) {
                                    ZkSubscriptionManager.logger.debug("Found subscription while acquiring topic: " + byteString.toStringUtf8() + " subscriberId: " + str2 + "state: " + SubscriptionStateUtils.toString(parseFrom));
                                }
                                concurrentHashMap.put(copyFromUtf8, new InMemorySubscriptionState(parseFrom));
                                if (atomicInteger.incrementAndGet() == list.size()) {
                                    if (!$assertionsDisabled && concurrentHashMap.size() != atomicInteger.get()) {
                                        throw new AssertionError();
                                    }
                                    callback.operationFinished(obj3, concurrentHashMap);
                                }
                            } catch (InvalidProtocolBufferException e) {
                                String str4 = "Failed to deserialize state for topic: " + byteString.toStringUtf8() + " subscriberId: " + copyFromUtf8.toStringUtf8();
                                ZkSubscriptionManager.logger.error(str4, e);
                                reportFailure(new PubSubException.UnexpectedConditionException(str4));
                            }
                        }

                        private void reportFailure(PubSubException pubSubException) {
                            if (atomicBoolean.compareAndSet(false, true)) {
                                callback.operationFailed(obj2, pubSubException);
                            }
                        }

                        static {
                            $assertionsDisabled = !ZkSubscriptionManager.class.desiredAssertionStatus();
                        }
                    }, obj2);
                }
            }
        }, obj);
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager
    protected void createSubscriptionState(final ByteString byteString, final ByteString byteString2, final PubSubProtocol.SubscriptionState subscriptionState, final Callback<Void> callback, Object obj) {
        ZkUtils.createFullPathOptimistic(this.zk, topicSubscriberPath(byteString, byteString2), subscriptionState.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { // from class: org.apache.hw_v4_0_0.hedwig.server.subscriptions.ZkSubscriptionManager.2
            @Override // org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncZKCallback.StringCallback
            public void safeProcessResult(int i, String str, Object obj2, String str2) {
                if (i != KeeperException.Code.OK.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not record new subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8(), str, i)));
                } else {
                    if (ZkSubscriptionManager.logger.isDebugEnabled()) {
                        ZkSubscriptionManager.logger.debug("Successfully recorded subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " state: " + SubscriptionStateUtils.toString(subscriptionState));
                    }
                    callback.operationFinished(obj2, null);
                }
            }
        }, obj);
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager
    protected void updateSubscriptionState(final ByteString byteString, final ByteString byteString2, final PubSubProtocol.SubscriptionState subscriptionState, final Callback<Void> callback, Object obj) {
        this.zk.setData(topicSubscriberPath(byteString, byteString2), subscriptionState.toByteArray(), -1, new SafeAsyncZKCallback.StatCallback() { // from class: org.apache.hw_v4_0_0.hedwig.server.subscriptions.ZkSubscriptionManager.3
            @Override // org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback
            public void safeProcessResult(int i, String str, Object obj2, Stat stat) {
                if (i != KeeperException.Code.OK.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " could not set subscription state: " + SubscriptionStateUtils.toString(subscriptionState), str, i)));
                } else {
                    if (ZkSubscriptionManager.logger.isDebugEnabled()) {
                        ZkSubscriptionManager.logger.debug("Successfully updated subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " state: " + SubscriptionStateUtils.toString(subscriptionState));
                    }
                    callback.operationFinished(obj2, null);
                }
            }
        }, obj);
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager
    protected void deleteSubscriptionState(final ByteString byteString, final ByteString byteString2, final Callback<Void> callback, Object obj) {
        this.zk.delete(topicSubscriberPath(byteString, byteString2), -1, new SafeAsyncZKCallback.VoidCallback() { // from class: org.apache.hw_v4_0_0.hedwig.server.subscriptions.ZkSubscriptionManager.4
            @Override // org.apache.hw_v4_0_0.hedwig.zookeeper.SafeAsyncZKCallback.VoidCallback
            public void safeProcessResult(int i, String str, Object obj2) {
                if (i != KeeperException.Code.OK.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " failed to delete subscription", str, i)));
                } else {
                    if (ZkSubscriptionManager.logger.isDebugEnabled()) {
                        ZkSubscriptionManager.logger.debug("Successfully deleted subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8());
                    }
                    callback.operationFinished(obj2, null);
                }
            }
        }, obj);
    }
}
