/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl.multiplex;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelHandler;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.client.netty.impl.multiplex.MultiplexSubscriptionChannelPipelineFactory;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiplexHChannelManager
extends AbstractHChannelManager {
    static final Logger logger = LoggerFactory.getLogger(MultiplexHChannelManager.class);
    protected final CleanupChannelMap<InetSocketAddress> subscriptionChannels;
    protected final CleanupChannelMap<TopicSubscriber> sub2Channels;
    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
    private final ClientChannelPipelineFactory subscriptionChannelPipelineFactory;

    public MultiplexHChannelManager(ClientConfiguration cfg, ChannelFactory socketFactory) {
        super(cfg, socketFactory);
        this.subscriptionChannels = new CleanupChannelMap();
        this.sub2Channels = new CleanupChannelMap();
        this.subscriptionChannelPipelineFactory = new MultiplexSubscriptionChannelPipelineFactory(cfg, this);
    }

    @Override
    protected ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory() {
        return this.subscriptionChannelPipelineFactory;
    }

    @Override
    protected HChannel createAndStoreSubscriptionChannel(Channel channel) {
        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
        HChannelImpl newHChannel = new HChannelImpl(host, channel, this, this.getSubscriptionChannelPipelineFactory());
        return this.storeSubscriptionChannel(host, newHChannel);
    }

    @Override
    protected HChannel createAndStoreSubscriptionChannel(InetSocketAddress host) {
        HChannelImpl newHChannel = new HChannelImpl(host, this, this.getSubscriptionChannelPipelineFactory());
        return this.storeSubscriptionChannel(host, newHChannel);
    }

    private HChannel storeSubscriptionChannel(InetSocketAddress host, HChannel newHChannel) {
        return this.subscriptionChannels.addChannel(host, newHChannel);
    }

    @Override
    protected HChannel getSubscriptionChannel(InetSocketAddress host) {
        return this.subscriptionChannels.getChannel(host);
    }

    protected HChannel getSubscriptionChannel(TopicSubscriber subscriber) {
        InetSocketAddress host = (InetSocketAddress)this.topic2Host.get(subscriber.getTopic());
        if (null == host) {
            return null;
        }
        return this.getSubscriptionChannel(host);
    }

    @Override
    protected HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber subscriber) {
        InetSocketAddress host = (InetSocketAddress)this.topic2Host.get(subscriber.getTopic());
        if (null == host) {
            return null;
        }
        HChannel channel = this.getSubscriptionChannel(host);
        if (null == channel) {
            channel = this.createAndStoreSubscriptionChannel(host);
        }
        return channel;
    }

    @Override
    protected void onSubscriptionChannelDisconnected(InetSocketAddress host, Channel channel) {
        HChannel hChannel = this.subscriptionChannels.getChannel(host);
        if (null == hChannel) {
            return;
        }
        Channel underlyingChannel = hChannel.getChannel();
        if (null == underlyingChannel || !underlyingChannel.equals(channel)) {
            return;
        }
        logger.info("Subscription Channel {} disconnected from {}.", VarArgs.va(channel, host));
        if (this.subscriptionChannels.removeChannel(host, hChannel)) {
            try {
                HChannelHandler channelHandler = HChannelImpl.getHChannelHandlerFromChannel(channel);
                channelHandler.getSubscribeResponseHandler().onChannelDisconnected(host, channel);
            }
            catch (NoResponseHandlerException nrhe) {
                logger.warn("No Channel Handler found for channel {} when it disconnected.", (Object)channel);
            }
        }
    }

    @Override
    public SubscribeResponseHandler getSubscribeResponseHandler(TopicSubscriber topicSubscriber) {
        HChannel hChannel = this.getSubscriptionChannel(topicSubscriber);
        if (null == hChannel) {
            return null;
        }
        Channel channel = hChannel.getChannel();
        if (null == channel) {
            return null;
        }
        try {
            HChannelHandler channelHandler = HChannelImpl.getHChannelHandlerFromChannel(channel);
            return channelHandler.getSubscribeResponseHandler();
        }
        catch (NoResponseHandlerException nrhe) {
            logger.warn("No Channel Handler found for channel {}, topic subscriber {}.", (Object)channel, (Object)topicSubscriber);
            return null;
        }
    }

    @Override
    public void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        this.startDelivery(topicSubscriber, messageHandler, false);
    }

    @Override
    protected void restartDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        this.startDelivery(topicSubscriber, null, true);
    }

    private void startDelivery(TopicSubscriber topicSubscriber, MessageHandler messageHandler, boolean restart) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        SubscribeResponseHandler subscribeResponseHandler = this.getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler || !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", (Object)topicSubscriber);
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        MessageHandler existedMsgHandler = (MessageHandler)this.topicSubscriber2MessageHandler.get(topicSubscriber);
        if (restart) {
            messageHandler = existedMsgHandler;
        } else {
            if (null != existedMsgHandler) {
                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
            }
            if (messageHandler != null && null != this.topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
                throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
            }
        }
        subscribeResponseHandler.startDelivery(topicSubscriber, messageHandler);
    }

    @Override
    public void stopDelivery(TopicSubscriber topicSubscriber) throws PubSubException.ClientNotSubscribedException {
        SubscribeResponseHandler subscribeResponseHandler = this.getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler || !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            logger.error("Client is not yet subscribed to {}.", (Object)topicSubscriber);
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to " + topicSubscriber);
        }
        this.topicSubscriber2MessageHandler.remove(topicSubscriber);
        subscribeResponseHandler.stopDelivery(topicSubscriber);
    }

    @Override
    public void asyncCloseSubscription(TopicSubscriber topicSubscriber, Callback<PubSubProtocol.ResponseBody> callback, Object context) {
        SubscribeResponseHandler subscribeResponseHandler = this.getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler || !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            logger.warn("Trying to close a subscription when we don't have a subscription channel cached for {}", (Object)topicSubscriber);
            callback.operationFinished(context, null);
            return;
        }
        subscribeResponseHandler.asyncCloseSubscription(topicSubscriber, callback, context);
    }

    @Override
    protected void checkTimeoutRequestsOnSubscriptionChannels() {
        if (null == this.subscriptionChannels) {
            return;
        }
        for (HChannel channel : this.subscriptionChannels.getChannels()) {
            try {
                HChannelHandler channelHandler = HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
                channelHandler.checkTimeoutRequests();
            }
            catch (NoResponseHandlerException nrhe) {}
        }
    }

    @Override
    protected void closeSubscriptionChannels() {
        this.subscriptionChannels.close();
    }

    protected Either<Boolean, HChannel> storeSubscriptionChannel(TopicSubscriber topicSubscriber, PubSubData txn, HChannel channel) {
        boolean replaced = this.sub2Channels.replaceChannel(topicSubscriber, txn.getOriginalChannelForResubscribe(), channel);
        if (replaced) {
            return Either.of(replaced, channel);
        }
        return Either.of(replaced, null);
    }

    protected boolean removeSubscriptionChannel(TopicSubscriber topicSubscriber, HChannel channel) {
        return this.sub2Channels.removeChannel(topicSubscriber, channel);
    }
}

