/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl.simple;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelHandler;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.client.netty.impl.simple.SimpleSubscriptionChannelPipelineFactory;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleHChannelManager
extends AbstractHChannelManager {
    private static Logger logger = LoggerFactory.getLogger(SimpleHChannelManager.class);
    protected final CleanupChannelMap<TopicSubscriber> topicSubscriber2Channel;
    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;

    public SimpleHChannelManager(ClientConfiguration cfg, ChannelFactory socketFactory) {
        super(cfg, socketFactory);
        this.topicSubscriber2Channel = new CleanupChannelMap();
        this.subscriptionChannelPipelineFactory = new SimpleSubscriptionChannelPipelineFactory(cfg, this);
    }

    @Override
    public void submitOp(final PubSubData pubSubData) {
        if (PubSubProtocol.OperationType.SUBSCRIBE.equals((Object)pubSubData.operationType)) {
            final Callback<PubSubProtocol.ResponseBody> origCb = pubSubData.getCallback();
            final AtomicInteger retries = new AtomicInteger(5);
            Callback<PubSubProtocol.ResponseBody> wrapperCb = new Callback<PubSubProtocol.ResponseBody>(){

                @Override
                public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
                    origCb.operationFinished(ctx, resultOfOperation);
                }

                @Override
                public void operationFailed(Object ctx, PubSubException exception) {
                    if (exception instanceof PubSubException.ServiceDownException && exception.getCause() instanceof PubSubException.TopicBusyException && retries.decrementAndGet() > 0) {
                        logger.warn("TOPIC_DOWN from server using simple channel scheme.This could be due to the channel disconnection from a close not having been triggered on the server side. Retrying");
                        SimpleHChannelManager.super.submitOp(pubSubData);
                        return;
                    }
                    origCb.operationFailed(ctx, exception);
                }
            };
            pubSubData.setCallback(wrapperCb);
        }
        super.submitOp(pubSubData);
    }

    @Override
    protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
        return this.subscriptionChannelPipelineFactory;
    }

    @Override
    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
        return new HChannelImpl(host, channel, this, this.getSubscriptionChannelPipelineFactory());
    }

    @Override
    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
        return new HChannelImpl(host, this, this.getSubscriptionChannelPipelineFactory());
    }

    protected Either<Boolean, HChannel> storeSubscriptionChannel(TopicSubscriber topicSubscriber, PubSubData txn, Channel channel) {
        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
        HChannelImpl newHChannel = new HChannelImpl(host, channel, this, this.getSubscriptionChannelPipelineFactory());
        boolean replaced = this.topicSubscriber2Channel.replaceChannel(topicSubscriber, txn.getOriginalChannelForResubscribe(), newHChannel);
        if (replaced) {
            return Either.of(replaced, newHChannel);
        }
        return Either.of(replaced, null);
    }

    @Override
    protected HChannel getSubscriptionChannel(InetSocketAddress host) {
        return null;
    }

    @Override
    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
        HChannel channel = this.topicSubscriber2Channel.getChannel(subscriber);
        if (null != channel) {
            return channel;
        }
        InetSocketAddress host = (InetSocketAddress)this.topic2Host.get(subscriber.getTopic());
        if (null == host) {
            return null;
        }
        channel = this.getSubscriptionChannel(host);
        if (null == channel) {
            channel = this.createAndStoreSubscriptionChannel(host);
        }
        return channel;
    }

    @Override
    protected void onSubscriptionChannelDisconnected(InetSocketAddress host, Channel channel) {
        logger.info("Subscription Channel {} disconnected from {}.", VarArgs.va(channel, host));
        try {
            HChannelHandler channelHandler = HChannelImpl.getHChannelHandlerFromChannel(channel);
            channelHandler.getSubscribeResponseHandler().onChannelDisconnected(host, channel);
        }
        catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found for channel {} when it disconnected.", (Object)channel);
        }
    }

    @Override
    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
        HChannel hChannel = this.topicSubscriber2Channel.getChannel(topicSubscriber);
        if (null == hChannel) {
            return null;
        }
        Channel channel = hChannel.getChannel();
        if (null == channel) {
            return null;
        }
        try {
            HChannelHandler channelHandler = HChannelImpl.getHChannelHandlerFromChannel(channel);
            return channelHandler.getSubscribeResponseHandler();
        }
        catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.", (Object)channel, (Object)topicSubscriber);
            return null;
        }
    }

    @Override
    public void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        this.startDelivery(topicSubscriber, messageHandler, false);
    }

    @Override
    protected void restartDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        this.startDelivery(topicSubscriber, null, true);
    }

    private void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler, boolean restart) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        SubscribeResponseHandler subscribeResponseHandler = this.getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler || !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", (Object)topicSubscriber);
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        MessageHandler existedMsgHandler = (MessageHandler)this.topicSubscriber2MessageHandler.get(topicSubscriber);
        if (restart) {
            messageHandler = existedMsgHandler;
        } else {
            if (null != existedMsgHandler) {
                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
            }
            if (messageHandler != null && null != this.topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
                throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
            }
        }
        subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
    }

    @Override
    public void stopDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException {
        SubscribeResponseHandler subscribeResponseHandler = this.getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler || !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", (Object)topicSubscriber);
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        this.topicSubscriber2MessageHandler.remove(topicSubscriber);
        subscribeResponseHandler.stopDelivery(topicSubscriber);
    }

    @Override
    public void asyncCloseSubscription(final TopicSubscriber topicSubscriber, final Callback<PubSubProtocol.ResponseBody> callback, final Object context) {
        HChannel hChannel = this.topicSubscriber2Channel.removeChannel(topicSubscriber);
        if (null == hChannel) {
            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for {}", (Object)topicSubscriber);
            callback.operationFinished(context, null);
            return;
        }
        Channel channel = hChannel.getChannel();
        if (null == channel) {
            callback.operationFinished(context, null);
            return;
        }
        try {
            HChannelImpl.getHChannelHandlerFromChannel(channel).closeExplicitly();
        }
        catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found when closing {}'s channel {}.", (Object)channel, (Object)topicSubscriber);
        }
        ChannelFuture future = channel.close();
        future.addListener(new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    logger.error("Failed to close the subscription channel for {}", (Object)topicSubscriber);
                    callback.operationFailed(context, (PubSubException)new PubSubException.ServiceDownException("Failed to close the subscription channel for " + topicSubscriber));
                } else {
                    callback.operationFinished(context, null);
                }
            }
        });
    }

    @Override
    protected void checkTimeoutRequestsOnSubscriptionChannels() {
        if (null == this.topicSubscriber2Channel) {
            return;
        }
        for (HChannel channel : this.topicSubscriber2Channel.getChannels()) {
            try {
                HChannelHandler channelHandler = HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
                channelHandler.checkTimeoutRequests();
            }
            catch (NoResponseHandlerException nrhe) {}
        }
    }

    @Override
    protected void closeSubscriptionChannels() {
        this.topicSubscriber2Channel.close();
    }
}

