package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.selectors.SelectorSupport;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSConsumer;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarMessageConsumer.class */
public class PulsarMessageConsumer implements MessageConsumer, TopicSubscriber, QueueReceiver {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarMessageConsumer.class);
    final String subscriptionName;
    private final PulsarSession session;
    private final PulsarDestination destination;
    private final SelectorSupport selectorSupport;
    private final boolean noLocal;
    private Consumer<byte[]> consumer;
    private MessageListener listener;
    private final SubscriptionMode subscriptionMode;
    private final SubscriptionType subscriptionType;
    final boolean unregisterSubscriptionOnClose;
    private boolean closed;
    private boolean requestClose;

    public PulsarMessageConsumer(String str, PulsarDestination pulsarDestination, PulsarSession pulsarSession, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String str2, boolean z, boolean z2) throws JMSException {
        this.noLocal = z2;
        pulsarSession.checkNotClosed();
        if (pulsarDestination == null) {
            throw new InvalidDestinationException("Invalid destination");
        }
        if ((pulsarDestination instanceof PulsarTemporaryDestination) && ((PulsarTemporaryDestination) pulsarDestination).getSession() != pulsarSession) {
            throw new JMSException("Cannot subscribe to a temporary destination not created but this session");
        }
        this.subscriptionName = str;
        this.session = pulsarSession;
        this.destination = pulsarDestination;
        this.subscriptionMode = pulsarDestination.isQueue() ? SubscriptionMode.Durable : subscriptionMode;
        this.subscriptionType = pulsarDestination.isQueue() ? SubscriptionType.Shared : subscriptionType;
        this.selectorSupport = SelectorSupport.build(str2, subscriptionType == SubscriptionType.Exclusive || pulsarSession.getFactory().isEnableClientSideEmulation());
        this.unregisterSubscriptionOnClose = z;
        if (z2 && subscriptionType != SubscriptionType.Exclusive && !pulsarSession.getFactory().isEnableClientSideEmulation()) {
            throw new IllegalStateException("noLocal is not enabled by default with subscriptionType " + subscriptionType + ", please set jms.enableClientSideEmulation=true");
        }
    }

    public PulsarMessageConsumer subscribe() throws JMSException {
        if (this.destination.isQueue()) {
            this.session.getFactory().ensureQueueSubscription(this.destination);
        } else {
            getConsumer();
        }
        this.session.registerConsumer(this);
        return this;
    }

    synchronized Consumer<byte[]> getConsumer() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("Consumer is closed");
        }
        if (this.consumer == null) {
            this.consumer = this.session.getFactory().createConsumer(this.destination, this.subscriptionName, this.session.getAcknowledgeMode(), this.subscriptionMode, this.subscriptionType);
        }
        return this.consumer;
    }

    @Override // javax.jms.MessageConsumer
    public synchronized String getMessageSelector() throws JMSException {
        checkNotClosed();
        if (this.selectorSupport != null) {
            return this.selectorSupport.getSelector();
        }
        return null;
    }

    @Override // javax.jms.MessageConsumer
    public synchronized MessageListener getMessageListener() throws JMSException {
        checkNotClosed();
        return this.listener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void checkNotClosed() throws JMSException {
        this.session.checkNotClosed();
        if (this.closed || this.requestClose) {
            throw new IllegalStateException("This consumer is closed");
        }
    }

    @Override // javax.jms.MessageConsumer
    public synchronized void setMessageListener(MessageListener messageListener) throws JMSException {
        checkNotClosed();
        this.listener = messageListener;
        this.session.ensureListenerThread();
    }

    @Override // javax.jms.MessageConsumer
    public Message receive() throws JMSException {
        return receiveWithTimeoutAndValidateType(OpenWireFormat.DEFAULT_MAX_FRAME_SIZE, null);
    }

    @Override // javax.jms.MessageConsumer
    public Message receive(long j) throws JMSException {
        return receiveWithTimeoutAndValidateType(j, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized Message receiveWithTimeoutAndValidateType(long j, Class cls) throws JMSException {
        checkNotClosed();
        if (this.listener != null) {
            throw new IllegalStateException("cannot receive if you have a messageListener");
        }
        int i = j == OpenWireFormat.DEFAULT_MAX_FRAME_SIZE ? Integer.MAX_VALUE : (int) j;
        int i2 = j < 100 ? (int) j : 100;
        long currentTimeMillis = System.currentTimeMillis();
        return (Message) this.session.executeOperationIfConnectionStarted(() -> {
            do {
                Message message = (Message) this.session.executeCriticalOperation(() -> {
                    try {
                        org.apache.pulsar.client.api.Message<byte[]> receive = getConsumer().receive(i2, TimeUnit.MILLISECONDS);
                        if (receive == null) {
                            return null;
                        }
                        return handleReceivedMessage(receive, cls, null, this.noLocal);
                    } catch (Exception e) {
                        throw Utils.handleException(e);
                    }
                });
                if (message != null) {
                    return message;
                }
                if (System.currentTimeMillis() - currentTimeMillis >= j) {
                    return null;
                }
            } while (!this.session.isClosed());
            return null;
        }, i);
    }

    @Override // javax.jms.MessageConsumer
    public Message receiveNoWait() throws JMSException {
        return receive(1L);
    }

    private void skipMessage(org.apache.pulsar.client.api.Message<byte[]> message) throws JMSException {
        if (this.subscriptionType != SubscriptionType.Exclusive && !this.session.getFactory().isAcknowledgeRejectedMessages()) {
            log.info("nAck filtered msg {}", message.getMessageId());
            this.consumer.negativeAcknowledge((org.apache.pulsar.client.api.Message<?>) message);
        } else if (this.session.getTransaction() != null) {
            this.consumer.acknowledgeAsync(message.getMessageId(), this.session.getTransaction());
        } else {
            this.consumer.acknowledgeAsync(message.getMessageId());
        }
    }

    private PulsarMessage handleReceivedMessage(org.apache.pulsar.client.api.Message<byte[]> message, Class cls, java.util.function.Consumer<PulsarMessage> consumer, boolean z) throws JMSException, PulsarClientException {
        String stringProperty;
        PulsarMessage decode = PulsarMessage.decode(this, message);
        Consumer<byte[]> consumer2 = getConsumer();
        if (cls != null && !decode.isBodyAssignableTo(cls)) {
            log.info("negativeAcknowledge for message {} that cannot be converted to {}", message, cls);
            consumer2.negativeAcknowledge((org.apache.pulsar.client.api.Message<?>) message);
            throw new MessageFormatException("The message (" + decode.messageType() + "," + decode + ",) cannot be converted to a " + cls);
        }
        if (this.selectorSupport != null && !this.selectorSupport.matches(decode)) {
            log.info("msg {} does not match selector {}", decode, this.selectorSupport.getSelector());
            skipMessage(message);
            return null;
        }
        if (z && (stringProperty = decode.getStringProperty("JMSConnectionID")) != null && stringProperty.equals(this.session.getConnection().getConnectionId())) {
            log.info("msg {} was generated from this connection {}", decode, stringProperty);
            skipMessage(message);
            return null;
        }
        if (decode.getJMSExpiration() > 0 && System.currentTimeMillis() >= decode.getJMSExpiration()) {
            log.info("msg {} expired at {}", decode, Instant.ofEpochMilli(decode.getJMSExpiration()));
            skipMessage(message);
            return null;
        }
        this.session.registerUnacknowledgedMessage(decode);
        if (consumer != null) {
            try {
                consumer.accept(decode);
                if (decode.isNegativeAcked()) {
                    return null;
                }
            } catch (Throwable th) {
                log.error("Listener thrown error, calling negativeAcknowledge", th);
                consumer2.negativeAcknowledge((org.apache.pulsar.client.api.Message<?>) message);
                throw Utils.handleException(th);
            }
        }
        if (this.session.getTransacted()) {
            Utils.get(consumer2.acknowledgeAsync(message.getMessageId(), this.session.getTransaction()));
        } else if (this.session.getAcknowledgeMode() == 1) {
            consumer2.acknowledge((org.apache.pulsar.client.api.Message<?>) message);
        } else if (this.session.getAcknowledgeMode() == 3) {
            consumer2.acknowledgeAsync((org.apache.pulsar.client.api.Message<?>) message).whenComplete((r6, th2) -> {
                if (th2 != null) {
                    log.error("Cannot acknowledge message {} {}", message, th2);
                }
            });
        }
        if (this.session.getAcknowledgeMode() != 2) {
            this.session.unregisterUnacknowledgedMessage(decode);
        }
        if (this.requestClose) {
            closeInternal();
        }
        return decode;
    }

    @Override // javax.jms.MessageConsumer, java.lang.AutoCloseable
    public synchronized void close() throws JMSException {
        if (Utils.isOnMessageListener(this.session, this)) {
            this.requestClose = true;
        } else {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.consumer == null) {
                return;
            }
            this.session.executeCriticalOperation(() -> {
                try {
                    this.consumer.close();
                    this.session.removeConsumer(this);
                    return null;
                } catch (Exception e) {
                    throw Utils.handleException(e);
                }
            });
        }
    }

    public String toString() {
        return "PulsarConsumer{subscriptionName=" + this.subscriptionName + ", destination=" + this.destination + '}';
    }

    @Override // javax.jms.TopicSubscriber
    public synchronized Topic getTopic() throws JMSException {
        checkNotClosed();
        if (this.destination.isTopic()) {
            return (Topic) this.destination;
        }
        throw new JMSException("This consumer has been created on a Queue");
    }

    @Override // javax.jms.QueueReceiver
    public synchronized Queue getQueue() throws JMSException {
        checkNotClosed();
        if (this.destination.isQueue()) {
            return (Queue) this.destination;
        }
        throw new JMSException("This consumer has been created on a Topic");
    }

    @Override // javax.jms.TopicSubscriber
    public synchronized boolean getNoLocal() throws JMSException {
        checkNotClosed();
        return this.noLocal;
    }

    public JMSConsumer asJMSConsumer() {
        return new JMSConsumer() { // from class: com.datastax.oss.pulsar.jms.PulsarMessageConsumer.1
            @Override // javax.jms.JMSConsumer
            public String getMessageSelector() {
                return (String) Utils.runtimeException(() -> {
                    return PulsarMessageConsumer.this.getMessageSelector();
                });
            }

            @Override // javax.jms.JMSConsumer
            public MessageListener getMessageListener() throws JMSRuntimeException {
                return (MessageListener) Utils.runtimeException(() -> {
                    return PulsarMessageConsumer.this.getMessageListener();
                });
            }

            @Override // javax.jms.JMSConsumer
            public void setMessageListener(MessageListener messageListener) throws JMSRuntimeException {
                Utils.runtimeException(() -> {
                    PulsarMessageConsumer.this.setMessageListener(messageListener);
                });
            }

            @Override // javax.jms.JMSConsumer
            public Message receive() {
                return (Message) Utils.runtimeException(() -> {
                    return PulsarMessageConsumer.this.receive();
                });
            }

            @Override // javax.jms.JMSConsumer
            public Message receive(long j) {
                return (Message) Utils.runtimeException(() -> {
                    return PulsarMessageConsumer.this.receive(j);
                });
            }

            @Override // javax.jms.JMSConsumer
            public Message receiveNoWait() {
                return (Message) Utils.runtimeException(() -> {
                    return PulsarMessageConsumer.this.receiveNoWait();
                });
            }

            @Override // javax.jms.JMSConsumer, java.lang.AutoCloseable
            public void close() {
                Utils.runtimeException(() -> {
                    PulsarMessageConsumer.this.close();
                });
            }

            @Override // javax.jms.JMSConsumer
            public <T> T receiveBody(Class<T> cls) {
                return (T) Utils.runtimeException(() -> {
                    Message receiveWithTimeoutAndValidateType = PulsarMessageConsumer.this.receiveWithTimeoutAndValidateType(OpenWireFormat.DEFAULT_MAX_FRAME_SIZE, cls);
                    if (receiveWithTimeoutAndValidateType == null) {
                        return null;
                    }
                    return receiveWithTimeoutAndValidateType.getBody(cls);
                });
            }

            @Override // javax.jms.JMSConsumer
            public <T> T receiveBody(Class<T> cls, long j) {
                return (T) Utils.runtimeException(() -> {
                    Message receiveWithTimeoutAndValidateType = PulsarMessageConsumer.this.receiveWithTimeoutAndValidateType(j, cls);
                    if (receiveWithTimeoutAndValidateType == null) {
                        return null;
                    }
                    return receiveWithTimeoutAndValidateType.getBody(cls);
                });
            }

            @Override // javax.jms.JMSConsumer
            public <T> T receiveBodyNoWait(Class<T> cls) {
                return (T) Utils.runtimeException(() -> {
                    Message receiveWithTimeoutAndValidateType = PulsarMessageConsumer.this.receiveWithTimeoutAndValidateType(1L, cls);
                    if (receiveWithTimeoutAndValidateType == null) {
                        return null;
                    }
                    return receiveWithTimeoutAndValidateType.getBody(cls);
                });
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void acknowledge(org.apache.pulsar.client.api.Message<byte[]> message, PulsarMessage pulsarMessage) throws JMSException {
        try {
            getConsumer().acknowledge((org.apache.pulsar.client.api.Message<?>) message);
            this.session.unregisterUnacknowledgedMessage(pulsarMessage);
        } catch (PulsarClientException e) {
            throw Utils.handleException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void runListener(int i) {
        if (this.closed || this.listener == null) {
            return;
        }
        Utils.executeMessageListenerInSessionContext(this.session, this, () -> {
            if (this.closed) {
                return;
            }
            try {
                try {
                    org.apache.pulsar.client.api.Message<byte[]> receive = getConsumer().receive(i, TimeUnit.MILLISECONDS);
                    if (receive == null) {
                        return;
                    }
                    handleReceivedMessage(receive, null, pulsarMessage -> {
                        this.listener.onMessage(pulsarMessage);
                    }, this.noLocal);
                } catch (JMSException | PulsarClientException e) {
                    log.error("Error while receiving message con consumer {}", this, e);
                    this.session.onError(e);
                }
            } catch (PulsarClientException.AlreadyClosedException e2) {
                log.error("Error while receiving message con Closed consumer {}", this);
            }
        });
    }

    public void closeInternal() throws JMSException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.requestClose = false;
        try {
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        } catch (Exception e) {
            throw Utils.handleException(e);
        }
    }

    public void negativeAck(org.apache.pulsar.client.api.Message<byte[]> message) {
        if (this.consumer != null) {
            this.consumer.negativeAcknowledge((org.apache.pulsar.client.api.Message<?>) message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PulsarSession getSession() {
        return this.session;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Consumer<byte[]> getInternalConsumer() {
        return this.consumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Destination getDestination() {
        return this.destination;
    }

    public SubscriptionType getSubscriptionType() {
        return this.subscriptionType;
    }
}
