/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty.impl;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.NoResponseHandlerException;
import org.apache.hedwig.client.handlers.MessageConsumeCallback;
import org.apache.hedwig.client.netty.CleanupChannelMap;
import org.apache.hedwig.client.netty.HChannel;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.NetUtils;
import org.apache.hedwig.client.netty.SubscriptionEventEmitter;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.DefaultServerChannel;
import org.apache.hedwig.client.netty.impl.HChannelHandler;
import org.apache.hedwig.client.netty.impl.HChannelImpl;
import org.apache.hedwig.client.netty.impl.NonSubscriptionChannelPipelineFactory;
import org.apache.hedwig.client.ssl.SslClientContextFactory;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHChannelManager
implements HChannelManager {
    private static Logger logger = LoggerFactory.getLogger(AbstractHChannelManager.class);
    private static final Set<ByteString> EMPTY_TOPIC_SET = new HashSet<ByteString>();
    protected boolean closed = false;
    protected final ReentrantReadWriteLock closedLock = new ReentrantReadWriteLock();
    protected final AtomicLong globalCounter = new AtomicLong();
    protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
    protected final ConcurrentMap<InetSocketAddress, Set<ByteString>> host2Topics = new ConcurrentHashMap<InetSocketAddress, Set<ByteString>>();
    protected final CleanupChannelMap<InetSocketAddress> host2NonSubscriptionChannels = new CleanupChannelMap();
    private final ClientConfiguration cfg;
    protected final ChannelFactory socketFactory;
    private final ClientChannelPipelineFactory nonSubscriptionChannelPipelineFactory;
    private SslClientContextFactory sslFactory = null;
    private final HChannel defaultServerChannel;
    private final Timer clientTimer = new Timer(true);
    private final MessageConsumeCallback consumeCb;
    private final SubscriptionEventEmitter eventEmitter;

    protected AbstractHChannelManager(ClientConfiguration cfg, ChannelFactory socketFactory) {
        this.cfg = cfg;
        this.socketFactory = socketFactory;
        this.nonSubscriptionChannelPipelineFactory = new NonSubscriptionChannelPipelineFactory(cfg, this);
        this.defaultServerChannel = new DefaultServerChannel(cfg.getDefaultServerHost(), this);
        if (cfg.isSSLEnabled()) {
            this.sslFactory = new SslClientContextFactory(cfg);
        }
        this.consumeCb = new MessageConsumeCallback(cfg, this);
        this.eventEmitter = new SubscriptionEventEmitter();
        this.clientTimer.schedule((TimerTask)new PubSubRequestTimeoutTask(), 0L, cfg.getTimeoutThreadRunInterval());
    }

    @Override
    public SubscriptionEventEmitter getSubscriptionEventEmitter() {
        return this.eventEmitter;
    }

    public MessageConsumeCallback getConsumeCallback() {
        return this.consumeCb;
    }

    public SslClientContextFactory getSslFactory() {
        return this.sslFactory;
    }

    protected ChannelFactory getChannelFactory() {
        return this.socketFactory;
    }

    protected ClientChannelPipelineFactory getNonSubscriptionChannelPipelineFactory() {
        return this.nonSubscriptionChannelPipelineFactory;
    }

    protected abstract ClientChannelPipelineFactory getSubscriptionChannelPipelineFactory();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void schedule(TimerTask task, long delay) {
        this.closedLock.readLock().lock();
        try {
            if (this.closed) {
                logger.warn("Task {} is not scheduled due to the channel manager is closed.", (Object)task);
                return;
            }
            this.clientTimer.schedule(task, delay);
        }
        finally {
            this.closedLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void submitOpAfterDelay(final PubSubData pubSubData, final long delay) {
        this.closedLock.readLock().lock();
        try {
            if (this.closed) {
                pubSubData.getCallback().operationFailed(pubSubData.context, (PubSubException)new PubSubException.ServiceDownException("Client has been closed."));
                return;
            }
            this.clientTimer.schedule(new TimerTask(){

                @Override
                public void run() {
                    logger.debug("Submit request {} in {} ms later.", VarArgs.va(pubSubData, delay));
                    AbstractHChannelManager.this.submitOp(pubSubData);
                }
            }, delay);
        }
        finally {
            this.closedLock.readLock().unlock();
        }
    }

    @Override
    public void submitOp(PubSubData pubSubData) {
        HChannel hChannel;
        if (PubSubProtocol.OperationType.PUBLISH.equals((Object)pubSubData.operationType) || PubSubProtocol.OperationType.UNSUBSCRIBE.equals((Object)pubSubData.operationType)) {
            hChannel = this.getNonSubscriptionChannelByTopic(pubSubData.topic);
        } else {
            TopicSubscriber ts = new TopicSubscriber(pubSubData.topic, pubSubData.subscriberId);
            hChannel = this.getSubscriptionChannelByTopicSubscriber(ts);
        }
        if (null == hChannel) {
            hChannel = this.defaultServerChannel;
        }
        hChannel.submitOp(pubSubData);
    }

    @Override
    public void redirectToHost(PubSubData pubSubData, InetSocketAddress host) {
        HChannel hChannel;
        logger.debug("Submit operation {} to host {}.", VarArgs.va(pubSubData, host));
        if (PubSubProtocol.OperationType.PUBLISH.equals((Object)pubSubData.operationType) || PubSubProtocol.OperationType.UNSUBSCRIBE.equals((Object)pubSubData.operationType)) {
            hChannel = this.getNonSubscriptionChannel(host);
            if (null == hChannel) {
                hChannel = this.createAndStoreNonSubscriptionChannel(host);
            }
        } else {
            hChannel = this.getSubscriptionChannel(host);
            if (null == hChannel) {
                hChannel = this.createAndStoreSubscriptionChannel(host);
            }
        }
        if (null == hChannel) {
            hChannel = this.defaultServerChannel;
        }
        hChannel.submitOp(pubSubData);
    }

    void submitOpThruChannel(PubSubData pubSubData, Channel channel) {
        logger.debug("Submit operation {} to thru channel {}.", VarArgs.va(pubSubData, channel));
        HChannel hChannel = PubSubProtocol.OperationType.PUBLISH.equals((Object)pubSubData.operationType) || PubSubProtocol.OperationType.UNSUBSCRIBE.equals((Object)pubSubData.operationType) ? this.createAndStoreNonSubscriptionChannel(channel) : this.createAndStoreSubscriptionChannel(channel);
        hChannel.submitOp(pubSubData);
    }

    @Override
    public void submitOpToDefaultServer(PubSubData pubSubData) {
        logger.debug("Submit operation {} to default server {}.", VarArgs.va(pubSubData, this.defaultServerChannel));
        this.defaultServerChannel.submitOp(pubSubData);
    }

    private HChannel createAndStoreNonSubscriptionChannel(Channel channel) {
        InetSocketAddress host = NetUtils.getHostFromChannel(channel);
        HChannelImpl newHChannel = new HChannelImpl(host, channel, this, this.getNonSubscriptionChannelPipelineFactory());
        return this.storeNonSubscriptionChannel(host, newHChannel);
    }

    private HChannel createAndStoreNonSubscriptionChannel(InetSocketAddress host) {
        HChannelImpl newHChannel = new HChannelImpl(host, this, this.getNonSubscriptionChannelPipelineFactory());
        return this.storeNonSubscriptionChannel(host, newHChannel);
    }

    private HChannel storeNonSubscriptionChannel(InetSocketAddress host, HChannel newHChannel) {
        return this.host2NonSubscriptionChannels.addChannel(host, newHChannel);
    }

    private HChannel getNonSubscriptionChannel(InetSocketAddress host) {
        return this.host2NonSubscriptionChannels.getChannel(host);
    }

    private HChannel getNonSubscriptionChannelByTopic(ByteString topic) {
        InetSocketAddress host = (InetSocketAddress)this.topic2Host.get(topic);
        if (null == host) {
            return null;
        }
        HChannel channel = this.getNonSubscriptionChannel(host);
        if (null == channel) {
            channel = this.createAndStoreNonSubscriptionChannel(host);
        }
        return channel;
    }

    protected void onNonSubscriptionChannelDisconnected(InetSocketAddress host, Channel channel) {
        HChannel hChannel = this.host2NonSubscriptionChannels.getChannel(host);
        if (null == hChannel) {
            return;
        }
        Channel underlyingChannel = hChannel.getChannel();
        if (null == underlyingChannel || !underlyingChannel.equals(channel)) {
            return;
        }
        logger.info("NonSubscription Channel {} to {} disconnected.", VarArgs.va(channel, host));
        if (this.host2NonSubscriptionChannels.removeChannel(host, hChannel)) {
            this.clearAllTopicsForHost(host);
        }
    }

    protected abstract HChannel createAndStoreSubscriptionChannel(Channel var1);

    protected abstract HChannel createAndStoreSubscriptionChannel(InetSocketAddress var1);

    protected abstract HChannel getSubscriptionChannel(InetSocketAddress var1);

    protected abstract HChannel getSubscriptionChannelByTopicSubscriber(TopicSubscriber var1);

    protected abstract void onSubscriptionChannelDisconnected(InetSocketAddress var1, Channel var2);

    private void sendConsumeRequest(final TopicSubscriber topicSubscriber, final PubSubProtocol.MessageSeqId messageSeqId, final Channel channel) {
        PubSubProtocol.PubSubRequest.Builder pubsubRequestBuilder = NetUtils.buildConsumeRequest(this.nextTxnId(), topicSubscriber, messageSeqId);
        logger.debug("Writing a Consume request to host: {} with messageSeqId: {} for {}", VarArgs.va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
        ChannelFuture future = channel.write((Object)pubsubRequestBuilder.build());
        future.addListener(new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    logger.error("Error writing a Consume request to host: {} with messageSeqId: {} for {}", VarArgs.va(NetUtils.getHostFromChannel(channel), messageSeqId, topicSubscriber));
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected void storeTopic2HostMapping(ByteString topic, InetSocketAddress host) {
        HashSet<ByteString> newTopicsSet;
        InetSocketAddress oldHost = this.topic2Host.putIfAbsent(topic, host);
        if (null != oldHost && oldHost.equals(host)) {
            return;
        }
        if (null != oldHost) {
            if (!this.topic2Host.replace(topic, oldHost, host)) {
                logger.warn("Ownership of topic: {} has been changed from {} to {} when storeing host: {}", VarArgs.va(topic.toStringUtf8(), oldHost, this.topic2Host.get(topic), host));
                return;
            }
            logger.debug("Storing info for topic: {}, old host: {}, new host: {}.", VarArgs.va(topic.toStringUtf8(), oldHost, host));
            this.clearHostForTopic(topic, oldHost);
        } else {
            logger.debug("Storing info for topic: {}, host: {}.", VarArgs.va(topic.toStringUtf8(), host));
        }
        HashSet<ByteString> topicsForHost = (HashSet<ByteString>)this.host2Topics.get(host);
        if (null == topicsForHost && null == (topicsForHost = (Set)this.host2Topics.putIfAbsent(host, newTopicsSet = new HashSet<ByteString>()))) {
            topicsForHost = newTopicsSet;
        }
        HashSet<ByteString> hashSet = topicsForHost;
        synchronized (hashSet) {
            if (!host.equals(this.topic2Host.get(topic))) return;
            topicsForHost.add(topic);
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void clearAllTopicsForHost(InetSocketAddress host) {
        logger.debug("Clearing all topics for host: {}", (Object)host);
        Set topicsForHost = (Set)this.host2Topics.get(host);
        if (null != topicsForHost) {
            Set set = topicsForHost;
            synchronized (set) {
                for (ByteString topic : topicsForHost) {
                    logger.debug("Removing mapping for topic: {} from host: {}.", VarArgs.va(topic.toStringUtf8(), host));
                    this.topic2Host.remove(topic, host);
                }
            }
            this.host2Topics.remove(host, topicsForHost);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void clearHostForTopic(ByteString topic, InetSocketAddress host) {
        Set topicsForHost;
        logger.debug("Clearing topic: {} from host: {}.", VarArgs.va(topic.toStringUtf8(), host));
        if (this.topic2Host.remove(topic, host)) {
            logger.debug("Removed topic to host mapping for topic: {} and host: {}.", VarArgs.va(topic.toStringUtf8(), host));
        }
        if (null != (topicsForHost = (Set)this.host2Topics.get(host))) {
            boolean removed;
            Set set = topicsForHost;
            synchronized (set) {
                removed = topicsForHost.remove(topic);
            }
            if (removed) {
                logger.debug("Removed topic: {} from host: {}.", (Object)topic.toStringUtf8(), (Object)host);
                if (topicsForHost.isEmpty()) {
                    this.host2Topics.remove(host, EMPTY_TOPIC_SET);
                }
            }
        }
    }

    @Override
    public long nextTxnId() {
        return this.globalCounter.incrementAndGet();
    }

    protected abstract void restartDelivery(TopicSubscriber var1) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException;

    protected abstract void checkTimeoutRequestsOnSubscriptionChannels();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isClosed() {
        this.closedLock.readLock().lock();
        try {
            boolean bl = this.closed;
            return bl;
        }
        finally {
            this.closedLock.readLock().unlock();
        }
    }

    protected abstract void closeSubscriptionChannels();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        logger.info("Shutting down the channels manager.");
        this.closedLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        finally {
            this.closedLock.writeLock().unlock();
        }
        this.clientTimer.cancel();
        this.host2NonSubscriptionChannels.close();
        this.closeSubscriptionChannels();
        this.topic2Host.clear();
        this.host2Topics.clear();
    }

    class PubSubRequestTimeoutTask
    extends TimerTask {
        PubSubRequestTimeoutTask() {
        }

        @Override
        public void run() {
            if (AbstractHChannelManager.this.isClosed()) {
                return;
            }
            logger.debug("Running the PubSubRequest Timeout Task");
            for (HChannel channel : AbstractHChannelManager.this.host2NonSubscriptionChannels.getChannels()) {
                try {
                    HChannelHandler channelHandler = HChannelImpl.getHChannelHandlerFromChannel(channel.getChannel());
                    channelHandler.checkTimeoutRequests();
                }
                catch (NoResponseHandlerException nrhe) {}
            }
            AbstractHChannelManager.this.checkTimeoutRequestsOnSubscriptionChannels();
        }
    }
}

