package org.apache.hw_v4_0_0.hedwig.client.netty;

import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hw_v4_0_0.hedwig.client.api.MessageHandler;
import org.apache.hw_v4_0_0.hedwig.client.api.Subscriber;
import org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration;
import org.apache.hw_v4_0_0.hedwig.client.data.PubSubData;
import org.apache.hw_v4_0_0.hedwig.client.data.TopicSubscriber;
import org.apache.hw_v4_0_0.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hw_v4_0_0.hedwig.client.handlers.PubSubCallback;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hw_v4_0_0.hedwig.util.Callback;
import org.jboss.hw_v4_0_0.netty.channel.Channel;
import org.jboss.hw_v4_0_0.netty.channel.ChannelFuture;
import org.jboss.hw_v4_0_0.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/client/netty/HedwigSubscriber.class */
public class HedwigSubscriber implements Subscriber {
    /** Class-wide logger; never reassigned, hence final. */
    private static final Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);

    /**
     * Channel currently cached for each (topic, subscriber) pair. Entries are added via
     * {@link #setChannelForTopic} and removed when a subscription is closed.
     */
    protected final ConcurrentMap<TopicSubscriber, Channel> topicSubscriber2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();

    /** Owning client; supplies the topic-to-host map, the publisher and the global txn counter. */
    protected final HedwigClientImpl client;

    /** Configuration obtained from {@link #client} at construction time. */
    protected final ClientConfiguration cfg;

    public HedwigSubscriber(HedwigClientImpl hedwigClientImpl) {
        this.client = hedwigClientImpl;
        this.cfg = hedwigClientImpl.getConfiguration();
    }

    /**
     * Synchronous subscribe/unsubscribe: issues the asynchronous request and blocks the calling
     * thread on a private {@link PubSubData} monitor until that object is flagged as done.
     *
     * @param byteString     topic to (un)subscribe
     * @param byteString2    subscriber id
     * @param operationType  operation to perform (the callers in this class pass SUBSCRIBE or UNSUBSCRIBE)
     * @param createOrAttach subscribe mode; the unsubscribe caller passes {@code null}
     * @throws PubSubException.CouldNotConnectException         rethrown when the async call reported it
     * @throws PubSubException.ClientAlreadySubscribedException rethrown when the async call reported it
     * @throws PubSubException.ClientNotSubscribedException     rethrown when the async call reported it
     * @throws PubSubException.ServiceDownException             rethrown when reported, and also used for
     *         interruption, for a failure without an exception, and to wrap any other failure type
     */
    private void subUnsub(ByteString byteString, ByteString byteString2, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling a sync subUnsub request for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: " + createOrAttach);
        }
        // This instance only serves as the monitor/completion flag for the synchronous wait;
        // asyncSubUnsub builds its own PubSubData for the actual request.
        PubSubData pubSubData = new PubSubData(byteString, null, byteString2, operationType, createOrAttach, null, null);
        synchronized (pubSubData) {
            // NOTE(review): PubSubCallback presumably sets pubSubData.isDone and notifies this
            // monitor when the async operation completes - confirm in PubSubCallback.
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            asyncSubUnsub(byteString, byteString2, pubSubCallback, null, operationType, createOrAttach);
            while (!pubSubData.isDone) {
                try {
                    pubSubData.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still
                    // observe that this thread was asked to stop.
                    Thread.currentThread().interrupt();
                    throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
                }
            }
            if (!pubSubCallback.getIsCallSuccessful()) {
                PubSubException failureException = pubSubCallback.getFailureException();
                if (failureException == null) {
                    logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
                    throw new PubSubException.ServiceDownException("Server ack response to SubUnsub request is not successful");
                }
                // Rethrow the declared exception types unchanged; anything else is wrapped.
                if (failureException instanceof PubSubException.CouldNotConnectException) {
                    throw ((PubSubException.CouldNotConnectException) failureException);
                }
                if (failureException instanceof PubSubException.ClientAlreadySubscribedException) {
                    throw ((PubSubException.ClientAlreadySubscribedException) failureException);
                }
                if (failureException instanceof PubSubException.ClientNotSubscribedException) {
                    throw ((PubSubException.ClientNotSubscribedException) failureException);
                }
                if (failureException instanceof PubSubException.ServiceDownException) {
                    throw ((PubSubException.ServiceDownException) failureException);
                }
                logger.error("Unexpected PubSubException thrown: " + failureException.toString());
                throw new PubSubException.ServiceDownException(failureException);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asynchronous subscribe/unsubscribe. Chooses where to send the request:
     * <ul>
     *   <li>no host known for the topic: connect to the configured default server host;</li>
     *   <li>UNSUBSCRIBE and the publisher already holds a channel to the topic's host: reuse it;</li>
     *   <li>otherwise: open a connection to the topic's host.</li>
     * </ul>
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param callback       callback stored in the request's {@link PubSubData}
     * @param obj            caller context stored alongside the callback
     * @param operationType  operation to perform
     * @param createOrAttach subscribe mode; the unsubscribe callers in this class pass {@code null}
     */
    public void asyncSubUnsub(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj, PubSubProtocol.OperationType operationType, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach) {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling an async subUnsub request for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", operationType: " + operationType + ", createOrAttach: " + createOrAttach);
        }
        PubSubData pubSubData = new PubSubData(byteString, null, byteString2, operationType, createOrAttach, callback, obj);
        // Look each map up exactly once: a containsKey() followed by a separate get() can
        // observe a concurrent removal in between and hand a null host/channel downstream.
        InetSocketAddress inetSocketAddress = this.client.topic2Host.get(byteString);
        if (inetSocketAddress == null) {
            this.client.doConnect(pubSubData, this.cfg.getDefaultServerHost());
            return;
        }
        if (operationType.equals(PubSubProtocol.OperationType.UNSUBSCRIBE)) {
            Channel publishChannel = this.client.getPublisher().host2Channel.get(inetSocketAddress);
            if (publishChannel != null) {
                doSubUnsub(pubSubData, publishChannel);
                return;
            }
        }
        this.client.doConnect(pubSubData, inetSocketAddress);
    }

    /**
     * Synchronously subscribes to a topic; forwards to the four-argument overload with the
     * hub flag set to {@code false}.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param createOrAttach subscribe mode
     */
    @Override
    public void subscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        final boolean isHub = false;
        subscribe(byteString, byteString2, createOrAttach, isHub);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Synchronously subscribes after validating the subscriber id against the hub flag.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param createOrAttach subscribe mode
     * @param z              hub flag the subscriber id is validated against
     * @throws InvalidSubscriberIdException         if the id does not match the hub flag
     * @throws PubSubException.ServiceDownException also used to wrap a
     *         {@code ClientNotSubscribedException}, which is not expected from a subscribe
     */
    public void subscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach, boolean z) throws PubSubException.CouldNotConnectException, PubSubException.ClientAlreadySubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        final boolean idAccepted = isValidSubscriberId(byteString2, z);
        if (idAccepted) {
            try {
                subUnsub(byteString, byteString2, PubSubProtocol.OperationType.SUBSCRIBE, createOrAttach);
                return;
            } catch (PubSubException.ClientNotSubscribedException unexpected) {
                logger.error("Unexpected Exception thrown: " + unexpected.toString());
                throw new PubSubException.ServiceDownException(unexpected);
            }
        }
        throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z);
    }

    /**
     * Asynchronously subscribes to a topic; forwards to the six-argument overload with the
     * hub flag set to {@code false}.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param createOrAttach subscribe mode
     * @param callback       completion callback
     * @param obj            caller context handed back to the callback
     */
    @Override
    public void asyncSubscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach, Callback<Void> callback, Object obj) {
        final boolean isHub = false;
        asyncSubscribe(byteString, byteString2, createOrAttach, callback, obj, isHub);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asynchronously subscribes after validating the subscriber id against the hub flag.
     * An invalid id is reported through {@code callback.operationFailed} as a
     * {@code ServiceDownException} wrapping an {@link InvalidSubscriberIdException};
     * nothing is thrown to the caller.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param createOrAttach subscribe mode
     * @param callback       completion callback
     * @param obj            caller context handed back to the callback
     * @param z              hub flag the subscriber id is validated against
     */
    public void asyncSubscribe(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach, Callback<Void> callback, Object obj, boolean z) {
        if (!isValidSubscriberId(byteString2, z)) {
            final InvalidSubscriberIdException invalidId = new InvalidSubscriberIdException("SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z);
            callback.operationFailed(obj, new PubSubException.ServiceDownException(invalidId));
            return;
        }
        asyncSubUnsub(byteString, byteString2, callback, obj, PubSubProtocol.OperationType.SUBSCRIBE, createOrAttach);
    }

    /**
     * Synchronously unsubscribes from a topic; forwards to the three-argument overload with
     * the hub flag set to {@code false}.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     */
    @Override
    public void unsubscribe(ByteString byteString, ByteString byteString2) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        final boolean isHub = false;
        unsubscribe(byteString, byteString2, isHub);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Synchronously unsubscribes: validates the subscriber id, closes the local subscription
     * channel, then sends the UNSUBSCRIBE request.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param z           hub flag the subscriber id is validated against
     * @throws InvalidSubscriberIdException         if the id does not match the hub flag
     * @throws PubSubException.ServiceDownException also used to wrap a
     *         {@code ClientAlreadySubscribedException}, which is not expected from an unsubscribe
     */
    public void unsubscribe(ByteString byteString, ByteString byteString2, boolean z) throws PubSubException.CouldNotConnectException, PubSubException.ClientNotSubscribedException, PubSubException.ServiceDownException, InvalidSubscriberIdException {
        final boolean idAccepted = isValidSubscriberId(byteString2, z);
        if (!idAccepted) {
            final String reason = "SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z;
            throw new InvalidSubscriberIdException(reason);
        }
        // The local channel is torn down first; only then is the server told to unsubscribe.
        closeSubscription(byteString, byteString2);
        try {
            subUnsub(byteString, byteString2, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
        } catch (PubSubException.ClientAlreadySubscribedException unexpected) {
            logger.error("Unexpected Exception thrown: " + unexpected.toString());
            throw new PubSubException.ServiceDownException(unexpected);
        }
    }

    /**
     * Asynchronously unsubscribes from a topic; forwards to the five-argument overload with
     * the hub flag set to {@code false}.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    completion callback
     * @param obj         caller context handed back to the callback
     */
    @Override
    public void asyncUnsubscribe(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj) {
        final boolean isHub = false;
        asyncUnsubscribe(byteString, byteString2, callback, obj, isHub);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asynchronously unsubscribes: validates the subscriber id, closes the local subscription
     * channel, and once that close has finished sends the UNSUBSCRIBE request. A failed close
     * or an invalid id is reported through {@code callback.operationFailed}.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    completion callback
     * @param obj         caller context handed back to the callback
     * @param z           hub flag the subscriber id is validated against
     */
    public void asyncUnsubscribe(final ByteString byteString, final ByteString byteString2, final Callback<Void> callback, final Object obj, boolean z) {
        if (!isValidSubscriberId(byteString2, z)) {
            final InvalidSubscriberIdException invalidId = new InvalidSubscriberIdException("SubscriberId passed is not valid: " + byteString2.toStringUtf8() + ", isHub: " + z);
            callback.operationFailed(obj, new PubSubException.ServiceDownException(invalidId));
            return;
        }
        // Second stage, run only after the local channel close has completed.
        final Callback<Void> afterClose = new Callback<Void>() {
            @Override
            public void operationFinished(Object closeCtx, Void ignored) {
                HedwigSubscriber.this.asyncSubUnsub(byteString, byteString2, callback, obj, PubSubProtocol.OperationType.UNSUBSCRIBE, null);
            }

            @Override
            public void operationFailed(Object closeCtx, PubSubException closeFailure) {
                callback.operationFailed(obj, closeFailure);
            }
        };
        asyncCloseSubscription(byteString, byteString2, afterClose, null);
    }

    /**
     * Checks that the subscriber id agrees with the hub flag: the id is valid exactly when
     * {@code SubscriptionStateUtils.isHubSubscriber} gives the same answer as the flag.
     *
     * @param byteString subscriber id to check
     * @param z          hub flag supplied by the caller
     * @return {@code true} if the id is acceptable for the given flag
     */
    private boolean isValidSubscriberId(ByteString byteString, boolean z) {
        return z == SubscriptionStateUtils.isHubSubscriber(byteString);
    }

    /**
     * Sends a consume request for the given message sequence id on the channel cached for
     * this (topic, subscriber) pair.
     *
     * @param byteString   topic
     * @param byteString2  subscriber id
     * @param messageSeqId sequence id to report as consumed
     * @throws PubSubException.ClientNotSubscribedException if no channel is cached for the pair
     */
    @Override
    public void consume(ByteString byteString, ByteString byteString2, PubSubProtocol.MessageSeqId messageSeqId) throws PubSubException.ClientNotSubscribedException {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling consume for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", messageSeqId: " + messageSeqId);
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        // One lookup only: containsKey() followed by get() could see the entry removed in
        // between by a concurrent close and pass a null channel to doConsume.
        Channel channel = this.topicSubscriber2Channel.get(topicSubscriber);
        if (channel == null) {
            throw new PubSubException.ClientNotSubscribedException("Cannot send consume message since client is not subscribed to topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
        }
        doConsume(new PubSubData(byteString, null, byteString2, PubSubProtocol.OperationType.CONSUME, null, null, null), channel, messageSeqId);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the subscribe or unsubscribe request described by {@code pubSubData}, registers
     * the pending request with the channel's response handler under a fresh transaction id,
     * and writes it to the channel.
     *
     * @param pubSubData request description; its {@code txnId} and {@code requestWriteTime}
     *                   fields are updated here
     * @param channel    channel to write the request to
     */
    public void doSubUnsub(PubSubData pubSubData, Channel channel) {
        final PubSubProtocol.PubSubRequest.Builder request = PubSubProtocol.PubSubRequest.newBuilder()
                .setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE)
                .setType(pubSubData.operationType);
        if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) {
            request.addAllTriedServers(pubSubData.triedServers);
        }
        final long txnId = this.client.globalCounter.incrementAndGet();
        request.setTxnId(txnId)
                .setShouldClaim(pubSubData.shouldClaim)
                .setTopic(pubSubData.topic);
        if (PubSubProtocol.OperationType.SUBSCRIBE == pubSubData.operationType) {
            request.setSubscribeRequest(PubSubProtocol.SubscribeRequest.newBuilder()
                    .setSubscriberId(pubSubData.subscriberId)
                    .setCreateOrAttach(pubSubData.createOrAttach)
                    .setSynchronous(true));
        } else {
            // Every non-SUBSCRIBE operation type is sent as an unsubscribe request.
            request.setUnsubscribeRequest(PubSubProtocol.UnsubscribeRequest.newBuilder()
                    .setSubscriberId(pubSubData.subscriberId));
        }
        pubSubData.txnId = txnId;
        pubSubData.requestWriteTime = System.currentTimeMillis();
        // The request is registered under its txn id before the write is issued.
        HedwigClientImpl.getResponseHandlerFromChannel(channel).txn2PubSubData.put(Long.valueOf(txnId), pubSubData);
        if (logger.isDebugEnabled()) {
            logger.debug("Writing a SubUnsub request to host: " + HedwigClientImpl.getHostFromChannel(channel) + " for pubSubData: " + pubSubData);
        }
        final ChannelFuture writeFuture = channel.write(request.build());
        writeFuture.addListener(new WriteCallback(pubSubData, this.client));
    }

    /**
     * Builds a CONSUME request for the given message sequence id and writes it to the channel.
     * The pending request is not registered with the response handler; a failed write is only
     * logged.
     *
     * @param pubSubData   supplies the topic and subscriber id of the request
     * @param channel      channel to write the request to
     * @param messageSeqId sequence id to report as consumed
     */
    public void doConsume(final PubSubData pubSubData, final Channel channel, final PubSubProtocol.MessageSeqId messageSeqId) {
        final PubSubProtocol.PubSubRequest.Builder request = PubSubProtocol.PubSubRequest.newBuilder()
                .setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE)
                .setType(PubSubProtocol.OperationType.CONSUME)
                .setTxnId(this.client.globalCounter.incrementAndGet())
                .setTopic(pubSubData.topic);
        request.setConsumeRequest(PubSubProtocol.ConsumeRequest.newBuilder()
                .setSubscriberId(pubSubData.subscriberId)
                .setMsgId(messageSeqId));
        if (logger.isDebugEnabled()) {
            logger.debug("Writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel) + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
        }
        final ChannelFuture writeFuture = channel.write(request.build());
        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture completed) throws Exception {
                if (!completed.isSuccess()) {
                    HedwigSubscriber.logger.error("Error writing a Consume request to host: " + HedwigClientImpl.getHostFromChannel(channel) + " with messageSeqId: " + messageSeqId + " for pubSubData: " + pubSubData);
                }
            }
        });
    }

    /**
     * Reports whether a channel is currently cached for the given (topic, subscriber) pair.
     * Only the local map is consulted.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @return {@code true} if the pair is a key of the local channel map
     */
    @Override
    public boolean hasSubscription(ByteString byteString, ByteString byteString2) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        final TopicSubscriber key = new TopicSubscriber(byteString, byteString2);
        return topicSubscriber2Channel.containsKey(key);
    }

    /**
     * Stub: always returns {@code null} regardless of the argument; no lookup is performed.
     * Callers must null-check the result.
     *
     * @param byteString subscriber id (unused)
     * @return always {@code null}
     */
    // NOTE(review): looks unimplemented rather than intentionally null - confirm before
    // switching to an empty list, since callers may rely on the null.
    @Override // org.apache.hw_v4_0_0.hedwig.client.api.Subscriber
    public List<ByteString> getSubscriptionList(ByteString byteString) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        return null;
    }

    /**
     * Starts message delivery for a subscription: installs the message handler on the
     * channel's subscribe response handler and marks the channel readable. A failure to
     * change readability is only logged.
     *
     * @param byteString     topic
     * @param byteString2    subscriber id
     * @param messageHandler handler to install
     * @throws PubSubException.ClientNotSubscribedException if no channel is cached for the pair
     */
    @Override
    public void startDelivery(final ByteString byteString, final ByteString byteString2, MessageHandler messageHandler) throws PubSubException.ClientNotSubscribedException {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting delivery for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        // One lookup only: containsKey() followed by get() could return null if the
        // subscription is closed concurrently, causing a NullPointerException below.
        Channel channel = this.topicSubscriber2Channel.get(topicSubscriber);
        if (channel == null) {
            String notSubscribed = "Client is not yet subscribed to topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8();
            logger.error(notSubscribed);
            throw new PubSubException.ClientNotSubscribedException(notSubscribed);
        }
        HedwigClientImpl.getResponseHandlerFromChannel(channel).getSubscribeResponseHandler().setMessageHandler(messageHandler);
        channel.setReadable(true).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    return;
                }
                HedwigSubscriber.logger.error("Unable to make subscriber Channel readable in startDelivery call for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
            }
        });
    }

    /**
     * Stops message delivery for a subscription: clears the message handler on the channel's
     * subscribe response handler and marks the channel not readable. A failure to change
     * readability is only logged.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @throws PubSubException.ClientNotSubscribedException if no channel is cached for the pair
     */
    @Override
    public void stopDelivery(final ByteString byteString, final ByteString byteString2) throws PubSubException.ClientNotSubscribedException {
        if (logger.isDebugEnabled()) {
            logger.debug("Stopping delivery for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        // One lookup only: containsKey() followed by get() could return null if the
        // subscription is closed concurrently, causing a NullPointerException below.
        Channel channel = this.topicSubscriber2Channel.get(topicSubscriber);
        if (channel == null) {
            String notSubscribed = "Client is not yet subscribed to topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8();
            logger.error(notSubscribed);
            throw new PubSubException.ClientNotSubscribedException(notSubscribed);
        }
        HedwigClientImpl.getResponseHandlerFromChannel(channel).getSubscribeResponseHandler().setMessageHandler(null);
        channel.setReadable(false).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    return;
                }
                HedwigSubscriber.logger.error("Unable to make subscriber Channel not readable in stopDelivery call for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
            }
        });
    }

    /**
     * Synchronously closes the subscription channel for a (topic, subscriber) pair by issuing
     * the asynchronous close and blocking on a private {@link PubSubData} monitor until it is
     * flagged as done.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @throws PubSubException.ServiceDownException if the wait is interrupted or the close
     *         was not reported as successful
     */
    @Override
    public void closeSubscription(ByteString byteString, ByteString byteString2) throws PubSubException.ServiceDownException {
        // Used only as the monitor/completion flag for the synchronous wait.
        PubSubData pubSubData = new PubSubData(byteString, null, byteString2, null, null, null, null);
        synchronized (pubSubData) {
            // NOTE(review): PubSubCallback presumably sets pubSubData.isDone and notifies this
            // monitor when the close completes - confirm in PubSubCallback.
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            asyncCloseSubscription(byteString, byteString2, pubSubCallback, null);
            while (!pubSubData.isDone) {
                try {
                    pubSubData.wait();
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still
                    // observe that this thread was asked to stop.
                    Thread.currentThread().interrupt();
                    throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
                }
            }
            if (!pubSubCallback.getIsCallSuccessful()) {
                throw new PubSubException.ServiceDownException("Exception while trying to close the subscription for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
            }
        }
    }

    /**
     * Asynchronously closes the subscription channel for a (topic, subscriber) pair. The
     * cached channel is removed from the local map, its response handler is flagged as
     * explicitly closed, and the channel is closed; the callback is invoked when the close
     * completes. If no channel is cached, a warning is logged and the callback is completed
     * successfully right away.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param callback    completion callback
     * @param obj         caller context handed back to the callback
     */
    @Override
    public void asyncCloseSubscription(final ByteString byteString, final ByteString byteString2, final Callback<Void> callback, final Object obj) {
        if (logger.isDebugEnabled()) {
            logger.debug("Closing subscription asynchronously for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(byteString, byteString2);
        // remove() both tests for and claims the channel in one atomic step, so concurrent
        // closers cannot both obtain it and none of them can end up with a null channel
        // (the separate containsKey/get/remove sequence allowed both).
        Channel channel = this.topicSubscriber2Channel.remove(topicSubscriber);
        if (channel == null) {
            logger.warn("Trying to close a subscription when we don't have a subscribe channel cached for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
            callback.operationFinished(obj, null);
            return;
        }
        // Flag set before closing. NOTE(review): presumably lets the response handler tell an
        // intentional close from a lost connection - confirm in the handler.
        HedwigClientImpl.getResponseHandlerFromChannel(channel).channelClosedExplicitly = true;
        channel.close().addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    callback.operationFinished(obj, null);
                } else {
                    HedwigSubscriber.logger.error("Failed to close the subscription channel for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8());
                    callback.operationFailed(obj, new PubSubException.ServiceDownException("Failed to close the subscription channel for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8()));
                }
            }
        });
    }

    /**
     * Looks up the channel cached for a (topic, subscriber) pair.
     *
     * @param topicSubscriber key to look up
     * @return the cached channel, or {@code null} if the map has no entry for the key
     */
    public Channel getChannelForTopic(TopicSubscriber topicSubscriber) {
        final Channel cached = topicSubscriber2Channel.get(topicSubscriber);
        return cached;
    }

    /**
     * Caches the channel for a (topic, subscriber) pair, replacing any existing entry.
     *
     * @param topicSubscriber key to store under
     * @param channel         channel to cache
     */
    public void setChannelForTopic(TopicSubscriber topicSubscriber, Channel channel) {
        topicSubscriber2Channel.put(topicSubscriber, channel);
    }

    /**
     * Drops the cached channel entry for a (topic, subscriber) pair, if any. The channel
     * itself is not closed here.
     *
     * @param topicSubscriber key to remove
     */
    public void removeChannelForTopic(TopicSubscriber topicSubscriber) {
        topicSubscriber2Channel.remove(topicSubscriber);
    }
}
