/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.client.netty.HedwigClientImpl;
import org.apache.hedwig.client.netty.WriteCallback;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HedwigSubscriber
implements Subscriber {
    /** Class-wide logger; it is never reassigned, so it is declared final. */
    private static final Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
    /** Channel currently registered for each (topic, subscriberId) pair. */
    protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
    /** Message handler registered through startDelivery for each (topic, subscriberId) pair. */
    protected final ConcurrentMap<TopicSubscriber, MessageHandler> topicSubscriber2MessageHandler = new ConcurrentHashMap<TopicSubscriber, MessageHandler>();
    /** Owning client; used for connecting, transaction ids and host lookups. */
    protected final HedwigClientImpl client;
    /** Configuration obtained from the owning client at construction time. */
    protected final ClientConfiguration cfg;
    // Monitor guarding 'closed'. Final so that every synchronized block is
    // guaranteed to lock the very same object.
    private final Object closeLock = new Object();
    // Set to true by close() and read by setChannelForTopic(), both while holding closeLock.
    private boolean closed = false;

    /**
     * Creates a subscriber bound to the given client and caches that client's configuration.
     *
     * @param client the owning client; its configuration is read immediately
     */
    public HedwigSubscriber(HedwigClientImpl client) {
        this.cfg = client.getConfiguration();
        this.client = client;
    }

    /**
     * Issues a subscribe or unsubscribe request through {@link #asyncSubUnsub} and blocks the
     * calling thread until the request's {@code PubSubData} is flagged as done.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is restored before the
     * {@code ServiceDownException} is thrown, so callers further up the stack can still
     * observe the interruption.
     *
     * @param topic         topic to subscribe to or unsubscribe from
     * @param subscriberId  id of the subscriber
     * @param operationType operation to perform
     * @param options       subscription options; may be null (it is null for unsubscribe)
     * @throws PubSubException.CouldNotConnectException         rethrown when reported by the callback
     * @throws PubSubException.ClientAlreadySubscribedException rethrown when reported by the callback
     * @throws PubSubException.ClientNotSubscribedException     rethrown when reported by the callback
     * @throws PubSubException.ServiceDownException             on interruption, on a failure without
     *         an exception, or wrapping any other failure reported by the callback
     */
    private void subUnsub(ByteString topic, ByteString subscriberId, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions options) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException {
        if (logger.isDebugEnabled()) {
            StringBuilder debugMsg = new StringBuilder().append("Calling a sync subUnsub request for topic: ").append(topic.toStringUtf8()).append(", subscriberId: ").append(subscriberId.toStringUtf8()).append(", operationType: ").append(operationType);
            if (null != options) {
                debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach()).append(", messageBound: ").append(options.getMessageBound());
            }
            logger.debug(debugMsg.toString());
        }
        // This PubSubData only serves as the monitor / completion flag for the synchronous wait;
        // asyncSubUnsub builds its own PubSubData for the actual request.
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            this.asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
            try {
                // Loop guards against spurious wake-ups.
                // NOTE(review): presumably PubSubCallback sets isDone and notifies on pubSubData - confirm.
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status that wait() cleared when it threw.
                Thread.currentThread().interrupt();
                throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
            }
            if (pubSubCallback.getIsCallSuccessful()) {
                return;
            }
            PubSubException failureException = pubSubCallback.getFailureException();
            if (failureException == null) {
                logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
                throw new PubSubException.ServiceDownException("Server ack response to SubUnsub request is not successful");
            }
            // Rethrow the exception types declared by this method unchanged.
            if (failureException instanceof PubSubException.CouldNotConnectException) {
                throw (PubSubException.CouldNotConnectException)failureException;
            }
            if (failureException instanceof PubSubException.ClientAlreadySubscribedException) {
                throw (PubSubException.ClientAlreadySubscribedException)failureException;
            }
            if (failureException instanceof PubSubException.ClientNotSubscribedException) {
                throw (PubSubException.ClientNotSubscribedException)failureException;
            }
            if (failureException instanceof PubSubException.ServiceDownException) {
                throw (PubSubException.ServiceDownException)failureException;
            }
            // Anything else is unexpected here and is wrapped.
            logger.error("Unexpected PubSubException thrown: " + failureException.toString());
            throw new PubSubException.ServiceDownException((Exception)failureException);
        }
    }

    /**
     * Starts a subscribe or unsubscribe request without blocking.
     *
     * <p>Routing:
     * <ul>
     *   <li>no host recorded for the topic: connect to the configured default server host;</li>
     *   <li>UNSUBSCRIBE with a channel recorded for the topic's host in the publisher's
     *       {@code host2Channel} map: send the request on that channel;</li>
     *   <li>otherwise: open a connection to the topic's host.</li>
     * </ul>
     *
     * <p>Each shared map is read with a single {@code get}, so an entry removed by another
     * thread between a check and a lookup can no longer produce a null host or channel.
     *
     * @param topic         topic of the request
     * @param subscriberId  id of the subscriber
     * @param callback      callback stored in the request's {@code PubSubData}
     * @param context       caller context stored in the request's {@code PubSubData}
     * @param operationType operation to perform
     * @param options       subscription options; may be null
     */
    private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions options) {
        if (logger.isDebugEnabled()) {
            StringBuilder debugMsg = new StringBuilder().append("Calling a async subUnsub request for topic: ").append(topic.toStringUtf8()).append(", subscriberId: ").append(subscriberId.toStringUtf8()).append(", operationType: ").append(operationType);
            if (null != options) {
                debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach()).append(", messageBound: ").append(options.getMessageBound());
            }
            logger.debug(debugMsg.toString());
        }
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, callback, context);
        // Single lookup instead of containsKey() + get().
        InetSocketAddress host = (InetSocketAddress)this.client.topic2Host.get(topic);
        if (host == null) {
            this.client.doConnect(pubSubData, this.cfg.getDefaultServerHost());
            return;
        }
        Channel existingChannel = null;
        if (operationType.equals(PubSubProtocol.OperationType.UNSUBSCRIBE)) {
            // Only unsubscribe requests reuse a channel from the publisher's host2Channel map.
            existingChannel = (Channel)this.client.getPublisher().host2Channel.get(host);
        }
        if (existingChannel != null) {
            this.doSubUnsub(pubSubData, existingChannel);
        } else {
            this.client.doConnect(pubSubData, host);
        }
    }

    /**
     * Synchronously subscribes a non-hub subscriber, building the subscription options from
     * the given create-or-attach mode only.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId id of the subscriber
     * @param mode         create-or-attach mode placed into the subscription options
     */
    @Override
    public void subscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscribeRequest.CreateOrAttach mode) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        PubSubProtocol.SubscriptionOptions.Builder optionsBuilder = PubSubProtocol.SubscriptionOptions.newBuilder();
        optionsBuilder.setCreateOrAttach(mode);
        subscribe(topic, subscriberId, optionsBuilder.build(), false);
    }

    /**
     * Synchronously subscribes a non-hub subscriber with the given subscription options.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId id of the subscriber
     * @param options      subscription options forwarded unchanged
     */
    @Override
    public void subscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        // The public entry point always subscribes as a regular (non-hub) subscriber.
        final boolean asHub = false;
        subscribe(topic, subscriberId, options, asHub);
    }

    /**
     * Validates the subscriber id against {@code isHub} and then performs a blocking subscribe.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId id of the subscriber
     * @param options      subscription options
     * @param isHub        whether the caller is subscribing as a hub subscriber
     * @throws InvalidSubscriberIdException if the id does not match the {@code isHub} flag
     * @throws PubSubException.ServiceDownException also used to wrap a
     *         {@code ClientNotSubscribedException}, which is not expected for a subscribe
     */
    protected void subscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options, boolean isHub) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        boolean idAccepted = isValidSubscriberId(subscriberId, isHub);
        if (!idAccepted) {
            String reason = "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub;
            throw new InvalidSubscriberIdException(reason);
        }
        try {
            subUnsub(topic, subscriberId, PubSubProtocol.OperationType.SUBSCRIBE, options);
        }
        catch (PubSubException.ClientNotSubscribedException unexpected) {
            logger.error("Unexpected Exception thrown: " + unexpected.toString());
            Exception cause = unexpected;
            throw new PubSubException.ServiceDownException(cause);
        }
    }

    /**
     * Asynchronously subscribes a non-hub subscriber, building the subscription options from
     * the given create-or-attach mode only.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId id of the subscriber
     * @param mode         create-or-attach mode placed into the subscription options
     * @param callback     callback to report the outcome to
     * @param context      caller context handed back through the callback
     */
    @Override
    public void asyncSubscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscribeRequest.CreateOrAttach mode, Callback<Void> callback, Object context) {
        PubSubProtocol.SubscriptionOptions.Builder optionsBuilder = PubSubProtocol.SubscriptionOptions.newBuilder();
        optionsBuilder.setCreateOrAttach(mode);
        asyncSubscribe(topic, subscriberId, optionsBuilder.build(), callback, context, false);
    }

    /**
     * Asynchronously subscribes a non-hub subscriber with the given subscription options.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId id of the subscriber
     * @param options      subscription options forwarded unchanged
     * @param callback     callback to report the outcome to
     * @param context      caller context handed back through the callback
     */
    @Override
    public void asyncSubscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options, Callback<Void> callback, Object context) {
        // The public entry point always subscribes as a regular (non-hub) subscriber.
        final boolean asHub = false;
        asyncSubscribe(topic, subscriberId, options, callback, context, asHub);
    }

    /**
     * Validates the subscriber id against {@code isHub} and then starts an asynchronous subscribe.
     * An invalid id is reported through {@code callback.operationFailed} as a
     * {@code ServiceDownException} wrapping an {@code InvalidSubscriberIdException}; nothing is thrown.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId id of the subscriber
     * @param options      subscription options
     * @param callback     callback to report the outcome to
     * @param context      caller context handed back through the callback
     * @param isHub        whether the caller is subscribing as a hub subscriber
     */
    protected void asyncSubscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options, Callback<Void> callback, Object context, boolean isHub) {
        if (isValidSubscriberId(subscriberId, isHub)) {
            asyncSubUnsub(topic, subscriberId, callback, context, PubSubProtocol.OperationType.SUBSCRIBE, options);
            return;
        }
        Exception cause = new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub);
        callback.operationFailed(context, new PubSubException.ServiceDownException(cause));
    }

    /**
     * Synchronously unsubscribes a non-hub subscriber from a topic.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId id of the subscriber
     */
    @Override
    public void unsubscribe(ByteString topic, ByteString subscriberId) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        // The public entry point always acts as a regular (non-hub) subscriber.
        final boolean asHub = false;
        unsubscribe(topic, subscriberId, asHub);
    }

    /**
     * Validates the subscriber id, closes the local subscription and then performs a blocking
     * unsubscribe request.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId id of the subscriber
     * @param isHub        whether the caller is acting as a hub subscriber
     * @throws InvalidSubscriberIdException if the id does not match the {@code isHub} flag
     * @throws PubSubException.ServiceDownException also used to wrap a
     *         {@code ClientAlreadySubscribedException}, which is not expected for an unsubscribe
     */
    protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        boolean idAccepted = isValidSubscriberId(subscriberId, isHub);
        if (!idAccepted) {
            String reason = "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub;
            throw new InvalidSubscriberIdException(reason);
        }
        // The subscription is closed before the unsubscribe request is issued.
        closeSubscription(topic, subscriberId);
        try {
            subUnsub(topic, subscriberId, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
        }
        catch (PubSubException.ClientAlreadySubscribedException unexpected) {
            logger.error("Unexpected Exception thrown: " + unexpected.toString());
            Exception cause = unexpected;
            throw new PubSubException.ServiceDownException(cause);
        }
    }

    /**
     * Asynchronously unsubscribes a non-hub subscriber from a topic.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId id of the subscriber
     * @param callback     callback to report the outcome to
     * @param context      caller context handed back through the callback
     */
    @Override
    public void asyncUnsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context) {
        // The public entry point always acts as a regular (non-hub) subscriber.
        final boolean asHub = false;
        asyncUnsubscribe(topic, subscriberId, callback, context, asHub);
    }

    /**
     * Validates the subscriber id, asynchronously closes the local subscription and, once that
     * has finished, starts the asynchronous unsubscribe request.
     *
     * <p>An invalid id, or a failure to close the subscription, is reported through
     * {@code callback.operationFailed}; nothing is thrown.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId id of the subscriber
     * @param callback     callback to report the outcome to
     * @param context      caller context handed back through the callback
     * @param isHub        whether the caller is acting as a hub subscriber
     */
    protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback, final Object context, boolean isHub) {
        if (!isValidSubscriberId(subscriberId, isHub)) {
            Exception cause = new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub);
            callback.operationFailed(context, new PubSubException.ServiceDownException(cause));
            return;
        }
        // Chains the unsubscribe request behind the successful close of the subscription.
        Callback<Void> unsubscribeAfterClose = new Callback<Void>(){

            @Override
            public void operationFinished(Object ctx, Void resultOfOperation) {
                HedwigSubscriber.this.asyncSubUnsub(topic, subscriberId, callback, context, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                // The caller's own context is passed on, not the ctx of the close step.
                callback.operationFailed(context, exception);
            }
        };
        asyncCloseSubscription(topic, subscriberId, unsubscribeAfterClose, null);
    }

    /**
     * Checks that the subscriber id matches the role of the caller: the id must be a hub
     * subscriber id exactly when {@code isHub} is true.
     *
     * @param subscriberId id to check
     * @param isHub        whether the caller acts as a hub subscriber
     * @return true if the id is consistent with {@code isHub}
     */
    private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
        boolean looksLikeHubId = SubscriptionStateUtils.isHubSubscriber(subscriberId);
        return isHub == looksLikeHubId;
    }

    /**
     * Sends a consume request for the given message sequence id on the channel registered for
     * the (topic, subscriberId) pair.
     *
     * <p>The channel is fetched with a single map lookup, so a subscription removed by another
     * thread is reported as {@code ClientNotSubscribedException} instead of passing a null
     * channel on to {@link #doConsume}.
     *
     * @param topic        topic the message belongs to
     * @param subscriberId id of the subscriber
     * @param messageSeqId sequence id to send in the consume request
     * @throws PubSubException.ClientNotSubscribedException if no channel is registered for the pair
     */
    @Override
    public void consume(ByteString topic, ByteString subscriberId, PubSubProtocol.MessageSeqId messageSeqId) throws PubSubException.ClientNotSubscribedException {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling consume for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        // Single lookup instead of containsKey() + get().
        Channel channel = this.topicSubscriber2Channel.get(topicSubscriber);
        if (channel == null) {
            throw new PubSubException.ClientNotSubscribedException("Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, PubSubProtocol.OperationType.CONSUME, null, null, null);
        this.doConsume(pubSubData, channel, messageSeqId);
    }

    /**
     * Builds the subscribe or unsubscribe request described by {@code pubSubData}, records the
     * request under a fresh transaction id in the channel's response handler and writes it to
     * the channel.
     *
     * <p>Any operation type other than SUBSCRIBE is encoded as an unsubscribe request.
     *
     * @param pubSubData request description; its {@code txnId} and {@code requestWriteTime}
     *                   fields are updated here
     * @param channel    channel the request is written to
     */
    protected void doSubUnsub(PubSubData pubSubData, Channel channel) {
        PubSubProtocol.PubSubRequest.Builder request = PubSubProtocol.PubSubRequest.newBuilder();
        request.setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE);
        request.setType(pubSubData.operationType);
        boolean hasTriedServers = pubSubData.triedServers != null && !pubSubData.triedServers.isEmpty();
        if (hasTriedServers) {
            request.addAllTriedServers(pubSubData.triedServers);
        }
        long txnId = this.client.globalCounter.incrementAndGet();
        request.setTxnId(txnId);
        request.setShouldClaim(pubSubData.shouldClaim);
        request.setTopic(pubSubData.topic);
        if (!pubSubData.operationType.equals(PubSubProtocol.OperationType.SUBSCRIBE)) {
            PubSubProtocol.UnsubscribeRequest.Builder unsubscribe = PubSubProtocol.UnsubscribeRequest.newBuilder();
            unsubscribe.setSubscriberId(pubSubData.subscriberId);
            request.setUnsubscribeRequest(unsubscribe);
        } else {
            PubSubProtocol.SubscribeRequest.Builder subscribe = PubSubProtocol.SubscribeRequest.newBuilder();
            subscribe.setSubscriberId(pubSubData.subscriberId);
            subscribe.setCreateOrAttach(pubSubData.options.getCreateOrAttach());
            subscribe.setSynchronous(true);
            // A positive bound in the request options wins over the configured bound;
            // if neither is positive no bound is set at all.
            if (pubSubData.options.getMessageBound() > 0) {
                subscribe.setMessageBound(pubSubData.options.getMessageBound());
            } else if (this.cfg.getSubscriptionMessageBound() > 0) {
                subscribe.setMessageBound(this.cfg.getSubscriptionMessageBound());
            }
            request.setSubscribeRequest(subscribe);
        }
        pubSubData.txnId = txnId;
        pubSubData.requestWriteTime = System.currentTimeMillis();
        // The request is recorded under its transaction id before it is written.
        HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(txnId, pubSubData);
        if (logger.isDebugEnabled()) {
            logger.debug("Writing a SubUnsub request to host: " + HedwigClientImpl.getHostFromChannel(channel) + " for pubSubData: " + pubSubData);
        }
        ChannelFuture writeFuture = channel.write(request.build());
        writeFuture.addListener(new WriteCallback(pubSubData, this.client));
    }

    /**
     * Builds a consume request for {@code messageSeqId} and writes it to the channel. A failed
     * write is only logged; the request is not recorded in the channel's response handler.
     *
     * @param pubSubData   supplies the topic and subscriber id of the request
     * @param channel      channel the request is written to
     * @param messageSeqId sequence id to send in the consume request
     */
    public void doConsume(final PubSubData pubSubData, final Channel channel, final PubSubProtocol.MessageSeqId messageSeqId) {
        PubSubProtocol.ConsumeRequest.Builder consume = PubSubProtocol.ConsumeRequest.newBuilder();
        consume.setSubscriberId(pubSubData.subscriberId);
        consume.setMsgId(messageSeqId);

        PubSubProtocol.PubSubRequest.Builder request = PubSubProtocol.PubSubRequest.newBuilder();
        request.setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE);
        request.setType(PubSubProtocol.OperationType.CONSUME);
        request.setTxnId(this.client.globalCounter.incrementAndGet());
        request.setTopic(pubSubData.topic);
        request.setConsumeRequest(consume);

        if (logger.isDebugEnabled()) {
            logger.debug("Writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel) + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
        }
        ChannelFutureListener logOnFailure = new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                logger.error("Error writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel) + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
            }
        };
        channel.write(request.build()).addListener(logOnFailure);
    }

    /**
     * Reports whether this client currently has a channel registered for the given
     * (topic, subscriberId) pair. Only the local channel map is consulted.
     *
     * @param topic        topic of the subscription
     * @param subscriberId id of the subscriber
     * @return true if a channel is registered for the pair
     */
    @Override
    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        TopicSubscriber key = new TopicSubscriber(topic, subscriberId);
        return this.topicSubscriber2Channel.containsKey(key);
    }

    /**
     * Not implemented: this method ignores its argument and always returns {@code null}.
     * Callers must therefore be prepared for a null result.
     *
     * @param subscriberId id of the subscriber (unused)
     * @return always {@code null}
     */
    @Override
    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        // NOTE(review): stub - returns null rather than a list; confirm whether callers rely on this.
        return null;
    }

    /**
     * Starts message delivery for a subscription using the given handler.
     *
     * @param topic          topic of the subscription
     * @param subscriberId   id of the subscriber
     * @param messageHandler handler to register for the subscription
     */
    @Override
    public void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        // A fresh start, as opposed to restartDelivery which reuses the registered handler.
        final boolean restart = false;
        startDelivery(topic, subscriberId, messageHandler, restart);
    }

    /**
     * Restarts message delivery for a subscription, reusing whatever handler is currently
     * registered for it.
     *
     * @param topic        topic of the subscription
     * @param subscriberId id of the subscriber
     */
    public void restartDelivery(ByteString topic, ByteString subscriberId) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        // No handler is passed in: the restart path looks up the registered one.
        final MessageHandler noNewHandler = null;
        startDelivery(topic, subscriberId, noNewHandler, true);
    }

    /**
     * Installs a message handler on the subscription's channel and makes the channel readable.
     *
     * <p>On a restart the handler already registered for the subscription is reused (this may
     * be null if none is registered). Otherwise the given handler is registered, and the call
     * fails if a handler is already present.
     *
     * <p>The channel is fetched with a single map lookup, so a subscription removed by another
     * thread is reported as {@code ClientNotSubscribedException} instead of causing a
     * {@code NullPointerException} further down.
     *
     * @param topic          topic of the subscription
     * @param subscriberId   id of the subscriber
     * @param messageHandler handler to register; ignored when {@code restart} is true
     * @param restart        whether to reuse the already registered handler
     * @throws PubSubException.ClientNotSubscribedException if no channel is registered for the pair
     * @throws AlreadyStartDeliveryException if, on a non-restart, a handler is already registered
     */
    private void startDelivery(final ByteString topic, final ByteString subscriberId, MessageHandler messageHandler, boolean restart) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting delivery for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        // Single lookup instead of containsKey() + get().
        Channel topicSubscriberChannel = this.topicSubscriber2Channel.get(topicSubscriber);
        if (topicSubscriberChannel == null) {
            logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        MessageHandler existedMsgHandler = this.topicSubscriber2MessageHandler.get(topicSubscriber);
        if (restart) {
            // Restart with whatever handler is currently registered.
            messageHandler = existedMsgHandler;
        } else {
            if (null != existedMsgHandler) {
                throw new AlreadyStartDeliveryException("A message handler has been started for topic subscriber " + topicSubscriber);
            }
            // putIfAbsent detects a concurrent startDelivery that registered a handler first.
            if (messageHandler != null && null != this.topicSubscriber2MessageHandler.putIfAbsent(topicSubscriber, messageHandler)) {
                throw new AlreadyStartDeliveryException("Someone is also starting delivery for topic subscriber " + topicSubscriber);
            }
        }
        HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler().setMessageHandler(messageHandler);
        ChannelFuture future = topicSubscriberChannel.setReadable(true);
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
                }
            }
        });
    }

    /**
     * Stops message delivery for a subscription: clears the handler on the channel's subscribe
     * response handler, removes the registered handler and makes the channel non-readable.
     *
     * <p>The channel is fetched with a single map lookup, so a subscription removed by another
     * thread is reported as {@code ClientNotSubscribedException} instead of causing a
     * {@code NullPointerException}.
     *
     * @param topic        topic of the subscription
     * @param subscriberId id of the subscriber
     * @throws PubSubException.ClientNotSubscribedException if no channel is registered for the pair
     */
    @Override
    public void stopDelivery(final ByteString topic, final ByteString subscriberId) throws PubSubException.ClientNotSubscribedException {
        if (logger.isDebugEnabled()) {
            logger.debug("Stopping delivery for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        // Single lookup instead of containsKey() + get().
        Channel topicSubscriberChannel = this.topicSubscriber2Channel.get(topicSubscriber);
        if (topicSubscriberChannel == null) {
            logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
            throw new PubSubException.ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler().setMessageHandler(null);
        this.topicSubscriber2MessageHandler.remove(topicSubscriber);
        ChannelFuture future = topicSubscriberChannel.setReadable(false);
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
                }
            }
        });
    }

    /**
     * Closes the subscription for the (topic, subscriberId) pair and blocks until
     * {@link #asyncCloseSubscription} has reported completion.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is restored before the
     * {@code ServiceDownException} is thrown, so callers further up the stack can still
     * observe the interruption.
     *
     * @param topic        topic of the subscription
     * @param subscriberId id of the subscriber
     * @throws PubSubException.ServiceDownException if the wait is interrupted or the close is
     *         reported as unsuccessful
     */
    @Override
    public void closeSubscription(ByteString topic, ByteString subscriberId) throws PubSubException.ServiceDownException {
        // This PubSubData only serves as the monitor / completion flag for the synchronous wait.
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            this.asyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
            try {
                // Loop guards against spurious wake-ups.
                // NOTE(review): presumably PubSubCallback sets isDone and notifies on pubSubData - confirm.
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status that wait() cleared when it threw.
                Thread.currentThread().interrupt();
                throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
            }
            if (!pubSubCallback.getIsCallSuccessful()) {
                throw new PubSubException.ServiceDownException("Exception while trying to close the subscription for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
            }
        }
    }

    /**
     * Asynchronously closes the channel registered for the (topic, subscriberId) pair and
     * reports the outcome through {@code callback}.
     *
     * <p>The channel is taken out of the map with one atomic {@code remove}, so when several
     * threads close the same subscription only one of them closes the channel; the others take
     * the "no channel cached" path. If no channel is registered the call logs a warning and
     * reports success.
     *
     * @param topic        topic of the subscription
     * @param subscriberId id of the subscriber
     * @param callback     callback to report the outcome to
     * @param context      caller context handed back through the callback
     */
    @Override
    public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback, final Object context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        // Atomic take-and-remove instead of containsKey() + get() + remove().
        Channel channel = this.topicSubscriber2Channel.remove(topicSubscriber);
        if (channel == null) {
            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
            callback.operationFinished(context, null);
            return;
        }
        // Flag the close as requested by the client before actually closing the channel.
        HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
        ChannelFuture future = channel.close();
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    String failure = "Failed to close the subscription channel for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8();
                    logger.error(failure);
                    callback.operationFailed(context, new PubSubException.ServiceDownException(failure));
                } else {
                    callback.operationFinished(context, null);
                }
            }
        });
    }

    /**
     * Looks up the channel registered for a topic subscriber.
     *
     * @param topic the (topic, subscriberId) key
     * @return the registered channel, or null if none is registered
     */
    public Channel getChannelForTopic(TopicSubscriber topic) {
        Channel registered = this.topicSubscriber2Channel.get(topic);
        return registered;
    }

    /**
     * Registers a channel for a topic subscriber unless this subscriber is already closed or a
     * channel is already registered for the key; in both of those cases the given channel is
     * closed instead. The whole decision is made while holding {@code closeLock}.
     *
     * @param topic   the (topic, subscriberId) key
     * @param channel channel to register
     */
    public void setChannelForTopic(TopicSubscriber topic, Channel channel) {
        synchronized (this.closeLock) {
            // Short-circuit: once closed, the map is not touched at all.
            boolean registered = !this.closed && this.topicSubscriber2Channel.putIfAbsent(topic, channel) == null;
            if (!registered) {
                channel.close();
            }
        }
    }

    /**
     * Drops the channel registration for a topic subscriber, if any. The channel itself is
     * not closed here.
     *
     * @param topic the (topic, subscriberId) key
     */
    public void removeChannelForTopic(TopicSubscriber topic) {
        topicSubscriber2Channel.remove(topic);
    }

    /**
     * Marks this subscriber as closed, then closes every registered channel, waiting for each
     * close to complete, and finally empties the channel map.
     */
    void close() {
        synchronized (this.closeLock) {
            // From here on setChannelForTopic closes new channels instead of registering them.
            this.closed = true;
        }
        for (Channel subscriptionChannel : this.topicSubscriber2Channel.values()) {
            HedwigClientImpl.getResponseHandlerFromChannel(subscriptionChannel).channelClosedExplicitly = true;
            ChannelFuture closeFuture = subscriptionChannel.close();
            closeFuture.awaitUninterruptibly();
        }
        this.topicSubscriber2Channel.clear();
    }
}

