/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.handlers.AbstractResponseHandler;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty channel handler for a Hedwig client channel.
 *
 * <p>Outstanding requests are tracked by transaction id in {@link #txn2PubSubData}. When a
 * {@code PubSubResponse} arrives it is either routed to the subscribe handler (delivered
 * messages and subscription events) or matched against its pending request and handed to the
 * response handler registered for that request's operation type.
 *
 * <p>A pending request is completed by exactly one of: a matching response, a timeout, or a
 * channel disconnect. Each of those paths first removes the entry from the map and only the
 * path whose removal succeeded invokes the request's callback.
 */
@ChannelPipelineCoverage(value="all")
public class HChannelHandler
extends SimpleChannelHandler {
    private static final Logger logger = LoggerFactory.getLogger(HChannelHandler.class);

    /** Requests written on this channel that have not been completed yet, keyed by txn id. */
    private final ConcurrentMap<Long, PubSubData> txn2PubSubData = new ConcurrentHashMap<Long, PubSubData>();
    /** Set by {@link #closeExplicitly()}; suppresses disconnect handling and SSL handshakes. */
    private volatile boolean channelClosedExplicitly = false;
    private final AbstractHChannelManager channelManager;
    private final ClientConfiguration cfg;
    private final Map<PubSubProtocol.OperationType, AbstractResponseHandler> handlers;
    /** Handler registered for SUBSCRIBE, or {@code null} when none was registered. */
    private final SubscribeResponseHandler subHandler;

    /**
     * Creates a handler.
     *
     * @param cfg client configuration (read for the ack timeout and the SSL flag)
     * @param channelManager manager notified about topic ownership and disconnects
     * @param handlers response handlers keyed by operation type; the entry for
     *        {@code SUBSCRIBE}, if any, must be a {@link SubscribeResponseHandler}
     */
    public HChannelHandler(ClientConfiguration cfg, AbstractHChannelManager channelManager, Map<PubSubProtocol.OperationType, AbstractResponseHandler> handlers) {
        this.cfg = cfg;
        this.channelManager = channelManager;
        this.handlers = handlers;
        this.subHandler = (SubscribeResponseHandler)handlers.get(PubSubProtocol.OperationType.SUBSCRIBE);
    }

    /**
     * @return the subscribe response handler, or {@code null} if no handler was registered
     *         for the {@code SUBSCRIBE} operation
     */
    public SubscribeResponseHandler getSubscribeResponseHandler() {
        return this.subHandler;
    }

    /**
     * Forgets a pending request without invoking its callback.
     *
     * @param txnId transaction id of the request to drop
     */
    public void removeTxn(long txnId) {
        this.txn2PubSubData.remove(txnId);
    }

    /**
     * Registers a pending request so that its response can be matched later.
     *
     * @param txnId transaction id of the request
     * @param pubSubData request state, including the callback to complete
     */
    public void addTxn(long txnId, PubSubData pubSubData) {
        this.txn2PubSubData.put(txnId, pubSubData);
    }

    /**
     * Dispatches an incoming {@code PubSubResponse}; any other message type is passed upstream
     * untouched.
     *
     * @param ctx channel handler context
     * @param e the received message event
     * @throws Exception propagated from the invoked response handler
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (!(e.getMessage() instanceof PubSubProtocol.PubSubResponse)) {
            ctx.sendUpstream(e);
            return;
        }
        PubSubProtocol.PubSubResponse response = (PubSubProtocol.PubSubResponse)e.getMessage();
        logger.debug("Response received from host: {}, response: {}.", VarArgs.va(NetUtils.getHostFromChannel(ctx.getChannel()), response));

        // A response carrying a message is a delivery for a subscription, not an ack.
        if (response.hasMessage()) {
            if (null == this.subHandler) {
                logger.error("Received message from a non-subscription channel : {}", response);
            } else {
                this.subHandler.handleSubscribeMessage(response);
            }
            return;
        }

        // Subscription events are likewise routed to the subscribe handler.
        if (response.hasResponseBody()) {
            PubSubProtocol.ResponseBody resp = response.getResponseBody();
            if (resp.hasSubscriptionEvent()) {
                if (null == this.subHandler) {
                    logger.error("Received subscription event from a non-subscription channel : {}", response);
                } else {
                    PubSubProtocol.SubscriptionEventResponse eventResp = resp.getSubscriptionEvent();
                    logger.debug("Received subscription event {} for (topic:{}, subscriber:{}).", VarArgs.va(eventResp.getEvent(), response.getTopic(), response.getSubscriberId()));
                    this.subHandler.handleSubscriptionEvent(response.getTopic(), response.getSubscriberId(), eventResp.getEvent());
                }
                return;
            }
        }

        // Everything else answers a pending request; removing the entry claims it.
        PubSubData pubSubData = this.txn2PubSubData.remove(response.getTxnId());
        if (pubSubData == null) {
            logger.error("PubSub Data was not found for PubSubResponse: {}", response);
            return;
        }
        // Any status other than NOT_RESPONSIBLE_FOR_TOPIC is recorded as this host serving the topic.
        if (response.getStatusCode() != PubSubProtocol.StatusCode.NOT_RESPONSIBLE_FOR_TOPIC) {
            InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
            this.channelManager.storeTopic2HostMapping(pubSubData.topic, host);
        }
        logger.debug("Handling a {} response: {}, pubSubData: {}, host: {}.", VarArgs.va(pubSubData.operationType, response, pubSubData, ctx.getChannel()));
        AbstractResponseHandler respHandler = this.handlers.get(pubSubData.operationType);
        if (null == respHandler) {
            logger.error("Response received from server is for an unhandled operation {}, txnId: {}.", VarArgs.va(pubSubData.operationType, response.getTxnId()));
            pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.UncertainStateException("Can't find response handler for operation " + pubSubData.operationType).getClass() == null ? null : new PubSubException.UnexpectedConditionException("Can't find response handler for operation " + pubSubData.operationType));
            return;
        }
        respHandler.handleResponse(response, pubSubData, ctx.getChannel());
    }

    /**
     * Fails every pending request whose write time is older than the configured server ack
     * response timeout.
     */
    public void checkTimeoutRequests() {
        long curTime = System.currentTimeMillis();
        long timeoutInterval = this.cfg.getServerAckResponseTimeout();
        for (PubSubData pubSubData : this.txn2PubSubData.values()) {
            this.checkTimeoutRequest(pubSubData, curTime, timeoutInterval);
        }
    }

    /**
     * Fails a single pending request if it has timed out.
     *
     * @param pubSubData the pending request to check
     * @param curTime current time in milliseconds
     * @param timeoutInterval allowed time in milliseconds since the request was written
     */
    private void checkTimeoutRequest(PubSubData pubSubData, long curTime, long timeoutInterval) {
        if (curTime <= pubSubData.requestWriteTime + timeoutInterval) {
            return;
        }
        // Only fail the request if this call is the one that removed it; otherwise a response
        // or a disconnect has already claimed it and its callback must not fire a second time.
        if (!this.txn2PubSubData.remove(pubSubData.txnId, pubSubData)) {
            return;
        }
        logger.error("Current PubSubRequest has timed out for pubSubData: {}", pubSubData);
        pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.UncertainStateException("Server ack response never received so PubSubRequest has timed out!"));
    }

    /**
     * Notifies the channel manager about an unexpected disconnect and fails all pending
     * requests. Does nothing when the channel was closed explicitly, the channel manager is
     * closed, or the remote host cannot be determined.
     *
     * @param ctx channel handler context
     * @param e the disconnect event
     * @throws Exception declared by the handler contract
     */
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        if (this.channelClosedExplicitly || this.channelManager.isClosed()) {
            return;
        }
        InetSocketAddress host = NetUtils.getHostFromChannel(ctx.getChannel());
        if (host == null) {
            return;
        }
        logger.info("Channel {} was disconnected to host {}.", VarArgs.va(ctx.getChannel(), host));
        if (null == this.subHandler) {
            this.channelManager.onNonSubscriptionChannelDisconnected(host, ctx.getChannel());
        } else {
            this.channelManager.onSubscriptionChannelDisconnected(host, ctx.getChannel());
        }
        // Claim each entry individually instead of iterating and then clearing the map: a
        // bulk clear() could drop a request without ever invoking its callback, and iterating
        // without removing could fail a request that another path has already completed.
        for (Long txnId : this.txn2PubSubData.keySet()) {
            PubSubData pubSubData = this.txn2PubSubData.remove(txnId);
            if (pubSubData == null) {
                continue;
            }
            logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: {}", pubSubData);
            pubSubData.getCallback().operationFailed(pubSubData.context, new PubSubException.UncertainStateException("Server ack response never received before server connection disconnected!"));
        }
    }

    /**
     * Starts the SSL handshake when SSL is enabled and neither this channel nor the channel
     * manager has been closed.
     *
     * @param ctx channel handler context
     * @param e the connect event
     * @throws Exception declared by the handler contract
     */
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        if (this.cfg.isSSLEnabled() && !this.channelClosedExplicitly && !this.channelManager.isClosed()) {
            logger.debug("Initiating the SSL handshake");
            ((SslHandler)ctx.getPipeline().get(SslHandler.class)).handshake(e.getChannel());
        }
    }

    /**
     * Logs the failure and closes the channel it occurred on.
     *
     * @param ctx channel handler context
     * @param e the exception event
     */
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        logger.error("Exception caught on client channel", e.getCause());
        e.getChannel().close();
    }

    /**
     * Marks the channel as closed on purpose, so that a later disconnect event is ignored by
     * {@link #channelDisconnected} and no SSL handshake is started by {@link #channelConnected}.
     */
    public void closeExplicitly() {
        this.channelClosedExplicitly = true;
    }
}

