/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl;

import java.util.LinkedList;
import java.util.Queue;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.MessageConsumeData;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.netty.FilterableMessageHandler;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ResubscribeCallback;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActiveSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(ActiveSubscriber.class);
    protected final ClientConfiguration cfg;
    protected final AbstractHChannelManager channelManager;
    protected final TopicSubscriber topicSubscriber;
    protected final PubSubData op;
    protected final PubSubProtocol.SubscriptionPreferences preferences;
    protected final Channel channel;
    protected final HChannel hChannel;
    private int numConsumedMessagesInBuffer = 0;
    private PubSubProtocol.MessageSeqId lastMessageSeqId = null;
    private MessageHandler msgHandler = null;
    private final Queue<PubSubProtocol.Message> msgQueue = new LinkedList<PubSubProtocol.Message>();

    public ActiveSubscriber(ClientConfiguration cfg, AbstractHChannelManager channelManager, TopicSubscriber ts, PubSubData op, PubSubProtocol.SubscriptionPreferences preferences, Channel channel, HChannel hChannel) {
        this.cfg = cfg;
        this.channelManager = channelManager;
        this.topicSubscriber = ts;
        this.op = op;
        this.preferences = preferences;
        this.channel = channel;
        this.hChannel = hChannel;
    }

    public PubSubData getPubSubData() {
        return this.op;
    }

    public TopicSubscriber getTopicSubscriber() {
        return this.topicSubscriber;
    }

    public synchronized void startDelivery(MessageHandler messageHandler) throws AlreadyStartDeliveryException, PubSubException.ClientNotSubscribedException {
        FilterableMessageHandler filterMsgHandler;
        if (null != this.msgHandler) {
            throw new AlreadyStartDeliveryException("A message handler " + this.msgHandler + " has been started for " + this.topicSubscriber);
        }
        if (null != messageHandler && messageHandler instanceof FilterableMessageHandler && (filterMsgHandler = (FilterableMessageHandler)messageHandler).hasMessageFilter()) {
            if (null == this.preferences) {
                logger.warn("Start delivering messages with filter but no subscription preferences found. It might due to talking to an old version hub server.");
                messageHandler = filterMsgHandler.getMessageHandler();
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Start delivering messages with filter on {}, preferences: {}", VarArgs.va(this.topicSubscriber, SubscriptionStateUtils.toString((PubSubProtocol.SubscriptionPreferences)this.preferences)));
                }
                ClientMessageFilter msgFilter = filterMsgHandler.getMessageFilter();
                msgFilter.setSubscriptionPreferences(this.topicSubscriber.getTopic(), this.topicSubscriber.getSubscriberId(), this.preferences);
            }
        }
        this.msgHandler = messageHandler;
        if (null == this.msgHandler) {
            return;
        }
        if (this.msgQueue.size() > 0) {
            if (logger.isDebugEnabled()) {
                logger.debug("Consuming {} queued up messages for {}", VarArgs.va(this.msgQueue.size(), this.topicSubscriber));
            }
            for (PubSubProtocol.Message message : this.msgQueue) {
                this.asyncMessageDeliver(message);
            }
            this.msgQueue.clear();
        }
    }

    public synchronized void stopDelivery() {
        this.msgHandler = null;
    }

    public synchronized void handleMessage(PubSubProtocol.Message message) {
        if (null != this.msgHandler) {
            this.asyncMessageDeliver(message);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Message {} has arrived but no MessageHandler provided for {} yet so queueing up the message.", VarArgs.va(MessageIdUtils.msgIdToReadableString((PubSubProtocol.MessageSeqId)message.getMsgId()), this.topicSubscriber));
            }
            this.msgQueue.add(message);
        }
    }

    public synchronized void asyncMessageDeliver(PubSubProtocol.Message message) {
        if (null == this.msgHandler) {
            logger.error("No message handler found to deliver message {} to {}.", VarArgs.va(MessageIdUtils.msgIdToReadableString((PubSubProtocol.MessageSeqId)message.getMsgId()), this.topicSubscriber));
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Call the client app's MessageHandler asynchronously to deliver the message {} to {}", VarArgs.va(message, this.topicSubscriber));
        }
        this.unsafeDeliverMessage(message);
    }

    protected void unsafeDeliverMessage(PubSubProtocol.Message message) {
        MessageConsumeData messageConsumeData = new MessageConsumeData(this.topicSubscriber, message);
        this.msgHandler.deliver(this.topicSubscriber.getTopic(), this.topicSubscriber.getSubscriberId(), message, this.channelManager.getConsumeCallback(), messageConsumeData);
    }

    private synchronized boolean updateLastMessageSeqId(PubSubProtocol.MessageSeqId seqId) {
        if (null != this.lastMessageSeqId && seqId.getLocalComponent() <= this.lastMessageSeqId.getLocalComponent()) {
            return false;
        }
        ++this.numConsumedMessagesInBuffer;
        this.lastMessageSeqId = seqId;
        if (this.numConsumedMessagesInBuffer >= this.cfg.getConsumedMessagesBufferSize()) {
            this.numConsumedMessagesInBuffer = 0;
            this.lastMessageSeqId = null;
            return true;
        }
        return false;
    }

    public void consume(final PubSubProtocol.MessageSeqId messageSeqId) {
        PubSubProtocol.PubSubRequest.Builder pubsubRequestBuilder = NetUtils.buildConsumeRequest(this.channelManager.nextTxnId(), this.topicSubscriber, messageSeqId);
        if (logger.isDebugEnabled()) {
            logger.debug("Writing a Consume request to channel: {} with messageSeqId: {} for {}", VarArgs.va(this.channel, messageSeqId, this.topicSubscriber));
        }
        ChannelFuture future = this.channel.write((Object)pubsubRequestBuilder.build());
        future.addListener(new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    logger.error("Error writing a Consume request to channel: {} with messageSeqId: {} for {}", VarArgs.va(ActiveSubscriber.this.channel, messageSeqId, ActiveSubscriber.this.topicSubscriber));
                }
            }
        });
    }

    public void messageConsumed(PubSubProtocol.Message message) {
        if (this.cfg.isAutoSendConsumeMessageEnabled() && this.updateLastMessageSeqId(message.getMsgId())) {
            if (logger.isDebugEnabled()) {
                logger.debug("Consume message {} when reaching consumed message buffer limit.", (Object)message.getMsgId());
            }
            this.consume(message.getMsgId());
        }
    }

    public void resubscribeIfNecessary(PubSubProtocol.SubscriptionEvent event) {
        if (PubSubProtocol.SubscriptionEvent.TOPIC_MOVED == event) {
            this.channelManager.clearHostForTopic(this.topicSubscriber.getTopic(), NetUtils.getHostFromChannel(this.channel));
        }
        if (!this.op.options.getEnableResubscribe()) {
            this.channelManager.getSubscriptionEventEmitter().emitSubscriptionEvent(this.topicSubscriber.getTopic(), this.topicSubscriber.getSubscriberId(), event);
            return;
        }
        this.op.clearServersList();
        long retryWaitTime = this.cfg.getSubscribeReconnectRetryWaitTime();
        ResubscribeCallback resubscribeCb = new ResubscribeCallback(this.topicSubscriber, this.op, this.channelManager, retryWaitTime);
        this.op.setCallback(resubscribeCb);
        this.op.context = null;
        this.op.setOriginalChannelForResubscribe(this.hChannel);
        if (logger.isDebugEnabled()) {
            logger.debug("Resubscribe {} with origSubData {}", VarArgs.va(this.topicSubscriber, this.op));
        }
        this.channelManager.submitOp(this.op);
    }
}

