/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.PulsarDestination;
import com.datastax.oss.pulsar.jms.PulsarJMSConsumer;
import com.datastax.oss.pulsar.jms.PulsarMessage;
import com.datastax.oss.pulsar.jms.PulsarSession;
import com.datastax.oss.pulsar.jms.PulsarTemporaryDestination;
import com.datastax.oss.pulsar.jms.Utils;
import com.datastax.oss.pulsar.jms.selectors.SelectorSupport;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Consumer;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Message;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.PulsarClientException;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionMode;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionType;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.BatchMessageIdImpl;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.ConsumerBase;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSConsumer;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarMessageConsumer
implements MessageConsumer,
TopicSubscriber,
QueueReceiver {
    private static final Logger log = LoggerFactory.getLogger(PulsarMessageConsumer.class);
    final String subscriptionName;
    private final PulsarSession session;
    private final PulsarDestination destination;
    private SelectorSupport selectorSupport;
    private final Map<String, SelectorSupport> selectorSupportOnSubscriptions = new HashMap<String, SelectorSupport>();
    private final boolean useServerSideFiltering;
    private final boolean noLocal;
    private ConsumerBase<?> consumer;
    private MessageListener listener;
    private final SubscriptionMode subscriptionMode;
    private final SubscriptionType subscriptionType;
    private final boolean dedicatedListenerThread;
    final boolean unregisterSubscriptionOnClose;
    private AtomicBoolean closed = new AtomicBoolean();
    private AtomicBoolean requestClose = new AtomicBoolean();
    private final AtomicBoolean closedWhileActiveTransaction = new AtomicBoolean(false);
    final AtomicLong receivedMessages = new AtomicLong();
    final AtomicLong skippedMessages = new AtomicLong();

    public PulsarMessageConsumer(String subscriptionName, PulsarDestination destination, PulsarSession session, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String selector, boolean unregisterSubscriptionOnClose, boolean noLocal) throws JMSException {
        PulsarTemporaryDestination dest;
        this.noLocal = noLocal;
        session.checkNotClosed();
        if (destination == null) {
            throw new InvalidDestinationException("Invalid destination");
        }
        if (destination instanceof PulsarTemporaryDestination && (dest = (PulsarTemporaryDestination)destination).getSession() != session) {
            throw new JMSException("Cannot subscribe to a temporary destination not created but this session");
        }
        this.subscriptionName = subscriptionName;
        this.session = session;
        this.dedicatedListenerThread = session.isDedicatedListenerThread();
        this.useServerSideFiltering = session.getFactory().isUseServerSideFiltering();
        this.destination = destination;
        this.subscriptionMode = destination.isQueue() ? SubscriptionMode.Durable : subscriptionMode;
        this.subscriptionType = destination.isQueue() ? SubscriptionType.Shared : subscriptionType;
        this.selectorSupport = SelectorSupport.build(selector, subscriptionType == SubscriptionType.Exclusive || session.getFactory().isEnableClientSideEmulation() || session.getFactory().isUseServerSideFiltering());
        this.unregisterSubscriptionOnClose = unregisterSubscriptionOnClose;
        if (noLocal && subscriptionType != SubscriptionType.Exclusive && !session.getFactory().isEnableClientSideEmulation()) {
            throw new IllegalStateException("noLocal is not enabled by default with subscriptionType " + (Object)((Object)subscriptionType) + ", please set jms.enableClientSideEmulation=true");
        }
    }

    public PulsarMessageConsumer subscribe() throws JMSException {
        if (this.destination.isQueue()) {
            this.session.getFactory().ensureQueueSubscription(this.destination);
        } else {
            this.getConsumer();
        }
        this.session.registerConsumer(this);
        return this;
    }

    synchronized ConsumerBase<?> getConsumer() throws JMSException {
        if (this.closed.get()) {
            throw new IllegalStateException("Consumer is closed");
        }
        if (this.consumer == null) {
            String currentSelector = this.internalGetMessageSelector();
            this.consumer = this.session.getFactory().createConsumer(this.destination, this.subscriptionName, this.subscriptionMode, this.subscriptionType, currentSelector, this.noLocal, this.session);
        }
        return this.consumer;
    }

    boolean isClosed() {
        return this.closed.get();
    }

    @Override
    public String getMessageSelector() throws JMSException {
        this.checkNotClosed();
        String selector = this.internalGetMessageSelector();
        String selectorOnSubscription = this.internalGetMessageSelectorFromSubscription();
        if (selectorOnSubscription == null) {
            return selector;
        }
        if (selector == null) {
            return selectorOnSubscription;
        }
        return "(" + selectorOnSubscription + ") AND (" + selector + ")";
    }

    private synchronized String internalGetMessageSelector() {
        return this.selectorSupport != null ? this.selectorSupport.getSelector() : null;
    }

    private synchronized String internalGetMessageSelectorFromSubscription() {
        if (!this.useServerSideFiltering || this.selectorSupportOnSubscriptions.isEmpty()) {
            return null;
        }
        SelectorSupport next = this.selectorSupportOnSubscriptions.values().iterator().next();
        return next != null ? next.getSelector() : null;
    }

    @Override
    public synchronized MessageListener getMessageListener() throws JMSException {
        this.checkNotClosed();
        return this.listener;
    }

    void checkNotClosed() throws JMSException {
        this.session.checkNotClosed();
        if (this.closed.get() || this.requestClose.get()) {
            throw new IllegalStateException("This consumer is closed");
        }
    }

    @Override
    public synchronized void setMessageListener(MessageListener listener) throws JMSException {
        this.checkNotClosed();
        this.listener = listener;
        this.session.ensureListenerThread(this);
    }

    @Override
    public javax.jms.Message receive() throws JMSException {
        return this.receiveWithTimeoutAndValidateType(Long.MAX_VALUE, null);
    }

    @Override
    public javax.jms.Message receive(long timeout) throws JMSException {
        return this.receiveWithTimeoutAndValidateType(timeout, null);
    }

    public List<javax.jms.Message> batchReceive(int maxMessages, long timeoutMs) throws JMSException {
        javax.jms.Message message;
        if (this.closed.get()) {
            return Collections.emptyList();
        }
        this.getConsumer();
        if (maxMessages < 0) {
            maxMessages = 1;
        }
        if ((message = this.receive(timeoutMs)) == null) {
            return Collections.emptyList();
        }
        ArrayList<javax.jms.Message> result = new ArrayList<javax.jms.Message>(maxMessages);
        result.add(message);
        while (this.hasSomePrefetchedMessages() && result.size() < maxMessages && (message = this.receiveNoWait()) != null) {
            result.add(message);
        }
        return result;
    }

    public boolean hasSomePrefetchedMessages() {
        return this.consumer.getTotalIncomingMessages() > 0;
    }

    synchronized javax.jms.Message receiveWithTimeoutAndValidateType(long timeout, Class expectedType) throws JMSException {
        this.checkNotClosed();
        if (this.listener != null) {
            throw new IllegalStateException("cannot receive if you have a messageListener");
        }
        int acquireConnectionStartTime = timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int)timeout;
        int stepTimeout = timeout < 100L ? (int)timeout : 100;
        long start = System.currentTimeMillis();
        return this.session.executeOperationIfConnectionStarted(() -> {
            do {
                javax.jms.Message result;
                if ((result = (javax.jms.Message)this.session.executeCriticalOperation(() -> {
                    try {
                        ConsumerBase<?> consumer = this.getConsumer();
                        Message message = consumer.receive(stepTimeout, TimeUnit.MILLISECONDS);
                        if (message == null) {
                            return null;
                        }
                        return this.handleReceivedMessage(message, consumer, expectedType, null, this.noLocal);
                    }
                    catch (Exception err) {
                        throw Utils.handleException(err);
                    }
                })) == null) continue;
                return result;
            } while (System.currentTimeMillis() - start < timeout && !this.session.isClosed());
            return null;
        }, acquireConnectionStartTime);
    }

    @Override
    public javax.jms.Message receiveNoWait() throws JMSException {
        return this.receive(1L);
    }

    private void skipMessage(Message<?> message) throws JMSException {
        this.skippedMessages.incrementAndGet();
        if (this.subscriptionType == SubscriptionType.Exclusive || this.session.getFactory().isAcknowledgeRejectedMessages()) {
            if (this.session.getTransaction() != null) {
                this.consumer.acknowledgeAsync(message.getMessageId(), this.session.getTransaction());
            } else {
                this.consumer.acknowledgeAsync(message.getMessageId());
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("nAck filtered msg {}", (Object)message.getMessageId());
            }
            this.consumer.negativeAcknowledge(message);
        }
    }

    private PulsarMessage handleReceivedMessage(Message<?> message, Consumer<?> consumer, Class expectedType, java.util.function.Consumer<PulsarMessage> listenerCode, boolean noLocalFilter) throws JMSException, PulsarClientException {
        String senderConnectionID;
        this.receivedMessages.incrementAndGet();
        PulsarMessage result = PulsarMessage.decode(this, consumer, message);
        if (expectedType != null && !result.isBodyAssignableTo(expectedType)) {
            if (log.isDebugEnabled()) {
                log.debug("negativeAcknowledge for message {} that cannot be converted to {}", (Object)message, (Object)expectedType);
            }
            consumer.negativeAcknowledge(message);
            throw new MessageFormatException("The message (" + result.messageType() + "," + result + ",) cannot be converted to a " + expectedType);
        }
        SelectorSupport selectorSupportOnSubscription = this.getSelectorSupportOnSubscription(message.getTopicName());
        if (selectorSupportOnSubscription != null && this.requiresClientSideFiltering(message) && !selectorSupportOnSubscription.matches(result)) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} does not match subscription selector {}", (Object)result, (Object)selectorSupportOnSubscription.getSelector());
            }
            this.skippedMessages.incrementAndGet();
            consumer.acknowledgeAsync(message.getMessageId());
            return null;
        }
        SelectorSupport selectorSupport = this.getSelectorSupport();
        if (selectorSupport != null && this.requiresClientSideFiltering(message) && !selectorSupport.matches(result)) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} does not match selector {}", (Object)result, (Object)selectorSupport.getSelector());
            }
            this.skipMessage(message);
            return null;
        }
        if (noLocalFilter && (senderConnectionID = result.getStringProperty("JMSConnectionID")) != null && senderConnectionID.equals(this.session.getConnection().getConnectionId())) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} was generated from this connection {}", (Object)result, (Object)senderConnectionID);
            }
            this.skipMessage(message);
            return null;
        }
        if (result.getJMSExpiration() > 0L && System.currentTimeMillis() >= result.getJMSExpiration()) {
            if (log.isDebugEnabled()) {
                log.debug("msg {} expired at {}", (Object)result, (Object)Instant.ofEpochMilli(result.getJMSExpiration()));
            }
            this.skipMessage(message);
            return null;
        }
        this.session.registerUnacknowledgedMessage(result);
        if (listenerCode != null) {
            try {
                listenerCode.accept(result);
            }
            catch (Throwable t) {
                log.error("Listener thrown error, calling negativeAcknowledge", t);
                consumer.negativeAcknowledge(message);
                throw Utils.handleException(t);
            }
            if (result.isNegativeAcked()) {
                return null;
            }
        }
        if (this.session.getTransacted()) {
            this.session.getTransaction();
        } else if (this.session.getAcknowledgeMode() == 1) {
            consumer.acknowledge(message);
        } else if (this.session.getAcknowledgeMode() == 3) {
            consumer.acknowledgeAsync(message).whenComplete((m3, ex) -> {
                if (ex != null) {
                    log.error("Cannot acknowledge message {} {}", (Object)message, ex);
                }
            });
        }
        if (this.session.getAcknowledgeMode() != 2 && this.session.getAcknowledgeMode() != 4 && this.session.getAcknowledgeMode() != 0) {
            this.session.unregisterUnacknowledgedMessage(result);
        }
        if (this.requestClose.get()) {
            this.closeInternal();
        }
        return result;
    }

    private boolean requiresClientSideFiltering(Message<?> message) {
        boolean isBatch = message.getMessageId() instanceof BatchMessageIdImpl;
        return isBatch || !this.session.getFactory().isUseServerSideFiltering();
    }

    @Override
    public void close() throws JMSException {
        if (Utils.isOnMessageListener(this.session, this)) {
            this.requestClose.set(true);
            return;
        }
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        Consumer<?> consumer = this.getInternalConsumer();
        if (consumer == null) {
            return;
        }
        if (!this.session.isTransactionStarted()) {
            this.session.executeCriticalOperation(() -> {
                try {
                    consumer.close();
                    this.session.removeConsumer(this);
                    return null;
                }
                catch (Exception err) {
                    throw Utils.handleException(err);
                }
            });
        } else if (this.session.getTransacted()) {
            this.closedWhileActiveTransaction.set(true);
        }
    }

    public String toString() {
        return "PulsarConsumer{subscriptionName=" + this.subscriptionName + ", destination=" + this.destination + '}';
    }

    @Override
    public synchronized Topic getTopic() throws JMSException {
        this.checkNotClosed();
        if (this.destination.isTopic()) {
            return (Topic)((Object)this.destination);
        }
        throw new JMSException("This consumer has been created on a Queue");
    }

    @Override
    public synchronized Queue getQueue() throws JMSException {
        this.checkNotClosed();
        if (this.destination.isQueue()) {
            return (Queue)((Object)this.destination);
        }
        throw new JMSException("This consumer has been created on a Topic");
    }

    @Override
    public synchronized boolean getNoLocal() throws JMSException {
        this.checkNotClosed();
        return this.noLocal;
    }

    public JMSConsumer asJMSConsumer() {
        return new PulsarJMSConsumer(this);
    }

    void acknowledge(Message<?> receivedPulsarMessage, PulsarMessage message, Consumer<?> consumer) throws JMSException {
        try {
            consumer.acknowledge(receivedPulsarMessage);
            this.session.unregisterUnacknowledgedMessage(message);
        }
        catch (PulsarClientException err) {
            throw Utils.handleException(err);
        }
    }

    void runListenerNoWait() {
        this.runListener(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void runListener(int timeout) {
        if (this.closed.get()) {
            return;
        }
        boolean executeAgain = true;
        boolean someMessageFound = false;
        PulsarMessageConsumer pulsarMessageConsumer = this;
        synchronized (pulsarMessageConsumer) {
            MessageListener messageListener = this.listener;
            if (messageListener == null) {
                return;
            }
            while (executeAgain) {
                boolean messageFound = Utils.executeMessageListenerInSessionContext(this.session, this, () -> {
                    try {
                        ConsumerBase<?> consumer = this.getConsumer();
                        if (timeout == 0 && consumer.getTotalIncomingMessages() <= 0) {
                            return false;
                        }
                        Message<?> message = consumer.receive(timeout, TimeUnit.MILLISECONDS);
                        if (message == null) {
                            return false;
                        }
                        this.handleReceivedMessage(message, consumer, null, messageListener::onMessage, this.noLocal);
                        return true;
                    }
                    catch (PulsarClientException.AlreadyClosedException closed) {
                        log.error("Error while receiving message on Closed consumer {}", (Object)this);
                    }
                    catch (PulsarClientException | JMSException err) {
                        log.error("Error while receiving message on consumer {}", (Object)this, (Object)err);
                        this.session.onError(err);
                    }
                    return false;
                });
                if (!this.dedicatedListenerThread && messageFound) {
                    executeAgain = true;
                    someMessageFound = true;
                    continue;
                }
                executeAgain = false;
            }
        }
        if (!this.dedicatedListenerThread && !this.closed.get()) {
            this.session.scheduleConsumerListenerCycle(this, someMessageFound);
        }
    }

    void closeDuringRollback() throws JMSException {
        try {
            this.consumer.close();
            this.session.removeConsumer(this);
        }
        catch (Exception err) {
            throw Utils.handleException(err);
        }
    }

    public void closeInternal() throws JMSException {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.requestClose.set(false);
        try {
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }
        catch (Exception err) {
            throw Utils.handleException(err);
        }
    }

    public boolean isClosedWhileActiveTransaction() {
        return this.closedWhileActiveTransaction.get();
    }

    public void negativeAck(Message<?> message) {
        if (this.consumer != null) {
            this.consumer.negativeAcknowledge(message);
        }
    }

    void redeliverUnacknowledgedMessages() {
        if (this.consumer != null) {
            this.consumer.redeliverUnacknowledgedMessages();
        }
    }

    PulsarSession getSession() {
        return this.session;
    }

    private synchronized SelectorSupport getSelectorSupport() {
        return this.selectorSupport;
    }

    public synchronized SelectorSupport getSelectorSupportOnSubscription(String topicName) throws JMSException {
        if (!this.useServerSideFiltering) {
            return null;
        }
        if (!this.selectorSupportOnSubscriptions.containsKey(topicName)) {
            String subscriptionName = this.destination.isQueue() ? this.session.getFactory().getQueueSubscriptionName(this.destination) : this.subscriptionName;
            String selector = this.session.getFactory().downloadServerSideFilter(topicName, subscriptionName, this.subscriptionMode);
            this.selectorSupportOnSubscriptions.put(topicName, SelectorSupport.build(selector, true));
        }
        return this.selectorSupportOnSubscriptions.get(topicName);
    }

    synchronized Consumer<?> getInternalConsumer() {
        return this.consumer;
    }

    PulsarDestination getDestination() {
        return this.destination;
    }

    public SubscriptionType getSubscriptionType() {
        return this.subscriptionType;
    }

    public long getReceivedMessages() {
        return this.receivedMessages.get();
    }

    public long getSkippedMessages() {
        return this.skippedMessages.get();
    }

    synchronized void refreshServerSideSelectors() {
        int size = this.selectorSupportOnSubscriptions.size();
        if (size > 0) {
            this.selectorSupportOnSubscriptions.clear();
            log.info("Refreshing {} server-side filters on {}", (Object)size, (Object)this.destination);
        }
    }
}

