/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ActiveSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base subscribe-response handler that keeps a map of active subscriptions,
 * keyed by {@link TopicSubscriber}, and forwards messages, consume requests,
 * delivery control calls and subscription events to the matching
 * {@link ActiveSubscriber}.
 *
 * <p>Subclasses supply {@link #handleSuccessResponse} to decide what happens
 * with the channel once the server acknowledged a subscribe request.
 */
public abstract class AbstractSubscribeResponseHandler
extends SubscribeResponseHandler {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSubscribeResponseHandler.class);

    /**
     * Read lock: held while a successful subscribe response registers its
     * subscription. Write lock: held while a channel disconnect is processed.
     * A disconnect therefore never runs concurrently with a registration.
     */
    protected final ReentrantReadWriteLock disconnectLock = new ReentrantReadWriteLock();

    /** Subscriptions currently tracked by this handler. */
    protected final ConcurrentMap<TopicSubscriber, ActiveSubscriber> subscriptions = new ConcurrentHashMap<TopicSubscriber, ActiveSubscriber>();

    /** The channel manager passed to the constructor, viewed as its abstract implementation type. */
    protected final AbstractHChannelManager aChannelManager;

    /**
     * @param cfg            client configuration, handed to the superclass
     * @param channelManager channel manager, handed to the superclass; must be an
     *                       {@link AbstractHChannelManager} or a {@link ClassCastException} is thrown
     */
    protected AbstractSubscribeResponseHandler(ClientConfiguration cfg, HChannelManager channelManager) {
        super(cfg, channelManager);
        this.aChannelManager = (AbstractHChannelManager) channelManager;
    }

    /** @return the channel manager this handler was constructed with */
    protected HChannelManager getHChannelManager() {
        return this.channelManager;
    }

    /** @return the client configuration this handler was constructed with */
    protected ClientConfiguration getConfiguration() {
        return this.cfg;
    }

    /**
     * Looks up the tracked subscription for a topic subscriber.
     *
     * @param ts topic subscriber to look up
     * @return the tracked subscriber, or {@code null} if none is registered
     */
    protected ActiveSubscriber getActiveSubscriber(TopicSubscriber ts) {
        return this.subscriptions.get(ts);
    }

    /**
     * Factory hook for the {@link ActiveSubscriber} registered after a
     * successful subscribe; subclasses may override it to return a
     * specialised instance.
     */
    protected ActiveSubscriber createActiveSubscriber(ClientConfiguration cfg, AbstractHChannelManager channelManager, TopicSubscriber ts, PubSubData op, PubSubProtocol.SubscriptionPreferences preferences, Channel channel, HChannel hChannel) {
        return new ActiveSubscriber(cfg, channelManager, ts, op, preferences, channel, hChannel);
    }

    /**
     * Dispatches a subscribe response according to its status code and
     * completes the request's callback with success or failure. A
     * {@code NOT_RESPONSIBLE_FOR_TOPIC} response is handed to
     * {@code handleRedirectResponse} instead of completing the callback here.
     *
     * @param response   response received from the server
     * @param pubSubData the subscribe request this response answers
     * @param channel    channel the response arrived on
     * @throws Exception propagated from the underlying handling
     */
    @Override
    public void handleResponse(PubSubProtocol.PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe response: {}, pubSubData: {}, host: {}.", VarArgs.va(response, pubSubData, NetUtils.getHostFromChannel(channel)));
        }
        switch (response.getStatusCode()) {
            case SUCCESS: {
                this.onSubscribeSucceeded(response, pubSubData, channel);
                break;
            }
            case CLIENT_ALREADY_SUBSCRIBED: {
                pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.ClientAlreadySubscribedException("Client is already subscribed for topic: " + pubSubData.topic.toStringUtf8() + ", subscriberId: " + pubSubData.subscriberId.toStringUtf8()));
                break;
            }
            case SERVICE_DOWN: {
                pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.ServiceDownException("Server responded with a SERVICE_DOWN status"));
                break;
            }
            case NOT_RESPONSIBLE_FOR_TOPIC: {
                this.handleRedirectResponse(response, pubSubData, channel);
                break;
            }
            default: {
                logger.error("Unexpected error response from server for PubSubResponse: {}", response);
                // Report as ServiceDown, keeping the exception for the real status code as the cause.
                pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.ServiceDownException("Server responded with a status code of: " + response.getStatusCode(), PubSubException.create(response.getStatusCode(), "Original Exception")));
            }
        }
    }

    /**
     * Handles a {@code SUCCESS} subscribe response: lets the subclass process
     * the channel, registers a new {@link ActiveSubscriber} and completes the
     * callback. If either step yields a non-success status the callback is
     * failed with an exception created for that status.
     */
    private void onSubscribeSucceeded(PubSubProtocol.PubSubResponse response, PubSubData pubSubData, Channel channel) {
        TopicSubscriber ts = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
        PubSubProtocol.SubscriptionPreferences preferences = extractPreferences(response);
        if (null != preferences && logger.isDebugEnabled()) {
            logger.debug("Receive subscription preferences for {} : {}", VarArgs.va(ts, SubscriptionStateUtils.toString(preferences)));
        }

        PubSubProtocol.StatusCode statusCode;
        ActiveSubscriber ss = null;
        // Registration happens under the read lock so that it cannot
        // interleave with onChannelDisconnected, which takes the write lock.
        this.disconnectLock.readLock().lock();
        try {
            Either<PubSubProtocol.StatusCode, HChannel> result = this.handleSuccessResponse(ts, pubSubData, channel);
            statusCode = result.left();
            if (PubSubProtocol.StatusCode.SUCCESS == statusCode) {
                ss = this.createActiveSubscriber(this.cfg, this.aChannelManager, ts, pubSubData, preferences, channel, result.right());
                statusCode = this.addSubscription(ts, ss);
            }
        }
        finally {
            this.disconnectLock.readLock().unlock();
        }

        // Hook and callback run outside the lock.
        if (PubSubProtocol.StatusCode.SUCCESS == statusCode) {
            this.postHandleSuccessResponse(ts, ss);
            pubSubData.getCallback().operationFinished(pubSubData.context, null);
            return;
        }
        PubSubException exception = PubSubException.create(statusCode, "Client is already subscribed for " + ts);
        pubSubData.getCallback().operationFailed(pubSubData.context, exception);
    }

    /**
     * @return the subscription preferences carried in the response body, or
     *         {@code null} if the response has no body, no subscribe response
     *         or no preferences
     */
    private static PubSubProtocol.SubscriptionPreferences extractPreferences(PubSubProtocol.PubSubResponse response) {
        if (!response.hasResponseBody()) {
            return null;
        }
        PubSubProtocol.ResponseBody respBody = response.getResponseBody();
        if (!respBody.hasSubscribeResponse()) {
            return null;
        }
        PubSubProtocol.SubscribeResponse resp = respBody.getSubscribeResponse();
        if (!resp.hasPreferences()) {
            return null;
        }
        return resp.getPreferences();
    }

    /**
     * Subclass step executed (under the read side of {@link #disconnectLock})
     * when the server answered a subscribe request with {@code SUCCESS}.
     *
     * @return the resulting status on the left; on {@code SUCCESS} the right
     *         value is passed to {@link #createActiveSubscriber} as the
     *         subscription's {@link HChannel}
     */
    protected abstract Either<PubSubProtocol.StatusCode, HChannel> handleSuccessResponse(TopicSubscriber var1, PubSubData var2, Channel var3);

    /**
     * Hook invoked after a subscription was registered and before the
     * request's callback is completed. The default implementation does nothing.
     */
    protected void postHandleSuccessResponse(TopicSubscriber ts, ActiveSubscriber ss) {
    }

    /**
     * Registers {@code ss} unless a subscription for {@code ts} already exists.
     *
     * @return {@code SUCCESS} if registered, {@code CLIENT_ALREADY_SUBSCRIBED}
     *         if an entry for {@code ts} was already present
     */
    private PubSubProtocol.StatusCode addSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
        ActiveSubscriber oldSS = this.subscriptions.putIfAbsent(ts, ss);
        return null == oldSS ? PubSubProtocol.StatusCode.SUCCESS : PubSubProtocol.StatusCode.CLIENT_ALREADY_SUBSCRIBED;
    }

    /**
     * Routes a message pushed by the server to the subscription identified by
     * the response's topic and subscriber id. The message is dropped with an
     * error log when no such subscription is tracked.
     */
    @Override
    public void handleSubscribeMessage(PubSubProtocol.PubSubResponse response) {
        PubSubProtocol.Message message = response.getMessage();
        TopicSubscriber ts = new TopicSubscriber(response.getTopic(), response.getSubscriberId());
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a Subscribe message in response: {}, {}", VarArgs.va(response, ts));
        }
        ActiveSubscriber ss = this.getActiveSubscriber(ts);
        if (null == ss) {
            logger.error("Subscriber {} is not found receiving its message {}.", VarArgs.va(ts, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
            return;
        }
        ss.handleMessage(message);
    }

    /**
     * Forwards an asynchronous delivery request to the tracked subscription;
     * logs an error and returns if the subscriber is unknown.
     */
    @Override
    protected void asyncMessageDeliver(TopicSubscriber topicSubscriber, PubSubProtocol.Message message) {
        ActiveSubscriber ss = this.getActiveSubscriber(topicSubscriber);
        if (null == ss) {
            logger.error("Subscriber {} is not found delivering its message {}.", VarArgs.va(topicSubscriber, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
            return;
        }
        ss.asyncMessageDeliver(message);
    }

    /**
     * Notifies the tracked subscription that the application consumed a
     * message; logs a warning and returns if the subscriber is unknown.
     */
    @Override
    protected void messageConsumed(TopicSubscriber topicSubscriber, PubSubProtocol.Message message) {
        ActiveSubscriber ss = this.getActiveSubscriber(topicSubscriber);
        if (null == ss) {
            logger.warn("Subscriber {} is not found consumed its message {}.", VarArgs.va(topicSubscriber, MessageIdUtils.msgIdToReadableString(message.getMsgId())));
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Message has been successfully consumed by the client app : {}, {}", VarArgs.va(message, topicSubscriber));
        }
        ss.messageConsumed(message);
    }

    /**
     * Handles a subscription event for the given topic and subscriber id by
     * passing it to {@link #processSubscriptionEvent}; logs a warning and
     * returns if no matching subscription is tracked.
     */
    @Override
    public void handleSubscriptionEvent(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionEvent event) {
        TopicSubscriber ts = new TopicSubscriber(topic, subscriberId);
        ActiveSubscriber ss = this.getActiveSubscriber(ts);
        if (null == ss) {
            logger.warn("No subscription {} found receiving subscription event {}.", VarArgs.va(ts, event));
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Received subscription event {} for ({}).", VarArgs.va(event, ts));
        }
        this.processSubscriptionEvent(ss, event);
    }

    /**
     * Reacts to a subscription event: {@code TOPIC_MOVED} and
     * {@code SUBSCRIPTION_FORCED_CLOSED} trigger {@link #resubscribeIfNecessary};
     * any other event is only logged as an error.
     */
    protected void processSubscriptionEvent(ActiveSubscriber as, PubSubProtocol.SubscriptionEvent event) {
        switch (event) {
            case TOPIC_MOVED:
            case SUBSCRIPTION_FORCED_CLOSED: {
                this.resubscribeIfNecessary(as, event);
                break;
            }
            default: {
                logger.error("Receive unknown subscription event {} for {}.", VarArgs.va(event, as.getTopicSubscriber()));
            }
        }
    }

    /**
     * Starts message delivery for a tracked subscription.
     *
     * @throws PubSubException.ClientNotSubscribedException if the subscriber is not tracked
     * @throws AlreadyStartDeliveryException declared for the delegate call to
     *         {@code ActiveSubscriber.startDelivery}
     */
    @Override
    public void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        ActiveSubscriber ss = this.getActiveSubscriber(topicSubscriber);
        if (null == ss) {
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Start delivering message for {} using message handler {}", VarArgs.va(topicSubscriber, messageHandler));
        }
        ss.startDelivery(messageHandler);
    }

    /**
     * Stops message delivery for a tracked subscription.
     *
     * @throws PubSubException.ClientNotSubscribedException if the subscriber is not tracked
     */
    @Override
    public void stopDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException {
        ActiveSubscriber ss = this.getActiveSubscriber(topicSubscriber);
        if (null == ss) {
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Stop delivering messages for {}", topicSubscriber);
        }
        ss.stopDelivery();
    }

    /** @return {@code true} if a subscription for {@code topicSubscriber} is tracked */
    @Override
    public boolean hasSubscription(TopicSubscriber topicSubscriber) {
        return this.subscriptions.containsKey(topicSubscriber);
    }

    /**
     * Forwards a consume request for {@code messageSeqId} to the tracked
     * subscription; logs a warning and returns if the subscriber is unknown.
     */
    @Override
    public void consume(TopicSubscriber topicSubscriber, PubSubProtocol.MessageSeqId messageSeqId) {
        ActiveSubscriber ss = this.getActiveSubscriber(topicSubscriber);
        if (null == ss) {
            logger.warn("Subscriber {} is not found consuming message {}.", VarArgs.va(topicSubscriber, MessageIdUtils.msgIdToReadableString(messageSeqId)));
            return;
        }
        ss.consume(messageSeqId);
    }

    /**
     * Processes a channel disconnect while holding the write side of
     * {@link #disconnectLock}, so no subscription can be registered
     * concurrently.
     */
    @Override
    public void onChannelDisconnected(InetSocketAddress host, Channel channel) {
        this.disconnectLock.writeLock().lock();
        try {
            this.onDisconnect(host);
        }
        finally {
            this.disconnectLock.writeLock().unlock();
        }
    }

    /**
     * Treats every tracked subscription as affected by the disconnect and
     * runs the {@code TOPIC_MOVED} resubscribe path for it. {@code host} is
     * not used to filter the subscriptions.
     */
    private void onDisconnect(InetSocketAddress host) {
        for (ActiveSubscriber ss : this.subscriptions.values()) {
            logger.info("Subscription channel for ({}) is disconnected.", ss);
            this.resubscribeIfNecessary(ss, PubSubProtocol.SubscriptionEvent.TOPIC_MOVED);
        }
    }

    /**
     * Removes the mapping for {@code ts} only if it currently maps to {@code ss}.
     *
     * @return {@code true} if the mapping was removed
     */
    protected boolean removeSubscription(TopicSubscriber ts, ActiveSubscriber ss) {
        return this.subscriptions.remove(ts, ss);
    }

    /**
     * Stops tracking {@code ss} and, only if this call actually removed it,
     * asks it to resubscribe for the given event. The conditional removal
     * keeps the resubscribe from being triggered twice for the same instance.
     */
    protected void resubscribeIfNecessary(ActiveSubscriber ss, PubSubProtocol.SubscriptionEvent event) {
        if (this.removeSubscription(ss.getTopicSubscriber(), ss)) {
            ss.resubscribeIfNecessary(event);
        }
    }
}

