package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.selectors.SelectorSupport;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSConsumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarMessageConsumer.class */
public class PulsarMessageConsumer implements MessageConsumer, TopicSubscriber, QueueReceiver {
    private static final Logger log = LoggerFactory.getLogger(PulsarMessageConsumer.class);
    final String subscriptionName;
    private final PulsarSession session;
    private final PulsarDestination destination;
    private SelectorSupport selectorSupport;
    private final boolean useServerSideFiltering;
    private final boolean noLocal;
    private ConsumerBase<?> consumer;
    private MessageListener listener;
    private final SubscriptionMode subscriptionMode;
    private final SubscriptionType subscriptionType;
    private final boolean dedicatedListenerThread;
    final boolean unregisterSubscriptionOnClose;
    private final Map<String, SelectorSupport> selectorSupportOnSubscriptions = new HashMap();
    private AtomicBoolean closed = new AtomicBoolean();
    private AtomicBoolean requestClose = new AtomicBoolean();
    private final AtomicBoolean closedWhileActiveTransaction = new AtomicBoolean(false);
    final AtomicLong receivedMessages = new AtomicLong();
    final AtomicLong skippedMessages = new AtomicLong();

    public PulsarMessageConsumer(String str, PulsarDestination pulsarDestination, PulsarSession pulsarSession, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String str2, boolean z, boolean z2) throws JMSException {
        this.noLocal = z2;
        pulsarSession.checkNotClosed();
        if (pulsarDestination == null) {
            throw new InvalidDestinationException("Invalid destination");
        }
        if ((pulsarDestination instanceof PulsarTemporaryDestination) && ((PulsarTemporaryDestination) pulsarDestination).getSession() != pulsarSession) {
            throw new JMSException("Cannot subscribe to a temporary destination not created but this session");
        }
        this.subscriptionName = str;
        this.session = pulsarSession;
        this.dedicatedListenerThread = pulsarSession.isDedicatedListenerThread();
        this.useServerSideFiltering = pulsarSession.getFactory().isUseServerSideFiltering();
        this.destination = pulsarDestination;
        this.subscriptionMode = pulsarDestination.isQueue() ? SubscriptionMode.Durable : subscriptionMode;
        this.subscriptionType = pulsarDestination.isQueue() ? SubscriptionType.Shared : subscriptionType;
        this.selectorSupport = SelectorSupport.build(str2, subscriptionType == SubscriptionType.Exclusive || pulsarSession.getFactory().isEnableClientSideEmulation() || pulsarSession.getFactory().isUseServerSideFiltering());
        this.unregisterSubscriptionOnClose = z;
        if (z2 && subscriptionType != SubscriptionType.Exclusive && !pulsarSession.getFactory().isEnableClientSideEmulation()) {
            throw new IllegalStateException("noLocal is not enabled by default with subscriptionType " + subscriptionType + ", please set jms.enableClientSideEmulation=true");
        }
    }

    public PulsarMessageConsumer subscribe() throws JMSException {
        if (this.destination.isQueue()) {
            this.session.getFactory().ensureQueueSubscription(this.destination);
        } else {
            getConsumer();
        }
        this.session.registerConsumer(this);
        return this;
    }

    synchronized ConsumerBase<?> getConsumer() throws JMSException {
        if (this.closed.get()) {
            throw new IllegalStateException("Consumer is closed");
        }
        if (this.consumer == null) {
            this.consumer = this.session.getFactory().createConsumer(this.destination, this.subscriptionName, this.subscriptionMode, this.subscriptionType, internalGetMessageSelector(), this.noLocal, this.session);
        }
        return this.consumer;
    }

    boolean isClosed() {
        return this.closed.get();
    }

    public String getMessageSelector() throws JMSException {
        checkNotClosed();
        String internalGetMessageSelector = internalGetMessageSelector();
        String internalGetMessageSelectorFromSubscription = internalGetMessageSelectorFromSubscription();
        return internalGetMessageSelectorFromSubscription == null ? internalGetMessageSelector : internalGetMessageSelector == null ? internalGetMessageSelectorFromSubscription : "(" + internalGetMessageSelectorFromSubscription + ") AND (" + internalGetMessageSelector + ")";
    }

    private synchronized String internalGetMessageSelector() {
        if (this.selectorSupport != null) {
            return this.selectorSupport.getSelector();
        }
        return null;
    }

    private synchronized String internalGetMessageSelectorFromSubscription() {
        SelectorSupport next;
        if (!this.useServerSideFiltering || this.selectorSupportOnSubscriptions.isEmpty() || (next = this.selectorSupportOnSubscriptions.values().iterator().next()) == null) {
            return null;
        }
        return next.getSelector();
    }

    public synchronized MessageListener getMessageListener() throws JMSException {
        checkNotClosed();
        return this.listener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void checkNotClosed() throws JMSException {
        this.session.checkNotClosed();
        if (this.closed.get() || this.requestClose.get()) {
            throw new IllegalStateException("This consumer is closed");
        }
    }

    public synchronized void setMessageListener(MessageListener messageListener) throws JMSException {
        checkNotClosed();
        this.listener = messageListener;
        this.session.ensureListenerThread(this);
    }

    public Message receive() throws JMSException {
        return receiveWithTimeoutAndValidateType(Long.MAX_VALUE, null);
    }

    public Message receive(long j) throws JMSException {
        return receiveWithTimeoutAndValidateType(j, null);
    }

    public List<Message> batchReceive(int i, long j) throws JMSException {
        Message receiveNoWait;
        if (this.closed.get()) {
            return Collections.emptyList();
        }
        getConsumer();
        if (i < 0) {
            i = 1;
        }
        Message receive = receive(j);
        if (receive == null) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(i);
        arrayList.add(receive);
        while (hasSomePrefetchedMessages() && arrayList.size() < i && (receiveNoWait = receiveNoWait()) != null) {
            arrayList.add(receiveNoWait);
        }
        return arrayList;
    }

    public boolean hasSomePrefetchedMessages() {
        return this.consumer.getTotalIncomingMessages() > 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Message receiveWithTimeoutAndValidateType(long j, Class cls) throws JMSException {
        checkNotClosed();
        if (this.listener != null) {
            throw new IllegalStateException("cannot receive if you have a messageListener");
        }
        int i = j == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) j;
        int i2 = j < 100 ? (int) j : 100;
        long currentTimeMillis = System.currentTimeMillis();
        return (Message) this.session.executeOperationIfConnectionStarted(() -> {
            do {
                Message message = (Message) this.session.executeCriticalOperation(() -> {
                    try {
                        ConsumerBase<?> consumer = getConsumer();
                        org.apache.pulsar.client.api.Message<?> receive = consumer.receive(i2, TimeUnit.MILLISECONDS);
                        if (receive == null) {
                            return null;
                        }
                        return handleReceivedMessage(receive, consumer, cls, null, this.noLocal);
                    } catch (Exception e) {
                        throw Utils.handleException(e);
                    }
                });
                if (message != null) {
                    return message;
                }
                if (System.currentTimeMillis() - currentTimeMillis >= j) {
                    return null;
                }
            } while (!this.session.isClosed());
            return null;
        }, i);
    }

    public Message receiveNoWait() throws JMSException {
        return receive(1L);
    }

    private void skipMessage(org.apache.pulsar.client.api.Message<?> message) throws JMSException {
        this.skippedMessages.incrementAndGet();
        if (this.subscriptionType != SubscriptionType.Exclusive && !this.session.getFactory().isAcknowledgeRejectedMessages()) {
            if (log.isDebugEnabled()) {
                log.debug("nAck filtered msg {}", message.getMessageId());
            }
            this.consumer.negativeAcknowledge(message);
        } else if (this.session.getTransaction() != null) {
            this.consumer.acknowledgeAsync(message.getMessageId(), this.session.getTransaction());
        } else {
            this.consumer.acknowledgeAsync(message.getMessageId());
        }
    }

    private PulsarMessage handleReceivedMessage(org.apache.pulsar.client.api.Message<?> message, Consumer<?> consumer, Class cls, java.util.function.Consumer<PulsarMessage> consumer2, boolean z) throws JMSException, PulsarClientException {
        String stringProperty;
        this.receivedMessages.incrementAndGet();
        PulsarMessage decode = PulsarMessage.decode(this, consumer, message);
        if (cls != null && !decode.isBodyAssignableTo(cls)) {
            if (log.isDebugEnabled()) {
                log.debug("negativeAcknowledge for message {} that cannot be converted to {}", message, cls);
            }
            consumer.negativeAcknowledge(message);
            throw new MessageFormatException("The message (" + decode.messageType() + "," + decode + ",) cannot be converted to a " + cls);
        }
        SelectorSupport selectorSupportOnSubscription = getSelectorSupportOnSubscription(message.getTopicName());
        if (selectorSupportOnSubscription != null && requiresClientSideFiltering(message) && !selectorSupportOnSubscription.matches(decode)) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} does not match subscription selector {}", decode, selectorSupportOnSubscription.getSelector());
            }
            this.skippedMessages.incrementAndGet();
            consumer.acknowledgeAsync(message.getMessageId());
            return null;
        }
        SelectorSupport selectorSupport = getSelectorSupport();
        if (selectorSupport != null && requiresClientSideFiltering(message) && !selectorSupport.matches(decode)) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} does not match selector {}", decode, selectorSupport.getSelector());
            }
            skipMessage(message);
            return null;
        }
        if (z && (stringProperty = decode.getStringProperty("JMSConnectionID")) != null && stringProperty.equals(this.session.getConnection().getConnectionId())) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} was generated from this connection {}", decode, stringProperty);
            }
            skipMessage(message);
            return null;
        }
        if (decode.getJMSExpiration() > 0 && System.currentTimeMillis() >= decode.getJMSExpiration()) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} expired at {}", decode, Instant.ofEpochMilli(decode.getJMSExpiration()));
            }
            skipMessage(message);
            return null;
        }
        this.session.registerUnacknowledgedMessage(decode);
        if (consumer2 != null) {
            try {
                consumer2.accept(decode);
                if (decode.isNegativeAcked()) {
                    return null;
                }
            } catch (Throwable th) {
                log.error("Listener thrown error, calling negativeAcknowledge", th);
                consumer.negativeAcknowledge(message);
                throw Utils.handleException(th);
            }
        }
        if (this.session.getTransacted()) {
            this.session.getTransaction();
        } else if (this.session.getAcknowledgeMode() == 1) {
            consumer.acknowledge(message);
        } else if (this.session.getAcknowledgeMode() == 3) {
            consumer.acknowledgeAsync(message).whenComplete((r6, th2) -> {
                if (th2 != null) {
                    log.error("Cannot acknowledge message {} {}", message, th2);
                }
            });
        }
        if (this.session.getAcknowledgeMode() != 2 && this.session.getAcknowledgeMode() != 4 && this.session.getAcknowledgeMode() != 0) {
            this.session.unregisterUnacknowledgedMessage(decode);
        }
        if (this.requestClose.get()) {
            closeInternal();
        }
        return decode;
    }

    private boolean requiresClientSideFiltering(org.apache.pulsar.client.api.Message<?> message) {
        return (message.getMessageId() instanceof BatchMessageIdImpl) || !this.session.getFactory().isUseServerSideFiltering();
    }

    public void close() throws JMSException {
        Consumer<?> internalConsumer;
        if (Utils.isOnMessageListener(this.session, this)) {
            this.requestClose.set(true);
            return;
        }
        if (this.closed.compareAndSet(false, true) && (internalConsumer = getInternalConsumer()) != null) {
            if (!this.session.isTransactionStarted()) {
                this.session.executeCriticalOperation(() -> {
                    try {
                        internalConsumer.close();
                        this.session.removeConsumer(this);
                        return null;
                    } catch (Exception e) {
                        throw Utils.handleException(e);
                    }
                });
            } else if (this.session.getTransacted()) {
                this.closedWhileActiveTransaction.set(true);
            }
        }
    }

    public String toString() {
        return "PulsarConsumer{subscriptionName=" + this.subscriptionName + ", destination=" + this.destination + '}';
    }

    public synchronized Topic getTopic() throws JMSException {
        checkNotClosed();
        if (this.destination.isTopic()) {
            return this.destination;
        }
        throw new JMSException("This consumer has been created on a Queue");
    }

    public synchronized Queue getQueue() throws JMSException {
        checkNotClosed();
        if (this.destination.isQueue()) {
            return this.destination;
        }
        throw new JMSException("This consumer has been created on a Topic");
    }

    public synchronized boolean getNoLocal() throws JMSException {
        checkNotClosed();
        return this.noLocal;
    }

    public JMSConsumer asJMSConsumer() {
        return new PulsarJMSConsumer(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void acknowledge(org.apache.pulsar.client.api.Message<?> message, PulsarMessage pulsarMessage, Consumer<?> consumer) throws JMSException {
        try {
            consumer.acknowledge(message);
            this.session.unregisterUnacknowledgedMessage(pulsarMessage);
        } catch (PulsarClientException e) {
            throw Utils.handleException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void runListenerNoWait() {
        runListener(0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void runListener(int i) {
        if (this.closed.get()) {
            return;
        }
        boolean z = true;
        boolean z2 = false;
        synchronized (this) {
            MessageListener messageListener = this.listener;
            if (messageListener == null) {
                return;
            }
            while (z) {
                boolean executeMessageListenerInSessionContext = Utils.executeMessageListenerInSessionContext(this.session, this, () -> {
                    org.apache.pulsar.client.api.Message<?> receive;
                    try {
                        try {
                            ConsumerBase<?> consumer = getConsumer();
                            if ((i == 0 && consumer.getTotalIncomingMessages() <= 0) || (receive = consumer.receive(i, TimeUnit.MILLISECONDS)) == null) {
                                return false;
                            }
                            Objects.requireNonNull(messageListener);
                            handleReceivedMessage(receive, consumer, null, (v1) -> {
                                r4.onMessage(v1);
                            }, this.noLocal);
                            return true;
                        } catch (PulsarClientException.AlreadyClosedException e) {
                            log.error("Error while receiving message on Closed consumer {}", this);
                            return false;
                        }
                    } catch (JMSException | PulsarClientException e2) {
                        log.error("Error while receiving message on consumer {}", this, e2);
                        this.session.onError(e2);
                        return false;
                    }
                });
                if (this.dedicatedListenerThread || !executeMessageListenerInSessionContext) {
                    z = false;
                } else {
                    z = true;
                    z2 = true;
                }
            }
            if (this.dedicatedListenerThread || this.closed.get()) {
                return;
            }
            this.session.scheduleConsumerListenerCycle(this, z2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeDuringRollback() throws JMSException {
        try {
            this.consumer.close();
            this.session.removeConsumer(this);
        } catch (Exception e) {
            throw Utils.handleException(e);
        }
    }

    public void closeInternal() throws JMSException {
        if (this.closed.compareAndSet(false, true)) {
            this.requestClose.set(false);
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                    this.consumer = null;
                }
            } catch (Exception e) {
                throw Utils.handleException(e);
            }
        }
    }

    public boolean isClosedWhileActiveTransaction() {
        return this.closedWhileActiveTransaction.get();
    }

    public void negativeAck(org.apache.pulsar.client.api.Message<?> message) {
        if (this.consumer != null) {
            this.consumer.negativeAcknowledge(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void redeliverUnacknowledgedMessages() {
        if (this.consumer != null) {
            this.consumer.redeliverUnacknowledgedMessages();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PulsarSession getSession() {
        return this.session;
    }

    private synchronized SelectorSupport getSelectorSupport() {
        return this.selectorSupport;
    }

    public synchronized SelectorSupport getSelectorSupportOnSubscription(String str) throws JMSException {
        if (!this.useServerSideFiltering) {
            return null;
        }
        if (!this.selectorSupportOnSubscriptions.containsKey(str)) {
            this.selectorSupportOnSubscriptions.put(str, SelectorSupport.build(this.session.getFactory().downloadServerSideFilter(str, this.destination.isQueue() ? this.session.getFactory().getQueueSubscriptionName(this.destination) : this.subscriptionName, this.subscriptionMode), true));
        }
        return this.selectorSupportOnSubscriptions.get(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Consumer<?> getInternalConsumer() {
        return this.consumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PulsarDestination getDestination() {
        return this.destination;
    }

    public SubscriptionType getSubscriptionType() {
        return this.subscriptionType;
    }

    public long getReceivedMessages() {
        return this.receivedMessages.get();
    }

    public long getSkippedMessages() {
        return this.skippedMessages.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void refreshServerSideSelectors() {
        int size = this.selectorSupportOnSubscriptions.size();
        if (size > 0) {
            this.selectorSupportOnSubscriptions.clear();
            log.info("Refreshing {} server-side filters on {}", Integer.valueOf(size), this.destination);
        }
    }
}
