package org.apache.hw_v4_0_0.hedwig.server.handlers;

import com.google.protobuf.ByteString;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hw_v4_0_0.hedwig.client.data.TopicSubscriber;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hw_v4_0_0.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.server.delivery.ChannelEndPoint;
import org.apache.hw_v4_0_0.hedwig.server.delivery.DeliveryManager;
import org.apache.hw_v4_0_0.hedwig.server.netty.UmbrellaHandler;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionManager;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.TrueFilter;
import org.apache.hw_v4_0_0.hedwig.server.topics.TopicManager;
import org.apache.hw_v4_0_0.hedwig.util.Callback;
import org.jboss.hw_v4_0_0.netty.channel.Channel;
import org.jboss.hw_v4_0_0.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/handlers/SubscribeHandler.class */
public class SubscribeHandler extends BaseHandler implements ChannelDisconnectListener {
    static Logger logger = LoggerFactory.getLogger(SubscribeHandler.class);
    private DeliveryManager deliveryMgr;
    private PersistenceManager persistenceMgr;
    private SubscriptionManager subMgr;
    ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
    ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;

    public SubscribeHandler(TopicManager topicManager, DeliveryManager deliveryManager, PersistenceManager persistenceManager, SubscriptionManager subscriptionManager, ServerConfiguration serverConfiguration) {
        super(topicManager, serverConfiguration);
        this.deliveryMgr = deliveryManager;
        this.persistenceMgr = persistenceManager;
        this.subMgr = subscriptionManager;
        this.sub2Channel = new ConcurrentHashMap<>();
        this.channel2sub = new ConcurrentHashMap<>();
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.handlers.ChannelDisconnectListener
    public void channelDisconnected(Channel channel) {
        synchronized (channel) {
            TopicSubscriber remove = this.channel2sub.remove(channel);
            if (remove != null) {
                this.sub2Channel.remove(remove);
            }
        }
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.handlers.BaseHandler
    public void handleRequestAtOwner(final PubSubProtocol.PubSubRequest pubSubRequest, final Channel channel) {
        if (!pubSubRequest.hasSubscribeRequest()) {
            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, pubSubRequest.getTxnId(), "Missing subscribe request data");
            return;
        }
        final ByteString topic = pubSubRequest.getTopic();
        try {
            PubSubProtocol.MessageSeqId currentSeqIdForTopic = this.persistenceMgr.getCurrentSeqIdForTopic(topic);
            final PubSubProtocol.SubscribeRequest subscribeRequest = pubSubRequest.getSubscribeRequest();
            final ByteString subscriberId = subscribeRequest.getSubscriberId();
            this.subMgr.serveSubscribeRequest(topic, subscribeRequest, PubSubProtocol.MessageSeqId.newBuilder(currentSeqIdForTopic).setLocalComponent(currentSeqIdForTopic.getLocalComponent()).build(), new Callback<PubSubProtocol.MessageSeqId>() { // from class: org.apache.hw_v4_0_0.hedwig.server.handlers.SubscribeHandler.1
                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                public void operationFailed(Object obj, PubSubException pubSubException) {
                    channel.write(PubSubResponseUtils.getResponseForException(pubSubException, pubSubRequest.getTxnId())).addListener(ChannelFutureListener.CLOSE);
                }

                @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                public void operationFinished(Object obj, PubSubProtocol.MessageSeqId messageSeqId) {
                    TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
                    synchronized (channel) {
                        if (channel.isConnected()) {
                            if (null != SubscribeHandler.this.sub2Channel.putIfAbsent(topicSubscriber, channel)) {
                                channel.write(PubSubResponseUtils.getResponseForException(new PubSubException.TopicBusyException("subscription for this topic, subscriberId is already being served on a different channel"), pubSubRequest.getTxnId())).addListener(ChannelFutureListener.CLOSE);
                            } else {
                                SubscribeHandler.this.channel2sub.put(channel, topicSubscriber);
                                channel.write(PubSubResponseUtils.getSuccessResponse(pubSubRequest.getTxnId()));
                                SubscribeHandler.this.deliveryMgr.startServingSubscription(topic, subscriberId, PubSubProtocol.MessageSeqId.newBuilder(messageSeqId).setLocalComponent(messageSeqId.getLocalComponent() + 1).build(), new ChannelEndPoint(channel), TrueFilter.instance(), SubscriptionStateUtils.isHubSubscriber(subscribeRequest.getSubscriberId()));
                            }
                        }
                    }
                }
            }, null);
        } catch (PubSubException.ServerNotResponsibleForTopicException e) {
            channel.write(PubSubResponseUtils.getResponseForException(e, pubSubRequest.getTxnId())).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
