package org.apache.hw_v4_0_0.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration;
import org.apache.hw_v4_0_0.hedwig.client.data.PubSubData;
import org.apache.hw_v4_0_0.hedwig.client.exceptions.ServerRedirectLoopException;
import org.apache.hw_v4_0_0.hedwig.client.exceptions.TooManyServerRedirectsException;
import org.apache.hw_v4_0_0.hedwig.client.handlers.PublishResponseHandler;
import org.apache.hw_v4_0_0.hedwig.client.handlers.SubscribeReconnectCallback;
import org.apache.hw_v4_0_0.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hw_v4_0_0.hedwig.client.handlers.UnsubscribeResponseHandler;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress;
import org.jboss.hw_v4_0_0.netty.channel.Channel;
import org.jboss.hw_v4_0_0.netty.channel.ChannelHandlerContext;
import org.jboss.hw_v4_0_0.netty.channel.ChannelPipelineCoverage;
import org.jboss.hw_v4_0_0.netty.channel.ChannelStateEvent;
import org.jboss.hw_v4_0_0.netty.channel.ExceptionEvent;
import org.jboss.hw_v4_0_0.netty.channel.MessageEvent;
import org.jboss.hw_v4_0_0.netty.channel.SimpleChannelHandler;
import org.jboss.hw_v4_0_0.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelPipelineCoverage(ChannelPipelineCoverage.ALL)
/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/client/netty/ResponseHandler.class */
public class ResponseHandler extends SimpleChannelHandler {
    private static Logger logger = LoggerFactory.getLogger(ResponseHandler.class);
    private final HedwigClientImpl client;
    private final HedwigPublisher pub;
    private final HedwigSubscriber sub;
    private final ClientConfiguration cfg;
    protected final ConcurrentMap<Long, PubSubData> txn2PubSubData = new ConcurrentHashMap();
    public boolean channelClosedExplicitly = false;
    private final PublishResponseHandler pubHandler = new PublishResponseHandler(this);
    private final SubscribeResponseHandler subHandler = new SubscribeResponseHandler(this);
    private final UnsubscribeResponseHandler unsubHandler = new UnsubscribeResponseHandler(this);

    public ResponseHandler(HedwigClientImpl hedwigClientImpl) {
        this.client = hedwigClientImpl;
        this.sub = hedwigClientImpl.getSubscriber();
        this.pub = hedwigClientImpl.getPublisher();
        this.cfg = hedwigClientImpl.getConfiguration();
    }

    public HedwigClientImpl getClient() {
        return this.client;
    }

    public HedwigSubscriber getSubscriber() {
        return this.sub;
    }

    public ClientConfiguration getConfiguration() {
        return this.cfg;
    }

    public SubscribeResponseHandler getSubscribeResponseHandler() {
        return this.subHandler;
    }

    @Override // org.jboss.hw_v4_0_0.netty.channel.SimpleChannelHandler
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        if (!(messageEvent.getMessage() instanceof PubSubProtocol.PubSubResponse)) {
            channelHandlerContext.sendUpstream(messageEvent);
        }
        PubSubProtocol.PubSubResponse pubSubResponse = (PubSubProtocol.PubSubResponse) messageEvent.getMessage();
        if (logger.isDebugEnabled()) {
            logger.debug("Response received from host: " + HedwigClientImpl.getHostFromChannel(channelHandlerContext.getChannel()) + ", response: " + pubSubResponse);
        }
        if (pubSubResponse.hasMessage()) {
            this.subHandler.handleSubscribeMessage(pubSubResponse);
            return;
        }
        PubSubData pubSubData = this.txn2PubSubData.containsKey(Long.valueOf(pubSubResponse.getTxnId())) ? this.txn2PubSubData.get(Long.valueOf(pubSubResponse.getTxnId())) : null;
        if (pubSubData == null) {
            logger.error("PubSub Data was not found for PubSubResponse: " + pubSubResponse);
            return;
        }
        this.txn2PubSubData.remove(Long.valueOf(pubSubResponse.getTxnId()));
        if (!pubSubResponse.getStatusCode().equals(PubSubProtocol.StatusCode.NOT_RESPONSIBLE_FOR_TOPIC)) {
            this.client.storeTopic2HostMapping(pubSubData, channelHandlerContext.getChannel());
        }
        switch (pubSubData.operationType) {
            case PUBLISH:
                this.pubHandler.handlePublishResponse(pubSubResponse, pubSubData, channelHandlerContext.getChannel());
                return;
            case SUBSCRIBE:
                this.subHandler.handleSubscribeResponse(pubSubResponse, pubSubData, channelHandlerContext.getChannel());
                return;
            case UNSUBSCRIBE:
                this.unsubHandler.handleUnsubscribeResponse(pubSubResponse, pubSubData, channelHandlerContext.getChannel());
                return;
            default:
                logger.error("Response received from server is for an unhandled operation type, txnId: " + pubSubResponse.getTxnId() + ", operationType: " + pubSubData.operationType);
                return;
        }
    }

    public void handleRedirectResponse(PubSubProtocol.PubSubResponse pubSubResponse, PubSubData pubSubData, Channel channel) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("Handling a redirect from host: " + HedwigClientImpl.getHostFromChannel(channel) + ", response: " + pubSubResponse + ", pubSubData: " + pubSubData);
        }
        int size = pubSubData.triedServers == null ? 0 : pubSubData.triedServers.size();
        if (size >= this.cfg.getMaximumServerRedirects()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Exceeded the number of server redirects (" + size + ") so error out.");
            }
            pubSubData.callback.operationFailed(pubSubData.context, new PubSubException.ServiceDownException(new TooManyServerRedirectsException("Already reached max number of redirects: " + size)));
            return;
        }
        ByteString copyFromUtf8 = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(HedwigClientImpl.getHostFromChannel(channel)));
        if (pubSubData.triedServers == null) {
            pubSubData.triedServers = new LinkedList();
        }
        pubSubData.shouldClaim = true;
        pubSubData.triedServers.add(copyFromUtf8);
        String statusMsg = pubSubResponse.getStatusMsg();
        InetSocketAddress defaultServerHost = (statusMsg == null || statusMsg.length() <= 0) ? this.cfg.getDefaultServerHost() : this.cfg.isSSLEnabled() ? new HedwigSocketAddress(statusMsg).getSSLSocketAddress() : new HedwigSocketAddress(statusMsg).getSocketAddress();
        if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(defaultServerHost)))) {
            logger.error("We've already sent this PubSubRequest before to redirectedHost: " + defaultServerHost + ", pubSubData: " + pubSubData);
            pubSubData.callback.operationFailed(pubSubData.context, new PubSubException.ServiceDownException(new ServerRedirectLoopException("Already made the request before to redirected host: " + defaultServerHost)));
            return;
        }
        boolean z = this.pub.host2Channel.containsKey(defaultServerHost);
        if (pubSubData.operationType.equals(PubSubProtocol.OperationType.SUBSCRIBE) || !z) {
            this.client.doConnect(pubSubData, defaultServerHost);
        } else if (pubSubData.operationType.equals(PubSubProtocol.OperationType.PUBLISH)) {
            this.pub.doPublish(pubSubData, this.pub.host2Channel.get(defaultServerHost));
        } else if (pubSubData.operationType.equals(PubSubProtocol.OperationType.UNSUBSCRIBE)) {
            this.sub.doSubUnsub(pubSubData, this.pub.host2Channel.get(defaultServerHost));
        }
    }

    @Override // org.jboss.hw_v4_0_0.netty.channel.SimpleChannelHandler
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (this.channelClosedExplicitly || this.client.hasStopped()) {
            return;
        }
        InetSocketAddress hostFromChannel = HedwigClientImpl.getHostFromChannel(channelHandlerContext.getChannel());
        logger.warn("Channel was disconnected to host: " + hostFromChannel);
        if (hostFromChannel == null) {
            return;
        }
        PubSubData origSubData = this.subHandler.getOrigSubData();
        if (origSubData != null) {
            this.sub.closeSubscription(origSubData.topic, origSubData.subscriberId);
            this.client.clearAllTopicsForHost(hostFromChannel);
            origSubData.clearServersList();
            origSubData.callback = new SubscribeReconnectCallback(origSubData, this.client, this.subHandler.getMessageHandler());
            origSubData.context = null;
            if (logger.isDebugEnabled()) {
                logger.debug("Disconnected subscribe channel so reconnect with origSubData: " + origSubData);
            }
            this.client.doConnect(origSubData, this.cfg.getDefaultServerHost());
        } else if (this.pub.host2Channel.containsKey(hostFromChannel) && this.pub.host2Channel.get(hostFromChannel).equals(channelHandlerContext.getChannel())) {
            if (logger.isDebugEnabled()) {
                logger.debug("Disconnected channel for host: " + hostFromChannel + " was for Publish/Unsubscribe requests so remove all references to it.");
            }
            this.pub.host2Channel.remove(hostFromChannel);
            this.client.clearAllTopicsForHost(hostFromChannel);
        }
        for (PubSubData pubSubData : this.txn2PubSubData.values()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Channel disconnected so invoking the operationFailed callback for pubSubData: " + pubSubData);
            }
            pubSubData.callback.operationFailed(pubSubData.context, new PubSubException.UncertainStateException("Server ack response never received before server connection disconnected!"));
        }
        this.txn2PubSubData.clear();
    }

    @Override // org.jboss.hw_v4_0_0.netty.channel.SimpleChannelHandler
    public void channelConnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        if (!this.cfg.isSSLEnabled() || this.channelClosedExplicitly || this.client.hasStopped()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Initiating the SSL handshake");
        }
        ((SslHandler) channelHandlerContext.getPipeline().get(SslHandler.class)).handshake(channelStateEvent.getChannel());
    }

    @Override // org.jboss.hw_v4_0_0.netty.channel.SimpleChannelHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        logger.error("Exception caught on client channel", exceptionEvent.getCause());
        exceptionEvent.getChannel().close();
    }
}
