/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.handlers;

import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.data.MessageConsumeData;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.netty.HedwigClientImpl;
import org.apache.hedwig.client.netty.ResponseHandler;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscribeResponseHandler {
    private static Logger logger = LoggerFactory.getLogger(SubscribeResponseHandler.class);
    private final ResponseHandler responseHandler;
    private PubSubData origSubData;
    private Channel subscribeChannel;
    private MessageHandler messageHandler;
    private int numConsumedMessagesInBuffer = 0;
    private PubSubProtocol.MessageSeqId lastMessageSeqId;
    private Queue<PubSubProtocol.Message> subscribeMsgQueue;
    private Set<PubSubProtocol.Message> outstandingMsgSet;

    public SubscribeResponseHandler(ResponseHandler responseHandler) {
        this.responseHandler = responseHandler;
    }

    public synchronized PubSubData getOrigSubData() {
        return this.origSubData;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleSubscribeResponse(PubSubProtocol.PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
        if (!response.getStatusCode().equals((Object)PubSubProtocol.StatusCode.SUCCESS)) {
            HedwigClientImpl.getResponseHandlerFromChannel((Channel)channel).channelClosedExplicitly = true;
            channel.close();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe response: " + response + ", pubSubData: " + pubSubData + ", host: " + HedwigClientImpl.getHostFromChannel(channel));
        }
        switch (response.getStatusCode()) {
            case SUCCESS: {
                SubscribeResponseHandler subscribeResponseHandler = this;
                synchronized (subscribeResponseHandler) {
                    this.subscribeChannel = channel;
                    this.subscribeChannel.setReadable(false);
                    this.origSubData = pubSubData;
                    TopicSubscriber topicSubscriber = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
                    this.responseHandler.getSubscriber().setChannelForTopic(topicSubscriber, channel);
                    this.outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap(this.responseHandler.getConfiguration().getMaximumOutstandingMessages(), 1.0f));
                }
                pubSubData.callback.operationFinished(pubSubData.context, null);
                break;
            }
            case CLIENT_ALREADY_SUBSCRIBED: {
                pubSubData.callback.operationFailed(pubSubData.context, (PubSubException)new PubSubException.ClientAlreadySubscribedException("Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: " + pubSubData.subscriberId.toStringUtf8()));
                break;
            }
            case SERVICE_DOWN: {
                pubSubData.callback.operationFailed(pubSubData.context, (PubSubException)new PubSubException.ServiceDownException("Server responded with a SERVICE_DOWN status"));
                break;
            }
            case NOT_RESPONSIBLE_FOR_TOPIC: {
                this.responseHandler.handleRedirectResponse(response, pubSubData, channel);
                break;
            }
            default: {
                logger.error("Unexpected error response from server for PubSubResponse: " + response);
                pubSubData.callback.operationFailed(pubSubData.context, (PubSubException)new PubSubException.ServiceDownException("Server responded with a status code of: " + response.getStatusCode()));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleSubscribeMessage(PubSubProtocol.PubSubResponse response) {
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe message in response: {}, topic: {}, subscriberId: {}", new Object[]{response, this.getOrigSubData().topic.toStringUtf8(), this.getOrigSubData().subscriberId.toStringUtf8()});
        }
        PubSubProtocol.Message message = response.getMessage();
        SubscribeResponseHandler subscribeResponseHandler = this;
        synchronized (subscribeResponseHandler) {
            if (this.messageHandler != null) {
                this.asyncMessageConsume(message);
            } else {
                if (this.subscribeMsgQueue == null) {
                    this.subscribeMsgQueue = new LinkedList<PubSubProtocol.Message>();
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Message has arrived but Subscribe channel does not have a registered MessageHandler yet so queueing up the message: " + message);
                }
                this.subscribeMsgQueue.add(message);
            }
        }
    }

    protected void asyncMessageConsume(PubSubProtocol.Message message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Call the client app's MessageHandler asynchronously to consume the message: " + message + ", topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
        }
        this.outstandingMsgSet.add(message);
        if (this.outstandingMsgSet.size() >= this.responseHandler.getConfiguration().getMaximumOutstandingMessages() && this.subscribeChannel.isReadable()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Too many outstanding messages (" + this.outstandingMsgSet.size() + ") so throttling the subscribe netty Channel");
            }
            this.subscribeChannel.setReadable(false);
        }
        MessageConsumeData messageConsumeData = new MessageConsumeData(this.origSubData.topic, this.origSubData.subscriberId, message);
        this.messageHandler.deliver(this.origSubData.topic, this.origSubData.subscriberId, message, this.responseHandler.getClient().getConsumeCallback(), messageConsumeData);
    }

    protected synchronized void messageConsumed(PubSubProtocol.Message message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Message has been successfully consumed by the client app for message: " + message + ", topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
        }
        if (this.responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled()) {
            ++this.numConsumedMessagesInBuffer;
            this.lastMessageSeqId = message.getMsgId();
        }
        this.outstandingMsgSet.remove(message);
        if (this.responseHandler.getConfiguration().isAutoSendConsumeMessageEnabled() && this.numConsumedMessagesInBuffer >= this.responseHandler.getConfiguration().getConsumedMessagesBufferSize()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Consumed message buffer limit reached so send the Consume Request to the server with lastMessageSeqId: " + this.lastMessageSeqId);
            }
            this.responseHandler.getSubscriber().doConsume(this.origSubData, this.subscribeChannel, this.lastMessageSeqId);
            this.numConsumedMessagesInBuffer = 0;
            this.lastMessageSeqId = null;
        }
        if (!this.subscribeChannel.isReadable() && this.outstandingMsgSet.size() == 0) {
            if (logger.isDebugEnabled()) {
                logger.debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
            }
            this.subscribeChannel.setReadable(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setMessageHandler(MessageHandler messageHandler) {
        if (logger.isDebugEnabled()) {
            logger.debug("Setting the messageHandler for topic: {}, subscriberId: {}", (Object)this.getOrigSubData().topic.toStringUtf8(), (Object)this.getOrigSubData().subscriberId.toStringUtf8());
        }
        SubscribeResponseHandler subscribeResponseHandler = this;
        synchronized (subscribeResponseHandler) {
            this.messageHandler = messageHandler;
            if (messageHandler != null && this.subscribeMsgQueue != null && this.subscribeMsgQueue.size() > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Consuming " + this.subscribeMsgQueue.size() + " queued up messages for topic: " + this.origSubData.topic.toStringUtf8() + ", subscriberId: " + this.origSubData.subscriberId.toStringUtf8());
                }
                for (PubSubProtocol.Message message : this.subscribeMsgQueue) {
                    this.asyncMessageConsume(message);
                }
                this.subscribeMsgQueue.clear();
            }
        }
    }

    public MessageHandler getMessageHandler() {
        return this.messageHandler;
    }
}

