/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.exceptions.ServerRedirectLoopException;
import org.apache.hedwig.client.exceptions.TooManyServerRedirectsException;
import org.apache.hedwig.client.handlers.PublishResponseHandler;
import org.apache.hedwig.client.handlers.SubscribeReconnectCallback;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.handlers.UnsubscribeResponseHandler;
import org.apache.hedwig.client.netty.HedwigClientImpl;
import org.apache.hedwig.client.netty.HedwigPublisher;
import org.apache.hedwig.client.netty.HedwigSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelPipelineCoverage(value="all")
public class ResponseHandler
extends SimpleChannelHandler {
    private static Logger logger = LoggerFactory.getLogger(ResponseHandler.class);
    protected final ConcurrentMap<Long, PubSubData> txn2PubSubData = new ConcurrentHashMap<Long, PubSubData>();
    public boolean channelClosedExplicitly = false;
    private final HedwigClientImpl client;
    private final HedwigPublisher pub;
    private final HedwigSubscriber sub;
    private final ClientConfiguration cfg;
    private final PublishResponseHandler pubHandler;
    private final SubscribeResponseHandler subHandler;
    private final UnsubscribeResponseHandler unsubHandler;

    public ResponseHandler(HedwigClientImpl client) {
        this.client = client;
        this.sub = client.getSubscriber();
        this.pub = client.getPublisher();
        this.cfg = client.getConfiguration();
        this.pubHandler = new PublishResponseHandler(this);
        this.subHandler = new SubscribeResponseHandler(this);
        this.unsubHandler = new UnsubscribeResponseHandler(this);
    }

    public HedwigClientImpl getClient() {
        return this.client;
    }

    public HedwigSubscriber getSubscriber() {
        return this.sub;
    }

    public ClientConfiguration getConfiguration() {
        return this.cfg;
    }

    public SubscribeResponseHandler getSubscribeResponseHandler() {
        return this.subHandler;
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        PubSubData pubSubData;
        if (!(e.getMessage() instanceof PubSubProtocol.PubSubResponse)) {
            ctx.sendUpstream((ChannelEvent)e);
        }
        PubSubProtocol.PubSubResponse response = (PubSubProtocol.PubSubResponse)e.getMessage();
        if (logger.isDebugEnabled()) {
            logger.debug("Response received from host: " + HedwigClientImpl.getHostFromChannel(ctx.getChannel()) + ", response: " + response);
        }
        if (response.hasMessage()) {
            this.subHandler.handleSubscribeMessage(response);
            return;
        }
        PubSubData pubSubData2 = pubSubData = this.txn2PubSubData.containsKey(response.getTxnId()) ? (PubSubData)this.txn2PubSubData.get(response.getTxnId()) : null;
        if (pubSubData == null) {
            logger.error("PubSub Data was not found for PubSubResponse: " + response);
            return;
        }
        this.txn2PubSubData.remove(response.getTxnId());
        if (!response.getStatusCode().equals((Object)PubSubProtocol.StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
            this.client.storeTopic2HostMapping(pubSubData, ctx.getChannel());
        }
        switch (pubSubData.operationType) {
            case PUBLISH: {
                this.pubHandler.handlePublishResponse(response, pubSubData, ctx.getChannel());
                break;
            }
            case SUBSCRIBE: {
                this.subHandler.handleSubscribeResponse(response, pubSubData, ctx.getChannel());
                break;
            }
            case UNSUBSCRIBE: {
                this.unsubHandler.handleUnsubscribeResponse(response, pubSubData, ctx.getChannel());
                break;
            }
            default: {
                logger.error("Response received from server is for an unhandled operation type, txnId: " + response.getTxnId() + ", operationType: " + pubSubData.operationType);
            }
        }
    }

    public void handleRedirectResponse(PubSubProtocol.PubSubResponse response, PubSubData pubSubData, Channel channel) throws Exception {
        boolean redirectedHostChannelExists;
        int curNumServerRedirects;
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a redirect from host: " + HedwigClientImpl.getHostFromChannel(channel) + ", response: " + response + ", pubSubData: " + pubSubData);
        }
        int n = curNumServerRedirects = pubSubData.triedServers == null ? 0 : pubSubData.triedServers.size();
        if (curNumServerRedirects >= this.cfg.getMaximumServerRedirects()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Exceeded the number of server redirects (" + curNumServerRedirects + ") so error out.");
            }
            pubSubData.callback.operationFailed(pubSubData.context, (PubSubException)new PubSubException.ServiceDownException((Exception)new TooManyServerRedirectsException("Already reached max number of redirects: " + curNumServerRedirects)));
            return;
        }
        ByteString triedServer = ByteString.copyFromUtf8((String)HedwigSocketAddress.sockAddrStr(HedwigClientImpl.getHostFromChannel(channel)));
        if (pubSubData.triedServers == null) {
            pubSubData.triedServers = new LinkedList<ByteString>();
        }
        pubSubData.shouldClaim = true;
        pubSubData.triedServers.add(triedServer);
        String statusMsg = response.getStatusMsg();
        InetSocketAddress redirectedHost = statusMsg != null && statusMsg.length() > 0 ? (this.cfg.isSSLEnabled() ? new HedwigSocketAddress(statusMsg).getSSLSocketAddress() : new HedwigSocketAddress(statusMsg).getSocketAddress()) : this.cfg.getDefaultServerHost();
        if (pubSubData.triedServers.contains(ByteString.copyFromUtf8((String)HedwigSocketAddress.sockAddrStr(redirectedHost)))) {
            logger.error("We've already sent this PubSubRequest before to redirectedHost: " + redirectedHost + ", pubSubData: " + pubSubData);
            pubSubData.callback.operationFailed(pubSubData.context, (PubSubException)new PubSubException.ServiceDownException((Exception)new ServerRedirectLoopException("Already made the request before to redirected host: " + redirectedHost)));
            return;
        }
        boolean bl = redirectedHostChannelExists = this.pub.host2Channel.containsKey(redirectedHost);
        if (pubSubData.operationType.equals((Object)PubSubProtocol.OperationType.SUBSCRIBE) || !redirectedHostChannelExists) {
            this.client.doConnect(pubSubData, redirectedHost);
        } else if (pubSubData.operationType.equals((Object)PubSubProtocol.OperationType.PUBLISH)) {
            this.pub.doPublish(pubSubData, (Channel)this.pub.host2Channel.get(redirectedHost));
        } else if (pubSubData.operationType.equals((Object)PubSubProtocol.OperationType.UNSUBSCRIBE)) {
            this.sub.doSubUnsub(pubSubData, (Channel)this.pub.host2Channel.get(redirectedHost));
        }
    }

    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        if (this.channelClosedExplicitly || this.client.hasStopped()) {
            return;
        }
        InetSocketAddress host = HedwigClientImpl.getHostFromChannel(ctx.getChannel());
        logger.warn("Channel was disconnected to host: " + host);
        if (host == null) {
            return;
        }
        PubSubData origSubData = this.subHandler.getOrigSubData();
        if (origSubData == null) {
            if (this.pub.host2Channel.containsKey(host) && ((Channel)this.pub.host2Channel.get(host)).equals(ctx.getChannel())) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Disconnected channel for host: " + host + " was for Publish/Unsubscribe requests so remove all references to it.");
                }
                this.pub.host2Channel.remove(host);
                this.client.clearAllTopicsForHost(host);
            }
        } else {
            this.sub.closeSubscription(origSubData.topic, origSubData.subscriberId);
            this.client.clearAllTopicsForHost(host);
            origSubData.clearServersList();
            origSubData.callback = new SubscribeReconnectCallback(origSubData, this.client);
            origSubData.context = null;
            if (logger.isDebugEnabled()) {
                logger.debug("Disconnected subscribe channel so reconnect with origSubData: " + origSubData);
            }
            this.client.doConnect(origSubData, this.cfg.getDefaultServerHost());
        }
        for (PubSubData pubSubData : this.txn2PubSubData.values()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: " + pubSubData);
            }
            pubSubData.callback.operationFailed(pubSubData.context, (PubSubException)new PubSubException.UncertainStateException("Server ack response never received before server connection disconnected!"));
        }
        this.txn2PubSubData.clear();
    }

    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        if (this.cfg.isSSLEnabled() && !this.channelClosedExplicitly && !this.client.hasStopped()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Initiating the SSL handshake");
            }
            ((SslHandler)ctx.getPipeline().get(SslHandler.class)).handshake(e.getChannel());
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        logger.error("Exception caught on client channel", e.getCause());
        e.getChannel().close();
    }
}

