package org.apache.hw_v4_0_0.hedwig.server.subscriptions;

import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hw_v4_0_0.commons.lang.StringUtils;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.protoextensions.MessageIdUtils;
import org.apache.hw_v4_0_0.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.server.common.TopicOpQueuer;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.topics.TopicManager;
import org.apache.hw_v4_0_0.hedwig.server.topics.TopicOwnershipChangeListener;
import org.apache.hw_v4_0_0.hedwig.util.Callback;
import org.apache.hw_v4_0_0.hedwig.util.CallbackUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/subscriptions/AbstractSubscriptionManager.class */
public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
    /** Server configuration; read for the consume interval and for the period of the consumed-messages task. */
    ServerConfiguration cfg;
    /** Logger shared by the manager and its inner operation classes. */
    static Logger logger = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
    /** Queue through which every acquire/subscribe/consume/unsubscribe operation is submitted, keyed by topic. */
    TopicOpQueuer queuer;
    /** Persistence manager told how far each topic has been consumed; may be null, in which case no timer task is scheduled. */
    private final PersistenceManager pm;
    /** topic -> (subscriber id -> in-memory subscription state). An entry is added by AcquireOp and removed by lostTopic. */
    ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq = new ConcurrentHashMap<>();
    /** Listeners notified on the first local subscribe and the last local unsubscribe of a topic. */
    private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<>();
    /** topic -> number of subscribers for which SubscriptionStateUtils.isHubSubscriber is false. */
    private final ConcurrentHashMap<ByteString, AtomicInteger> topic2LocalCounts = new ConcurrentHashMap<>();
    /** Daemon timer that periodically runs MessagesConsumedTask; cancelled by stop(). */
    private final Timer timer = new Timer(true);
    /** topic -> the minimum consumed sequence number most recently handed to pm.consumedUntil. */
    private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/subscriptions/AbstractSubscriptionManager$AcquireOp.class */
    /**
     * Queued operation that loads a topic's subscriptions when this hub acquires the topic.
     *
     * <p>On success the loaded map is installed in {@code top2sub2seq} and the number of
     * non-hub subscribers is recorded in {@code topic2LocalCounts}. If at least one non-hub
     * subscriber exists, the subscription listeners are notified before the caller's
     * callback is completed.
     */
    public class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
        /**
         * Reconstructed from the decompiler's instruction dump: the superclass is an inner
         * class of {@link TopicOpQueuer}, so it is created against this manager's queuer.
         *
         * @param topic topic being acquired
         * @param cb    callback completed once the acquisition succeeds or fails
         * @param ctx   opaque context handed back to {@code cb}
         */
        public AcquireOp(ByteString topic, Callback<Void> cb, Object ctx) {
            AbstractSubscriptionManager.this.queuer.super(topic, cb, ctx);
        }

        @Override // java.lang.Runnable
        public void run() {
            if (AbstractSubscriptionManager.this.top2sub2seq.containsKey(this.topic)) {
                // Already acquired: complete once and stop. Falling through would re-read the
                // subscriptions, invoke the callback a second time and replace live state.
                this.cb.operationFinished(this.ctx, null);
                return;
            }
            AbstractSubscriptionManager.this.readSubscriptions(this.topic, new Callback<Map<ByteString, InMemorySubscriptionState>>() {
                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                public void operationFailed(Object obj, PubSubException pubSubException) {
                    AcquireOp.this.cb.operationFailed(obj, pubSubException);
                }

                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                public void operationFinished(Object obj, final Map<ByteString, InMemorySubscriptionState> map) {
                    // Count the subscribers that are not hub subscribers.
                    int localSubscribers = 0;
                    for (ByteString subscriberId : map.keySet()) {
                        if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)) {
                            localSubscribers++;
                        }
                    }
                    AbstractSubscriptionManager.this.topic2LocalCounts.put(AcquireOp.this.topic, new AtomicInteger(localSubscribers));
                    Callback<Void> callback = new Callback<Void>() {
                        @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                        public void operationFailed(Object obj2, PubSubException pubSubException) {
                            AbstractSubscriptionManager.logger.error("Subscription manager failed to acquired topic " + AcquireOp.this.topic.toStringUtf8(), pubSubException);
                            // Propagate the real cause instead of a null exception.
                            AcquireOp.this.cb.operationFailed(obj2, pubSubException);
                        }

                        @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                        public void operationFinished(Object obj2, Void r6) {
                            // The topic only becomes visible to other operations once listeners succeeded.
                            AbstractSubscriptionManager.this.top2sub2seq.put(AcquireOp.this.topic, map);
                            AbstractSubscriptionManager.logger.info("Subscription manager successfully acquired topic: " + AcquireOp.this.topic.toStringUtf8());
                            AcquireOp.this.cb.operationFinished(obj2, null);
                        }
                    };
                    if (localSubscribers > 0) {
                        AbstractSubscriptionManager.this.notifySubscribe(AcquireOp.this.topic, false, callback, obj);
                    } else {
                        callback.operationFinished(obj, null);
                    }
                }
            }, this.ctx);
        }
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/subscriptions/AbstractSubscriptionManager$ConsumeOp.class */
    /**
     * Queued operation that advances a subscriber's consume pointer for a topic.
     *
     * <p>The pointer is always advanced in memory; the subscription state is written
     * through {@code updateSubscriptionState} only when
     * {@code InMemorySubscriptionState.setLastConsumeSeqId} returns true for the configured
     * consume interval. An unknown topic or subscriber completes the callback successfully
     * without doing anything.
     */
    private class ConsumeOp extends TopicOpQueuer.AsynchronousOp<Void> {
        ByteString subscriberId;
        PubSubProtocol.MessageSeqId consumeSeqId;

        /**
         * Reconstructed from the decompiler's instruction dump: the superclass is an inner
         * class of {@link TopicOpQueuer}, so it is created against this manager's queuer.
         *
         * @param topic        topic the subscriber consumes from
         * @param subscriberId subscriber whose consume pointer moves
         * @param consumeSeqId new consume position
         * @param cb           callback completed when the operation ends
         * @param ctx          opaque context handed back to {@code cb}
         */
        public ConsumeOp(ByteString topic, ByteString subscriberId, PubSubProtocol.MessageSeqId consumeSeqId, Callback<Void> cb, Object ctx) {
            AbstractSubscriptionManager.this.queuer.super(topic, cb, ctx);
            this.subscriberId = subscriberId;
            this.consumeSeqId = consumeSeqId;
        }

        @Override // java.lang.Runnable
        public void run() {
            Map<ByteString, InMemorySubscriptionState> topicSubscriptions = AbstractSubscriptionManager.this.top2sub2seq.get(this.topic);
            if (topicSubscriptions == null) {
                // Topic is not held in memory here; treated as a successful no-op.
                this.cb.operationFinished(this.ctx, null);
                return;
            }
            InMemorySubscriptionState subscription = topicSubscriptions.get(this.subscriberId);
            if (subscription == null) {
                // Unknown subscriber; also a successful no-op.
                this.cb.operationFinished(this.ctx, null);
                return;
            }
            if (subscription.setLastConsumeSeqId(this.consumeSeqId, AbstractSubscriptionManager.this.cfg.getConsumeInterval())) {
                // The state reported that it should be persisted; the update completes the callback.
                AbstractSubscriptionManager.this.updateSubscriptionState(this.topic, this.subscriberId, subscription.getSubscriptionState(), this.cb, this.ctx);
                return;
            }
            if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                AbstractSubscriptionManager.logger.debug("Only advanced consume pointer in memory, will persist later, topic: " + this.topic.toStringUtf8() + " subscriberId: " + this.subscriberId.toStringUtf8() + " persistentState: " + SubscriptionStateUtils.toString(subscription.getSubscriptionState()) + " in-memory consume-id: " + MessageIdUtils.msgIdToReadableString(subscription.getLastConsumeSeqId()));
            }
            this.cb.operationFinished(this.ctx, null);
        }
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/subscriptions/AbstractSubscriptionManager$MessagesConsumedTask.class */
    /**
     * Timer task that, for every topic held in {@code top2sub2seq}, computes the smallest
     * local sequence component among the topic's persisted subscription states and reports
     * it to the persistence manager via {@code consumedUntil}.
     *
     * <p>A topic is skipped when it has no subscribers, when the minimum is 0, or when the
     * minimum equals the value reported last time.
     */
    class MessagesConsumedTask extends TimerTask {
        MessagesConsumedTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            // Iterate entries rather than keySet() + get(): a topic removed by lostTopic()
            // between the two calls would yield a null map, and the resulting
            // NullPointerException would terminate the timer thread for good.
            for (Map.Entry<ByteString, Map<ByteString, InMemorySubscriptionState>> entry : AbstractSubscriptionManager.this.top2sub2seq.entrySet()) {
                ByteString topic = entry.getKey();
                Map<ByteString, InMemorySubscriptionState> subscriptions = entry.getValue();

                long minConsumed = Long.MAX_VALUE;
                for (InMemorySubscriptionState subscription : subscriptions.values()) {
                    long consumed = subscription.getSubscriptionState().getMsgId().getLocalComponent();
                    if (consumed < minConsumed) {
                        minConsumed = consumed;
                    }
                }

                if (subscriptions.isEmpty() || minConsumed == 0) {
                    continue;
                }
                // Single lookup instead of containsKey() followed by get().
                Long lastReported = AbstractSubscriptionManager.this.topic2MinConsumedMessagesMap.get(topic);
                if (lastReported != null && lastReported.longValue() == minConsumed) {
                    continue;
                }
                AbstractSubscriptionManager.this.topic2MinConsumedMessagesMap.put(topic, Long.valueOf(minConsumed));
                AbstractSubscriptionManager.this.pm.consumedUntil(topic, Long.valueOf(minConsumed));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/subscriptions/AbstractSubscriptionManager$SubscribeOp.class */
    public class SubscribeOp extends TopicOpQueuer.AsynchronousOp<PubSubProtocol.MessageSeqId> {
        PubSubProtocol.SubscribeRequest subRequest;
        PubSubProtocol.MessageSeqId consumeSeqId;

        /* JADX WARN: Illegal instructions before constructor call */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public SubscribeOp(com.google.protobuf.ByteString r8, org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.SubscribeRequest r9, org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.MessageSeqId r10, org.apache.hw_v4_0_0.hedwig.util.Callback<org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.MessageSeqId> r11, java.lang.Object r12) {
            /*
                r6 = this;
                r0 = r6
                r1 = r7
                org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager.this = r1
                r0 = r6
                r1 = r7
                org.apache.hw_v4_0_0.hedwig.server.common.TopicOpQueuer r1 = r1.queuer
                r2 = r1
                java.lang.Class r2 = r2.getClass()
                r2 = r8
                r3 = r11
                r4 = r12
                r0.<init>(r2, r3, r4)
                r0 = r6
                r1 = r9
                r0.subRequest = r1
                r0 = r6
                r1 = r10
                r0.consumeSeqId = r1
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager.SubscribeOp.<init>(org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager, com.google.protobuf.ByteString, org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol$SubscribeRequest, org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol$MessageSeqId, org.apache.hw_v4_0_0.hedwig.util.Callback, java.lang.Object):void");
        }

        @Override // java.lang.Runnable
        public void run() {
            final Map<ByteString, InMemorySubscriptionState> map = AbstractSubscriptionManager.this.top2sub2seq.get(this.topic);
            if (map == null) {
                this.cb.operationFailed(this.ctx, new PubSubException.ServerNotResponsibleForTopicException(StringUtils.EMPTY));
                return;
            }
            final ByteString subscriberId = this.subRequest.getSubscriberId();
            InMemorySubscriptionState inMemorySubscriptionState = map.get(subscriberId);
            PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach = this.subRequest.getCreateOrAttach();
            if (inMemorySubscriptionState == null) {
                if (!createOrAttach.equals(PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)) {
                    final PubSubProtocol.SubscriptionState build = PubSubProtocol.SubscriptionState.newBuilder().setMsgId(this.consumeSeqId).build();
                    AbstractSubscriptionManager.this.createSubscriptionState(this.topic, subscriberId, build, new Callback<Void>() { // from class: org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager.SubscribeOp.1
                        @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                        public void operationFailed(Object obj, PubSubException pubSubException) {
                            SubscribeOp.this.cb.operationFailed(obj, pubSubException);
                        }

                        @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                        public void operationFinished(Object obj, Void r8) {
                            Callback<Void> callback = new Callback<Void>() { // from class: org.apache.hw_v4_0_0.hedwig.server.subscriptions.AbstractSubscriptionManager.SubscribeOp.1.1
                                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                                public void operationFailed(Object obj2, PubSubException pubSubException) {
                                    AbstractSubscriptionManager.logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic " + SubscribeOp.this.topic.toStringUtf8() + " failed due to failed listener callback", pubSubException);
                                    SubscribeOp.this.cb.operationFailed(obj2, pubSubException);
                                }

                                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                                public void operationFinished(Object obj2, Void r82) {
                                    map.put(subscriberId, new InMemorySubscriptionState(build));
                                    SubscribeOp.this.cb.operationFinished(obj2, SubscribeOp.this.consumeSeqId);
                                }
                            };
                            if (SubscriptionStateUtils.isHubSubscriber(SubscribeOp.this.subRequest.getSubscriberId()) || ((AtomicInteger) AbstractSubscriptionManager.this.topic2LocalCounts.get(SubscribeOp.this.topic)).incrementAndGet() != 1) {
                                callback.operationFinished(obj, r8);
                            } else {
                                AbstractSubscriptionManager.this.notifySubscribe(SubscribeOp.this.topic, SubscribeOp.this.subRequest.getSynchronous(), callback, obj);
                            }
                        }
                    }, this.ctx);
                    return;
                } else {
                    String str = "Topic: " + this.topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() + " requested attaching to an existing subscription but it is not subscribed";
                    AbstractSubscriptionManager.logger.debug(str);
                    this.cb.operationFailed(this.ctx, new PubSubException.ClientNotSubscribedException(str));
                    return;
                }
            }
            if (createOrAttach.equals(PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE)) {
                String str2 = "Topic: " + this.topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() + " requested creating a subscription but it is already subscribed with state: " + SubscriptionStateUtils.toString(inMemorySubscriptionState.getSubscriptionState());
                AbstractSubscriptionManager.logger.debug(str2);
                this.cb.operationFailed(this.ctx, new PubSubException.ClientAlreadySubscribedException(str2));
            } else {
                if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                    AbstractSubscriptionManager.logger.debug("Topic: " + this.topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() + " attaching to subscription with state: " + SubscriptionStateUtils.toString(inMemorySubscriptionState.getSubscriptionState()));
                }
                this.cb.operationFinished(this.ctx, inMemorySubscriptionState.getLastConsumeSeqId());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/subscriptions/AbstractSubscriptionManager$UnsubscribeOp.class */
    /**
     * Queued operation that removes a subscriber from a topic.
     *
     * <p>Fails with {@code ServerNotResponsibleForTopicException} when the topic is not held
     * in memory and with {@code ClientNotSubscribedException} when the subscriber is unknown.
     * Otherwise the subscription state is deleted, the in-memory entry is removed and, if
     * that was the last non-hub subscriber, listeners are told about the last local
     * unsubscribe.
     */
    public class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
        ByteString subscriberId;

        /**
         * Reconstructed from the decompiler's instruction dump: the superclass is an inner
         * class of {@link TopicOpQueuer}, so it is created against this manager's queuer.
         *
         * @param topic        topic to unsubscribe from
         * @param subscriberId subscriber being removed
         * @param cb           callback completed when the operation ends
         * @param ctx          opaque context handed back to {@code cb}
         */
        public UnsubscribeOp(ByteString topic, ByteString subscriberId, Callback<Void> cb, Object ctx) {
            AbstractSubscriptionManager.this.queuer.super(topic, cb, ctx);
            this.subscriberId = subscriberId;
        }

        @Override // java.lang.Runnable
        public void run() {
            final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = AbstractSubscriptionManager.this.top2sub2seq.get(this.topic);
            if (topicSubscriptions == null) {
                this.cb.operationFailed(this.ctx, new PubSubException.ServerNotResponsibleForTopicException(StringUtils.EMPTY));
                return;
            }
            if (!topicSubscriptions.containsKey(this.subscriberId)) {
                this.cb.operationFailed(this.ctx, new PubSubException.ClientNotSubscribedException(StringUtils.EMPTY));
                return;
            }
            AbstractSubscriptionManager.this.deleteSubscriptionState(this.topic, this.subscriberId, new Callback<Void>() {
                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                public void operationFailed(Object obj, PubSubException pubSubException) {
                    UnsubscribeOp.this.cb.operationFailed(obj, pubSubException);
                }

                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                public void operationFinished(Object obj, Void r6) {
                    topicSubscriptions.remove(UnsubscribeOp.this.subscriberId);
                    // Hub subscribers are not counted; listeners are notified only when the
                    // local (non-hub) subscriber count drops to 0.
                    boolean lastLocalSubscriber = !SubscriptionStateUtils.isHubSubscriber(UnsubscribeOp.this.subscriberId)
                            && AbstractSubscriptionManager.this.topic2LocalCounts.get(UnsubscribeOp.this.topic).decrementAndGet() == 0;
                    if (lastLocalSubscriber) {
                        AbstractSubscriptionManager.this.notifyUnsubcribe(UnsubscribeOp.this.topic);
                    }
                    UnsubscribeOp.this.cb.operationFinished(obj, null);
                }
            }, this.ctx);
        }
    }

    /**
     * Wires the manager to its collaborators.
     *
     * <p>Creates the operation queue on the given executor, registers this manager as a
     * topic-ownership listener and, when a persistence manager is supplied, schedules the
     * periodic {@link MessagesConsumedTask} with no initial delay.
     *
     * @param serverConfiguration      configuration to read intervals from
     * @param topicManager             topic manager to register with for ownership changes
     * @param persistenceManager       persistence manager, or null to disable the periodic task
     * @param scheduledExecutorService executor backing the operation queue
     */
    public AbstractSubscriptionManager(ServerConfiguration serverConfiguration, TopicManager topicManager, PersistenceManager persistenceManager, ScheduledExecutorService scheduledExecutorService) {
        cfg = serverConfiguration;
        queuer = new TopicOpQueuer(scheduledExecutorService);
        topicManager.addTopicOwnershipChangeListener(this);
        pm = persistenceManager;
        if (persistenceManager == null) {
            // Nothing to report consumption to, so the timer stays idle.
            return;
        }
        TimerTask consumedTask = new MessagesConsumedTask();
        timer.schedule(consumedTask, 0L, serverConfiguration.getMessagesConsumedThreadRunInterval());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells every registered listener that a topic got its first local subscriber.
     *
     * <p>All listeners share one callback obtained from {@code CallbackUtils.multiCallback},
     * sized to the current number of listeners and wrapping {@code callback}.
     *
     * @param byteString the topic
     * @param z          flag forwarded to {@code onFirstLocalSubscribe}; callers in this class
     *                   pass the request's synchronous flag, or false on acquisition
     * @param callback   callback handed to {@code multiCallback}
     * @param obj        context handed to {@code multiCallback}
     */
    public void notifySubscribe(ByteString byteString, boolean z, Callback<Void> callback, Object obj) {
        final int listenerCount = listeners.size();
        final Callback<Void> sharedCallback = CallbackUtils.multiCallback(listenerCount, callback, obj);
        for (SubscriptionEventListener listener : listeners) {
            listener.onFirstLocalSubscribe(byteString, z, sharedCallback);
        }
    }

    /**
     * Queues an {@link AcquireOp} for the topic so that its subscriptions get loaded.
     *
     * @param byteString the acquired topic
     * @param callback   completed when the acquisition succeeds or fails
     * @param obj        opaque context handed back to {@code callback}
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.topics.TopicOwnershipChangeListener
    public void acquiredTopic(ByteString byteString, Callback<Void> callback, Object obj) {
        // The enclosing instance is supplied implicitly for an inner class; passing `this`
        // explicitly does not match the declared (topic, callback, context) constructor.
        this.queuer.pushAndMaybeRun(byteString, new AcquireOp(byteString, callback, obj));
    }

    /**
     * Drops all in-memory state for a topic this hub no longer owns.
     *
     * <p>If the topic still had a positive local-subscriber count, listeners are told about
     * the last local unsubscribe.
     *
     * @param byteString the topic that was lost
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.topics.TopicOwnershipChangeListener
    public void lostTopic(ByteString byteString) {
        top2sub2seq.remove(byteString);
        AtomicInteger localCount = topic2LocalCounts.remove(byteString);
        boolean hadLocalSubscribers = localCount != null && localCount.get() > 0;
        if (hadLocalSubscribers) {
            notifyUnsubcribe(byteString);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells every registered listener that a topic lost its last local subscriber.
     *
     * @param byteString the topic
     */
    public void notifyUnsubcribe(ByteString byteString) {
        for (SubscriptionEventListener listener : listeners) {
            listener.onLastLocalUnsubscribe(byteString);
        }
    }

    /**
     * Loads the subscriptions of a topic. Called by AcquireOp when the topic is not yet held
     * in memory; the map delivered to the callback (subscriber id -> state) is what gets
     * installed in {@code top2sub2seq}.
     *
     * @param byteString the topic whose subscriptions are read
     * @param callback   receives the subscription map on success, or the failure
     * @param obj        opaque context handed back to {@code callback}
     */
    protected abstract void readSubscriptions(ByteString byteString, Callback<Map<ByteString, InMemorySubscriptionState>> callback, Object obj);

    /**
     * Queues a {@link SubscribeOp} for the topic.
     *
     * @param byteString       the topic to subscribe to
     * @param subscribeRequest the subscribe request
     * @param messageSeqId     sequence id a newly created subscription starts from
     * @param callback         receives the sequence id to deliver from, or the failure
     * @param obj              opaque context handed back to {@code callback}
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionManager
    public void serveSubscribeRequest(ByteString byteString, PubSubProtocol.SubscribeRequest subscribeRequest, PubSubProtocol.MessageSeqId messageSeqId, Callback<PubSubProtocol.MessageSeqId> callback, Object obj) {
        // The enclosing instance is supplied implicitly for an inner class; passing `this`
        // explicitly does not match the declared constructor.
        this.queuer.pushAndMaybeRun(byteString, new SubscribeOp(byteString, subscribeRequest, messageSeqId, callback, obj));
    }

    /**
     * Queues a {@link ConsumeOp} that advances a subscriber's consume pointer.
     *
     * @param byteString   the topic
     * @param byteString2  the subscriber id
     * @param messageSeqId the new consume position
     * @param callback     completed when the operation ends
     * @param obj          opaque context handed back to {@code callback}
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionManager
    public void setConsumeSeqIdForSubscriber(ByteString byteString, ByteString byteString2, PubSubProtocol.MessageSeqId messageSeqId, Callback<Void> callback, Object obj) {
        // The enclosing instance is supplied implicitly for an inner class; passing `this`
        // explicitly does not match the declared constructor.
        this.queuer.pushAndMaybeRun(byteString, new ConsumeOp(byteString, byteString2, messageSeqId, callback, obj));
    }

    /**
     * Queues an {@link UnsubscribeOp} that removes a subscriber from a topic.
     *
     * @param byteString  the topic
     * @param byteString2 the subscriber id
     * @param callback    completed when the operation ends
     * @param obj         opaque context handed back to {@code callback}
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionManager
    public void unsubscribe(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj) {
        // The enclosing instance is supplied implicitly for an inner class; passing `this`
        // explicitly does not match the declared constructor.
        this.queuer.pushAndMaybeRun(byteString, new UnsubscribeOp(byteString, byteString2, callback, obj));
    }

    /**
     * Registers a listener for first-local-subscribe and last-local-unsubscribe events.
     *
     * @param subscriptionEventListener the listener to append to the listener list
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionManager
    public void addListener(SubscriptionEventListener subscriptionEventListener) {
        listeners.add(subscriptionEventListener);
    }

    /** Cancels the timer that drives the periodic consumed-messages task. */
    public void stop() {
        timer.cancel();
    }

    /**
     * Creates the subscription state for a new subscriber. Called by SubscribeOp when the
     * subscriber has no in-memory state and the request is not ATTACH; the in-memory entry
     * is added only after this succeeds.
     *
     * @param byteString        the topic
     * @param byteString2       the subscriber id
     * @param subscriptionState the initial state to store
     * @param callback          completed when the creation succeeds or fails
     * @param obj               opaque context handed back to {@code callback}
     */
    protected abstract void createSubscriptionState(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionState subscriptionState, Callback<Void> callback, Object obj);

    /**
     * Updates the subscription state of an existing subscriber. Called by ConsumeOp when
     * {@code InMemorySubscriptionState.setLastConsumeSeqId} returns true.
     *
     * @param byteString        the topic
     * @param byteString2       the subscriber id
     * @param subscriptionState the state to store
     * @param callback          completed when the update succeeds or fails
     * @param obj               opaque context handed back to {@code callback}
     */
    protected abstract void updateSubscriptionState(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionState subscriptionState, Callback<Void> callback, Object obj);

    /**
     * Deletes the subscription state of a subscriber. Called by UnsubscribeOp; the in-memory
     * entry is removed only after this succeeds.
     *
     * @param byteString  the topic
     * @param byteString2 the subscriber id
     * @param callback    completed when the deletion succeeds or fails
     * @param obj         opaque context handed back to {@code callback}
     */
    protected abstract void deleteSubscriptionState(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj);
}
