package org.apache.hw_v4_0_0.hedwig.client.handlers;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hw_v4_0_0.hedwig.client.api.MessageHandler;
import org.apache.hw_v4_0_0.hedwig.client.data.MessageConsumeData;
import org.apache.hw_v4_0_0.hedwig.client.data.PubSubData;
import org.apache.hw_v4_0_0.hedwig.client.data.TopicSubscriber;
import org.apache.hw_v4_0_0.hedwig.client.netty.HedwigClientImpl;
import org.apache.hw_v4_0_0.hedwig.client.netty.ResponseHandler;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.jboss.hw_v4_0_0.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/client/handlers/SubscribeResponseHandler.class */
public class SubscribeResponseHandler {
    private static Logger logger = LoggerFactory.getLogger(SubscribeResponseHandler.class);
    private final ResponseHandler responseHandler;
    private PubSubData origSubData;
    private Channel subscribeChannel;
    private MessageHandler messageHandler;
    private int numConsumedMessagesInBuffer = 0;
    private PubSubProtocol.MessageSeqId lastMessageSeqId;
    private Queue<PubSubProtocol.Message> subscribeMsgQueue;
    private Set<PubSubProtocol.Message> outstandingMsgSet;

    public SubscribeResponseHandler(ResponseHandler responseHandler) {
        this.responseHandler = responseHandler;
    }

    public PubSubData getOrigSubData() {
        return this.origSubData;
    }

    public void handleSubscribeResponse(PubSubProtocol.PubSubResponse pubSubResponse, PubSubData pubSubData, Channel channel) throws Exception {
        if (!pubSubResponse.getStatusCode().equals(PubSubProtocol.StatusCode.SUCCESS)) {
            HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
            channel.close();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe response: " + pubSubResponse + ", pubSubData: " + pubSubData + ", host: " + HedwigClientImpl.getHostFromChannel(channel));
        }
        switch (pubSubResponse.getStatusCode()) {
            case SUCCESS:
                this.subscribeChannel = channel;
                this.subscribeChannel.setReadable(false);
                this.origSubData = pubSubData;
                this.responseHandler.getSubscriber().setChannelForTopic(new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId), channel);
                this.outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap(this.responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
                pubSubData.callback.operationFinished(pubSubData.context, null);
                return;
            case CLIENT_ALREADY_SUBSCRIBED:
                pubSubData.callback.operationFailed(pubSubData.context, new PubSubException.ClientAlreadySubscribedException("Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: " + pubSubData.subscriberId.toStringUtf8()));
                return;
            case SERVICE_DOWN:
                pubSubData.callback.operationFailed(pubSubData.context, new PubSubException.ServiceDownException("Server responded with a SERVICE_DOWN status"));
                return;
            case NOT_RESPONSIBLE_FOR_TOPIC:
                this.responseHandler.handleRedirectResponse(pubSubResponse, pubSubData, channel);
                return;
            default:
                logger.error("Unexpected error response from server for PubSubResponse: " + pubSubResponse);
                pubSubData.callback.operationFailed(pubSubData.context, new PubSubException.ServiceDownException("Server responded with a status code of: " + pubSubResponse.getStatusCode()));
                return;
        }
    }

    public void handleSubscribeMessage(PubSubProtocol.PubSubResponse pubSubResponse) {
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe message in response: " + pubSubResponse + ", topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
        }
        PubSubProtocol.Message message = pubSubResponse.getMessage();
        synchronized (this) {
            if (this.messageHandler != null) {
                asyncMessageConsume(message);
            } else {
                if (this.subscribeMsgQueue == null) {
                    this.subscribeMsgQueue = new LinkedList();
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: " + message);
                }
                this.subscribeMsgQueue.add(message);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void asyncMessageConsume(PubSubProtocol.Message message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Call the client app's MessageHandler asynchronously to consume the message: " + message + ", topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
        }
        this.outstandingMsgSet.add(message);
        if (this.outstandingMsgSet.size() >= this.responseHandler.getConfiguration().getMaximumOutstandingMessages() && this.subscribeChannel.isReadable()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Too many outstanding messages (" + this.outstandingMsgSet.size() + ") so throttling the subscribe netty Channel");
            }
            this.subscribeChannel.setReadable(false);
        }
        this.messageHandler.deliver(this.origSubData.topic, this.origSubData.subscriberId, message, this.responseHandler.getClient().getConsumeCallback(), new MessageConsumeData(this.origSubData.topic, this.origSubData.subscriberId, message));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void messageConsumed(PubSubProtocol.Message message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Message has been successfully consumed by the client app for message: " + message + ", topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
        }
        if (this.responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()) {
            this.numConsumedMessagesInBuffer++;
            this.lastMessageSeqId = message.getMsgId();
        }
        this.outstandingMsgSet.remove(message);
        if (this.responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled() && this.numConsumedMessagesInBuffer >= this.responseHandler.getConfiguration().getConsumedMessagesBufferSize()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: " + this.lastMessageSeqId);
            }
            this.responseHandler.getSubscriber().doConsume(this.origSubData, this.subscribeChannel, this.lastMessageSeqId);
            this.numConsumedMessagesInBuffer = 0;
            this.lastMessageSeqId = null;
        }
        if (this.subscribeChannel.isReadable() || this.outstandingMsgSet.size() != 0) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
        }
        this.subscribeChannel.setReadable(true);
    }

    public void setMessageHandler(MessageHandler messageHandler) {
        if (logger.isDebugEnabled()) {
            logger.debug("Setting the messageHandler for topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
        }
        synchronized (this) {
            this.messageHandler = messageHandler;
            if (messageHandler != null && this.subscribeMsgQueue != null && this.subscribeMsgQueue.size() > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Consuming " + this.subscribeMsgQueue.size() + " queued up messages for topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
                }
                Iterator<PubSubProtocol.Message> it = this.subscribeMsgQueue.iterator();
                while (it.hasNext()) {
                    asyncMessageConsume(it.next());
                }
                this.subscribeMsgQueue.clear();
            }
        }
    }

    public MessageHandler getMessageHandler() {
        return this.messageHandler;
    }
}
