/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl.simple;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.AbstractSubscribeResponseHandler;
import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleSubscribeResponseHandler
extends AbstractSubscribeResponseHandler {
    private static Logger logger = LoggerFactory.getLogger(SimpleSubscribeResponseHandler.class);
    private volatile TopicSubscriber origTopicSubscriber;
    private volatile ActiveSubscriber origActiveSubscriber;
    private SimpleHChannelManager sChannelManager;

    protected SimpleSubscribeResponseHandler(ClientConfiguration cfg, HChannelManager channelManager) {
        super(cfg, channelManager);
        this.sChannelManager = (SimpleHChannelManager)channelManager;
    }

    @Override
    protected ActiveSubscriber createActiveSubscriber(ClientConfiguration cfg, AbstractHChannelManager channelManager, TopicSubscriber ts, PubSubData op, PubSubProtocol.SubscriptionPreferences preferences, Channel channel, HChannel hChannel) {
        return new SimpleActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
    }

    @Override
    protected synchronized ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
        if (null == this.origTopicSubscriber || !this.origTopicSubscriber.equals(ts)) {
            return null;
        }
        return this.origActiveSubscriber;
    }

    private synchronized ActiveSubscriber getActiveSubscriber() {
        return this.origActiveSubscriber;
    }

    @Override
    public synchronized boolean hasSubscription(TopicSubscriber ts) {
        if (null == this.origTopicSubscriber) {
            return false;
        }
        return this.origTopicSubscriber.equals(ts);
    }

    @Override
    protected synchronized boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
        if (null != this.origTopicSubscriber && !this.origTopicSubscriber.equals(ts)) {
            return false;
        }
        this.origTopicSubscriber = null;
        this.origActiveSubscriber = null;
        return super.removeSubscription(ts, ss);
    }

    @Override
    public void handleResponse(PubSubProtocol.PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
        if (!response.getStatusCode().equals((Object)PubSubProtocol.StatusCode.SUCCESS)) {
            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
            channel.close();
        }
        super.handleResponse(response, pubSubData, channel);
    }

    @Override
    public void handleSubscribeMessage(PubSubProtocol.PubSubResponse response) {
        PubSubProtocol.Message message = response.getMessage();
        ActiveSubscriber ss = this.getActiveSubscriber();
        if (null == ss) {
            logger.error("No Subscriber is alive receiving its message {}.", (Object)MessageIdUtils.msgIdToReadableString((PubSubProtocol.MessageSeqId)message.getMsgId()));
            return;
        }
        ss.handleMessage(message);
    }

    @Override
    protected Either<PubSubProtocol.StatusCode, HChannel> handleSuccessResponse(TopicSubscriber ts, PubSubData pubSubData, Channel channel) {
        Either<Boolean, HChannel> result = this.sChannelManager.storeSubscriptionChannel(ts, pubSubData, channel);
        if (result.left().booleanValue()) {
            return Either.of(PubSubProtocol.StatusCode.SUCCESS, result.right());
        }
        PubSubProtocol.StatusCode code = pubSubData.isResubscribeRequest() ? PubSubProtocol.StatusCode.RESUBSCRIBE_EXCEPTION : PubSubProtocol.StatusCode.CLIENT_ALREADY_SUBSCRIBED;
        return Either.of(code, null);
    }

    @Override
    protected synchronized void postHandleSuccessResponse(TopicSubscriber ts, ActiveSubscriber as) {
        this.origTopicSubscriber = ts;
        this.origActiveSubscriber = as;
    }

    @Override
    public void asyncCloseSubscription(TopicSubscriber topicSubscriber, Callback<PubSubProtocol.ResponseBody> callback, Object context) {
        callback.operationFinished(context, null);
    }

    static class SimpleActiveSubscriber
    extends ActiveSubscriber {
        private final Set<PubSubProtocol.Message> outstandingMsgSet;

        public SimpleActiveSubscriber(ClientConfiguration cfg, AbstractHChannelManager channelManager, TopicSubscriber ts, PubSubData op, PubSubProtocol.SubscriptionPreferences preferences, Channel channel, HChannel hChannel) {
            super(cfg, channelManager, ts, op, preferences, channel, hChannel);
            this.outstandingMsgSet = Collections.newSetFromMap(new ConcurrentHashMap(cfg.getMaximumOutstandingMessages(), 1.0f));
        }

        @Override
        protected void unsafeDeliverMessage(PubSubProtocol.Message message) {
            this.outstandingMsgSet.add(message);
            if (this.outstandingMsgSet.size() >= this.cfg.getMaximumOutstandingMessages() && this.channel.isReadable()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Too many outstanding messages ({}) so throttling the subscribe netty Channel", (Object)this.outstandingMsgSet.size());
                }
                this.channel.setReadable(false);
            }
            super.unsafeDeliverMessage(message);
        }

        @Override
        public synchronized void messageConsumed(PubSubProtocol.Message message) {
            super.messageConsumed(message);
            this.outstandingMsgSet.remove(message);
            if (!this.channel.isReadable() && this.outstandingMsgSet.size() == 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for {}", (Object)this.topicSubscriber);
                }
                this.channel.setReadable(true);
            }
        }

        @Override
        public synchronized void startDelivery(MessageHandler messageHandler) throws AlreadyStartDeliveryException, PubSubException.ClientNotSubscribedException {
            super.startDelivery(messageHandler);
            ChannelFuture future = this.channel.setReadable(true);
            future.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        logger.error("Unable to make subscriber Channel readable in startDelivery call for {}", (Object)SimpleActiveSubscriber.this.topicSubscriber);
                    }
                }
            });
        }

        @Override
        public synchronized void stopDelivery() {
            super.stopDelivery();
            ChannelFuture future = this.channel.setReadable(false);
            future.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        logger.error("Unable to make subscriber Channel not readable in stopDelivery call for {}", (Object)SimpleActiveSubscriber.this.topicSubscriber);
                    }
                }
            });
        }
    }
}

