/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.util.List;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.client.netty.FilterableMessageHandler;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.HedwigClientImpl;
import org.apache.hedwig.client.netty.VoidCallbackAdapter;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.SubscriptionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side implementation of {@link Subscriber}.
 *
 * <p>Every operation is ultimately handed to the {@link HChannelManager} obtained from the
 * owning {@link HedwigClientImpl}. The synchronous variants ({@code subscribe},
 * {@code unsubscribe}, {@code closeSubscription}) are built on top of the asynchronous ones:
 * they issue the asynchronous request and block the calling thread until the associated
 * {@link PubSubCallback} marks the operation as done.
 */
public class HedwigSubscriber
implements Subscriber {
    private static final Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);
    /** Client configuration, read from the owning client at construction time. */
    protected final ClientConfiguration cfg;
    /** Channel manager that all requests of this subscriber are submitted to. */
    protected final HChannelManager channelManager;

    /**
     * Creates a subscriber bound to the configuration and channel manager of {@code client}.
     *
     * @param client the client this subscriber belongs to
     */
    public HedwigSubscriber(HedwigClientImpl client) {
        this.cfg = client.getConfiguration();
        this.channelManager = client.getHChannelManager();
    }

    /**
     * Registers {@code listener} with the channel manager's subscription event emitter.
     *
     * @param listener the listener to add
     */
    @Override
    public void addSubscriptionListener(SubscriptionListener listener) {
        this.channelManager.getSubscriptionEventEmitter().addSubscriptionListener(listener);
    }

    /**
     * Removes {@code listener} from the channel manager's subscription event emitter.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeSubscriptionListener(SubscriptionListener listener) {
        this.channelManager.getSubscriptionEventEmitter().removeSubscriptionListener(listener);
    }

    /**
     * Builds the debug message shared by the synchronous and asynchronous subscribe/unsubscribe
     * paths.
     *
     * @param prefix        leading text of the message, up to and including {@code "topic: "}
     * @param topic         topic of the request
     * @param subscriberId  subscriber id of the request
     * @param operationType operation being requested
     * @param options       subscription options, may be {@code null}
     * @return the formatted debug message
     */
    private static String describeRequest(String prefix, ByteString topic, ByteString subscriberId, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions options) {
        StringBuilder debugMsg = new StringBuilder().append(prefix).append(topic.toStringUtf8()).append(", subscriberId: ").append(subscriberId.toStringUtf8()).append(", operationType: ").append(operationType);
        if (null != options) {
            debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach()).append(", messageBound: ").append(options.getMessageBound());
        }
        return debugMsg.toString();
    }

    /**
     * Synchronously performs a subscribe or unsubscribe request.
     *
     * <p>Issues the request through {@link #asyncSubUnsub} and blocks on a local
     * {@link PubSubData} monitor until the {@link PubSubCallback} flags it as done. A failure
     * reported by the callback is rethrown as one of the declared exception types; any other
     * {@link PubSubException} is wrapped in a {@code ServiceDownException}.
     *
     * @param topic         topic to (un)subscribe
     * @param subscriberId  subscriber id
     * @param operationType operation to perform
     * @param options       subscription options, may be {@code null}
     * @throws PubSubException.CouldNotConnectException         if the callback reported this failure
     * @throws PubSubException.ClientAlreadySubscribedException if the callback reported this failure
     * @throws PubSubException.ClientNotSubscribedException     if the callback reported this failure
     * @throws PubSubException.ServiceDownException             if the callback reported this failure,
     *         reported no exception or an unexpected one, or the wait was interrupted
     */
    private void subUnsub(ByteString topic, ByteString subscriberId, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions options) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException {
        if (logger.isDebugEnabled()) {
            logger.debug(describeRequest("Calling a sync subUnsub request for topic: ", topic, subscriberId, operationType, options));
        }
        // This PubSubData instance is the object the callback is constructed with and the
        // monitor this thread waits on; the request itself is built inside asyncSubUnsub.
        final PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            this.asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
            try {
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can still
                // observe the interruption after it has been translated.
                Thread.currentThread().interrupt();
                throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
            }
            if (pubSubCallback.getIsCallSuccessful()) {
                return;
            }
            PubSubException failureException = pubSubCallback.getFailureException();
            if (failureException == null) {
                logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
                throw new PubSubException.ServiceDownException("Server ack response to SubUnsub request is not successful");
            }
            if (failureException instanceof PubSubException.CouldNotConnectException) {
                throw (PubSubException.CouldNotConnectException)failureException;
            }
            if (failureException instanceof PubSubException.ClientAlreadySubscribedException) {
                throw (PubSubException.ClientAlreadySubscribedException)failureException;
            }
            if (failureException instanceof PubSubException.ClientNotSubscribedException) {
                throw (PubSubException.ClientNotSubscribedException)failureException;
            }
            if (failureException instanceof PubSubException.ServiceDownException) {
                throw (PubSubException.ServiceDownException)failureException;
            }
            logger.error("Unexpected PubSubException thrown: ", failureException);
            throw new PubSubException.ServiceDownException(failureException);
        }
    }

    /**
     * Asynchronously performs a subscribe or unsubscribe request by submitting a
     * {@link PubSubData} to the channel manager.
     *
     * <p>For a SUBSCRIBE request whose options carry a non-positive message bound, the
     * configured subscription message bound is applied when that configured value is positive.
     *
     * @param topic         topic to (un)subscribe
     * @param subscriberId  subscriber id
     * @param callback      callback handed to the submitted operation
     * @param context       caller context handed to the submitted operation
     * @param operationType operation to perform
     * @param options       subscription options; must not be {@code null} for SUBSCRIBE
     * @throws NullPointerException if {@code operationType} is SUBSCRIBE and {@code options}
     *         is {@code null}
     */
    private void asyncSubUnsub(ByteString topic, ByteString subscriberId, Callback<PubSubProtocol.ResponseBody> callback, Object context, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscriptionOptions options) {
        if (logger.isDebugEnabled()) {
            logger.debug(describeRequest("Calling a async subUnsub request for topic: ", topic, subscriberId, operationType, options));
        }
        PubSubProtocol.SubscriptionOptions effectiveOptions = options;
        if (PubSubProtocol.OperationType.SUBSCRIBE.equals(operationType)) {
            if (null == options) {
                // Same exception type as the implicit dereference would raise, but with a
                // message that tells the caller what went wrong.
                throw new NullPointerException("SubscriptionOptions must not be null for a SUBSCRIBE request.");
            }
            int configuredBound = this.cfg.getSubscriptionMessageBound();
            if (options.getMessageBound() <= 0 && configuredBound > 0) {
                effectiveOptions = PubSubProtocol.SubscriptionOptions.newBuilder(options).setMessageBound(configuredBound).build();
            }
        }
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, effectiveOptions, callback, context);
        this.channelManager.submitOp(pubSubData);
    }

    /**
     * Synchronously subscribes as a non-hub subscriber, using options that only carry the
     * given create-or-attach mode.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId subscriber id
     * @param mode         create-or-attach mode placed into the subscription options
     * @throws InvalidSubscriberIdException if the subscriber id is not valid for a non-hub subscriber
     */
    @Override
    public void subscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscribeRequest.CreateOrAttach mode) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        PubSubProtocol.SubscriptionOptions options = PubSubProtocol.SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
        this.subscribe(topic, subscriberId, options, false);
    }

    /**
     * Synchronously subscribes as a non-hub subscriber with the given options.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId subscriber id
     * @param options      subscription options
     * @throws InvalidSubscriberIdException if the subscriber id is not valid for a non-hub subscriber
     */
    @Override
    public void subscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        this.subscribe(topic, subscriberId, options, false);
    }

    /**
     * Synchronously subscribes after validating the subscriber id against {@code isHub}.
     *
     * <p>A {@code ClientNotSubscribedException} is not expected for a subscribe request; if
     * one is reported it is logged and wrapped in a {@code ServiceDownException}.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId subscriber id
     * @param options      subscription options
     * @param isHub        whether the caller is a hub subscriber
     * @throws InvalidSubscriberIdException if the subscriber id does not match {@code isHub}
     */
    protected void subscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options, boolean isHub) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        if (!this.isValidSubscriberId(subscriberId, isHub)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub);
        }
        try {
            this.subUnsub(topic, subscriberId, PubSubProtocol.OperationType.SUBSCRIBE, options);
        }
        catch (PubSubException.ClientNotSubscribedException e) {
            logger.error("Unexpected Exception thrown: ", e);
            throw new PubSubException.ServiceDownException(e);
        }
    }

    /**
     * Asynchronously subscribes as a non-hub subscriber, using options that only carry the
     * given create-or-attach mode.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId subscriber id
     * @param mode         create-or-attach mode placed into the subscription options
     * @param callback     callback notified of the outcome
     * @param context      caller context passed back to the callback
     */
    @Override
    public void asyncSubscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscribeRequest.CreateOrAttach mode, Callback<Void> callback, Object context) {
        PubSubProtocol.SubscriptionOptions options = PubSubProtocol.SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
        this.asyncSubscribe(topic, subscriberId, options, callback, context, false);
    }

    /**
     * Asynchronously subscribes as a non-hub subscriber with the given options.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId subscriber id
     * @param options      subscription options
     * @param callback     callback notified of the outcome
     * @param context      caller context passed back to the callback
     */
    @Override
    public void asyncSubscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options, Callback<Void> callback, Object context) {
        this.asyncSubscribe(topic, subscriberId, options, callback, context, false);
    }

    /**
     * Asynchronously subscribes after validating the subscriber id against {@code isHub}.
     *
     * <p>An invalid subscriber id is reported through {@code callback.operationFailed} as a
     * {@code ServiceDownException} wrapping an {@link InvalidSubscriberIdException}; no request
     * is submitted in that case.
     *
     * @param topic        topic to subscribe to
     * @param subscriberId subscriber id
     * @param options      subscription options
     * @param callback     callback notified of the outcome
     * @param context      caller context passed back to the callback
     * @param isHub        whether the caller is a hub subscriber
     */
    protected void asyncSubscribe(ByteString topic, ByteString subscriberId, PubSubProtocol.SubscriptionOptions options, Callback<Void> callback, Object context, boolean isHub) {
        if (!this.isValidSubscriberId(subscriberId, isHub)) {
            callback.operationFailed(context, new PubSubException.ServiceDownException(new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
            return;
        }
        this.asyncSubUnsub(topic, subscriberId, new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context, PubSubProtocol.OperationType.SUBSCRIBE, options);
    }

    /**
     * Synchronously unsubscribes as a non-hub subscriber.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId subscriber id
     * @throws InvalidSubscriberIdException if the subscriber id is not valid for a non-hub subscriber
     */
    @Override
    public void unsubscribe(ByteString topic, ByteString subscriberId) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        this.unsubscribe(topic, subscriberId, false);
    }

    /**
     * Synchronously unsubscribes after validating the subscriber id against {@code isHub}.
     *
     * <p>The local subscription is closed first via {@link #closeSubscription}, then the
     * UNSUBSCRIBE request is issued. A {@code ClientAlreadySubscribedException} is not expected
     * for an unsubscribe request; if one is reported it is logged and wrapped in a
     * {@code ServiceDownException}.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId subscriber id
     * @param isHub        whether the caller is a hub subscriber
     * @throws InvalidSubscriberIdException if the subscriber id does not match {@code isHub}
     */
    protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        if (!this.isValidSubscriberId(subscriberId, isHub)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub);
        }
        this.closeSubscription(topic, subscriberId);
        try {
            this.subUnsub(topic, subscriberId, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
        }
        catch (PubSubException.ClientAlreadySubscribedException e) {
            logger.error("Unexpected Exception thrown: ", e);
            throw new PubSubException.ServiceDownException(e);
        }
    }

    /**
     * Asynchronously unsubscribes as a non-hub subscriber.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId subscriber id
     * @param callback     callback notified of the outcome
     * @param context      caller context passed back to the callback
     */
    @Override
    public void asyncUnsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context) {
        this.doAsyncUnsubscribe(topic, subscriberId, new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context, false);
    }

    /**
     * Asynchronously unsubscribes, validating the subscriber id against {@code isHub}.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId subscriber id
     * @param callback     callback notified of the outcome
     * @param context      caller context passed back to the callback
     * @param isHub        whether the caller is a hub subscriber
     */
    protected void asyncUnsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context, boolean isHub) {
        this.doAsyncUnsubscribe(topic, subscriberId, new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context, isHub);
    }

    /**
     * Shared implementation of the asynchronous unsubscribe variants.
     *
     * <p>An invalid subscriber id is reported through {@code callback.operationFailed}.
     * Otherwise the subscription is closed asynchronously first; once that finishes the
     * UNSUBSCRIBE request is submitted with the caller's callback and context, and if the
     * close fails the failure is forwarded to the caller's callback.
     *
     * @param topic        topic to unsubscribe from
     * @param subscriberId subscriber id
     * @param callback     callback notified of the outcome
     * @param context      caller context passed back to the callback
     * @param isHub        whether the caller is a hub subscriber
     */
    private void doAsyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<PubSubProtocol.ResponseBody> callback, final Object context, boolean isHub) {
        if (!this.isValidSubscriberId(subscriberId, isHub)) {
            callback.operationFailed(context, new PubSubException.ServiceDownException(new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
            return;
        }
        Callback<PubSubProtocol.ResponseBody> unsubscribeAfterClose = new Callback<PubSubProtocol.ResponseBody>(){

            @Override
            public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
                // The close step ran with a null context; the caller's own context is used
                // for the actual unsubscribe request.
                HedwigSubscriber.this.asyncSubUnsub(topic, subscriberId, callback, context, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                callback.operationFailed(context, exception);
            }
        };
        this.doAsyncCloseSubscription(topic, subscriberId, unsubscribeAfterClose, null);
    }

    /**
     * Checks that the subscriber id agrees with the caller's role: it is valid exactly when
     * {@code isHub} equals the result of {@link SubscriptionStateUtils#isHubSubscriber}.
     *
     * @param subscriberId subscriber id to validate
     * @param isHub        whether the caller is a hub subscriber
     * @return {@code true} if the id is acceptable for the given role
     */
    private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
        return isHub == SubscriptionStateUtils.isHubSubscriber(subscriberId);
    }

    /**
     * Forwards a consume for {@code messageSeqId} to the response handler of the given topic
     * subscriber.
     *
     * @param topic        topic of the subscription
     * @param subscriberId subscriber id
     * @param messageSeqId sequence id passed to the handler's {@code consume}
     * @throws PubSubException.ClientNotSubscribedException if the channel manager has no
     *         response handler for the topic subscriber, or the handler does not report the
     *         subscription
     */
    @Override
    public void consume(ByteString topic, ByteString subscriberId, PubSubProtocol.MessageSeqId messageSeqId) throws PubSubException.ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Calling consume for {}, messageSeqId: {}.", topicSubscriber, messageSeqId);
        SubscribeResponseHandler subscribeResponseHandler = this.channelManager.getSubscribeResponseHandler(topicSubscriber);
        if (null == subscribeResponseHandler || !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            throw new PubSubException.ClientNotSubscribedException("Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        subscribeResponseHandler.consume(topicSubscriber, messageSeqId);
    }

    /**
     * Reports whether the channel manager has a response handler for the topic subscriber
     * that itself reports the subscription.
     *
     * @param topic        topic of the subscription
     * @param subscriberId subscriber id
     * @return {@code true} if such a handler exists and reports the subscription
     */
    @Override
    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        SubscribeResponseHandler subscribeResponseHandler = this.channelManager.getSubscribeResponseHandler(topicSubscriber);
        return null != subscribeResponseHandler && subscribeResponseHandler.hasSubscription(topicSubscriber);
    }

    /**
     * Not implemented by this class.
     *
     * @param subscriberId subscriber id (unused)
     * @return always {@code null}; callers must not assume a list is returned
     */
    @Override
    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        return null;
    }

    /**
     * Starts delivery for the topic subscriber by delegating to the channel manager.
     *
     * @param topic          topic of the subscription
     * @param subscriberId   subscriber id
     * @param messageHandler handler passed to the channel manager
     */
    @Override
    public void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Starting delivery for {}.", topicSubscriber);
        this.channelManager.startDelivery(topicSubscriber, messageHandler);
    }

    /**
     * Starts delivery with {@code messageHandler} wrapped in a
     * {@link FilterableMessageHandler} built from {@code messageFilter}.
     *
     * @param topic          topic of the subscription
     * @param subscriberId   subscriber id
     * @param messageHandler handler to wrap; must not be {@code null}
     * @param messageFilter  filter to wrap the handler with; must not be {@code null}
     * @throws NullPointerException if {@code messageHandler} or {@code messageFilter} is {@code null}
     */
    @Override
    public void startDeliveryWithFilter(ByteString topic, ByteString subscriberId, MessageHandler messageHandler, ClientMessageFilter messageFilter) throws PubSubException.ClientNotSubscribedException, AlreadyStartDeliveryException {
        if (null == messageHandler || null == messageFilter) {
            throw new NullPointerException("Null message handler or message filter is       provided.");
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        MessageHandler filteringHandler = new FilterableMessageHandler(messageHandler, messageFilter);
        logger.debug("Starting delivery with filter for {}.", topicSubscriber);
        this.channelManager.startDelivery(topicSubscriber, filteringHandler);
    }

    /**
     * Stops delivery for the topic subscriber by delegating to the channel manager.
     *
     * @param topic        topic of the subscription
     * @param subscriberId subscriber id
     */
    @Override
    public void stopDelivery(ByteString topic, ByteString subscriberId) throws PubSubException.ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Stopping delivery for {}.", topicSubscriber);
        this.channelManager.stopDelivery(topicSubscriber);
    }

    /**
     * Synchronously closes the subscription.
     *
     * <p>Issues the asynchronous close and blocks on a local {@link PubSubData} monitor until
     * the {@link PubSubCallback} flags it as done.
     *
     * @param topic        topic of the subscription
     * @param subscriberId subscriber id
     * @throws PubSubException.ServiceDownException if the callback reports the close as
     *         unsuccessful, or the wait was interrupted
     */
    @Override
    public void closeSubscription(ByteString topic, ByteString subscriberId) throws PubSubException.ServiceDownException {
        final PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            this.doAsyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
            try {
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so callers further up the stack can still
                // observe the interruption after it has been translated.
                Thread.currentThread().interrupt();
                throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
            }
            if (!pubSubCallback.getIsCallSuccessful()) {
                throw new PubSubException.ServiceDownException("Exception while trying to close the subscription for topic: " + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
            }
        }
    }

    /**
     * Asynchronously closes the subscription.
     *
     * @param topic        topic of the subscription
     * @param subscriberId subscriber id
     * @param callback     callback notified of the outcome
     * @param context      caller context passed along with the request
     */
    @Override
    public void asyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context) {
        this.doAsyncCloseSubscription(topic, subscriberId, new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context);
    }

    /**
     * Shared implementation of the close variants: stops delivery for the topic subscriber,
     * then asks the channel manager to close the subscription asynchronously.
     *
     * @param topic        topic of the subscription
     * @param subscriberId subscriber id
     * @param callback     callback passed to the channel manager
     * @param context      caller context passed to the channel manager
     */
    private void doAsyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<PubSubProtocol.ResponseBody> callback, Object context) {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
        try {
            this.channelManager.stopDelivery(topicSubscriber);
        }
        catch (PubSubException.ClientNotSubscribedException ignored) {
            // Deliberately ignored: a failure to stop delivery must not prevent the close
            // request below from being issued.
            logger.debug("Ignoring ClientNotSubscribedException while stopping delivery for {}.", topicSubscriber);
        }
        logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
        this.channelManager.asyncCloseSubscription(topicSubscriber, callback, context);
    }
}

