package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.messages.PulsarBytesMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarMapMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarObjectMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarSimpleMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarStreamMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Consumer;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.MessageId;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Producer;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionMode;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionType;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.transaction.Transaction;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarSession.class */
public class PulsarSession implements Session, QueueSession, TopicSession {
    private static final Logger log = LoggerFactory.getLogger(PulsarSession.class);
    /** Counter shared by all sessions; generateNewTransactionStickyKey() takes the next value from it. */
    private static final AtomicLong STICKY_KEY_GENERATOR = new AtomicLong();
    /** Connection that owns this session; also the way to reach the connection factory. */
    private final PulsarConnection connection;
    /** Initialised to false by the constructor. */
    private boolean jms20;
    /** Consumer configuration handed to the constructor; exposed through getOverrideConsumerConfiguration(). */
    private final ConsumerConfiguration overrideConsumerConfiguration;
    /** Session mode passed to the constructor; 0 marks a transacted session. */
    private final int sessionMode;
    /** True when sessionMode is 0. */
    private final boolean transacted;
    /** True when the session is transacted, the factory has transactions disabled and emulation enabled. */
    private final boolean emulateTransactions;
    /** Copied from the factory's isEnableJMSPriority() at construction time. */
    private final boolean enableJMSPriority;
    /** Current transaction: created lazily by getTransaction(), reset to null by commit and rollback. */
    Transaction transaction;
    private MessageListener messageListener;
    /** Set by close(); polled by the dedicated listener thread to know when to stop. */
    private volatile boolean closed;
    /** True when the factory reports zero or fewer session listener threads. */
    private final boolean useDedicatedListenerThread;
    private volatile ListenerThread dedicatedListenerThread;
    /** Lazily fetched from the factory by getThreadPool(). */
    private volatile ScheduledExecutorService threadPool;
    private volatile Future<?> listenersExecutorsCycleHandle;
    private boolean allowQueueOperations = true;
    private boolean allowTopicOperations = true;
    /** commit() and rollback() hold the read lock; close() holds the write lock. */
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    /** Messages not yet acknowledged; every access synchronizes on the list itself. */
    private final List<PulsarMessage> unackedMessages = new ArrayList<>();
    /** Subscription name to destination, maintained by register/unregisterSubscriptionName. */
    private final Map<String, PulsarDestination> destinationBySubscription = new HashMap<>();
    private final List<PulsarMessageConsumer> consumers = new CopyOnWriteArrayList<>();
    private final List<PulsarQueueBrowser> browsers = new CopyOnWriteArrayList<>();
    /** Guards activitesBlockingTransactionOperations and transactionOperationInProgress. */
    private final ReentrantLock pendingActivitiesLock = new ReentrantLock();
    /** Signalled when the number of blocking activities drops to zero. */
    private final Condition pendingActivitiesLockCanCommit = this.pendingActivitiesLock.newCondition();
    /** Signalled when a commit/rollback operation ends. */
    private final Condition pendingActivitiesLockCanDoActivity = this.pendingActivitiesLock.newCondition();
    /** Number of activities currently preventing a commit/rollback from proceeding. */
    private int activitesBlockingTransactionOperations = 0;
    /** True between beginTransactionOperation() and endTransactionOperation(). */
    private boolean transactionOperationInProgress = false;
    /** Sticky key of the current transaction; renewed by generateNewTransactionStickyKey(). */
    private final AtomicLong transactionStickyKey = new AtomicLong();
    private final ConsumersInterceptor consumerInterceptor = new ConsumersInterceptor();
    /** Messages queued by setupConnectionConsumerTask() and processed by run(); guarded by synchronizing on the list. */
    private final List<Message> connectionConsumerTasks = new ArrayList<>();
    /** Task that run() executes once after it has processed the queued messages. */
    private final AtomicReference<Runnable> connectionConsumerPostProcessingTask = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarSession$BlockCLoseOperation.class */
    /**
     * Callback that computes a value and may fail with a {@link JMSException}.
     * NOTE(review): the name suggests it runs while session close is blocked; the caller is not
     * visible in this part of the file — confirm.
     *
     * @param <T> type of the value produced by the callback
     */
    public interface BlockCLoseOperation<T> {
        /**
         * Runs the operation.
         *
         * @return the result of the operation
         * @throws JMSException if the operation fails
         */
        T execute() throws JMSException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarSession$ConsumersInterceptor.class */
    /**
     * Consumer interceptor of this session. Only the ack-timeout notification does any work: it
     * forgets the unacknowledged messages whose ids are reported as timed out. All other callbacks
     * are no-ops.
     */
    public class ConsumersInterceptor implements ConsumerInterceptor<Object> {
        ConsumersInterceptor() {
        }

        /** No resources to release. */
        @Override // com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor, java.lang.AutoCloseable
        public void close() {
        }

        /** Passes the message through untouched. */
        @Override // com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor
        public com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Message<Object> beforeConsume(Consumer<Object> consumer, com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Message<Object> message) {
            return message;
        }

        @Override // com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor
        public void onAcknowledge(Consumer<Object> consumer, MessageId messageId, Throwable th) {
        }

        @Override // com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor
        public void onAcknowledgeCumulative(Consumer<Object> consumer, MessageId messageId, Throwable th) {
        }

        @Override // com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor
        public void onNegativeAcksSend(Consumer<Object> consumer, Set<MessageId> set) {
        }

        /**
         * Drops from the session's unacknowledged list every message whose id matches, according to
         * {@code Utils.sameEntryId}, one of the ids in {@code set}.
         *
         * @param consumer the consumer that reported the timeout (unused)
         * @param set ids of the messages whose acknowledgement timed out
         */
        @Override // com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor
        public void onAckTimeoutSend(Consumer<Object> consumer, Set<MessageId> set) {
            if (PulsarSession.log.isDebugEnabled()) {
                PulsarSession.log.debug("onAckTimeoutSend {}", set);
            }
            synchronized (PulsarSession.this.unackedMessages) {
                PulsarSession.this.unackedMessages.removeIf(pending -> {
                    // Resolve the id first, exactly once per pending message.
                    MessageId pendingId = pending.getReceivedPulsarMessage().getMessageId();
                    for (MessageId timedOut : set) {
                        if (Utils.sameEntryId(timedOut, pendingId)) {
                            return true;
                        }
                    }
                    return false;
                });
            }
        }

        @Override // com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor
        public void onPartitionsChange(String str, int i) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarSession$ListenerThread.class */
    /**
     * Daemon thread named "jms-session-thread" that keeps calling the session's listener loop
     * until the session is flagged as closed.
     */
    public class ListenerThread extends Thread {
        private ListenerThread() {
            super("jms-session-thread");
            setDaemon(true);
        }

        /** Runs one listener pass after another, checking the closed flag before each pass. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            for (;;) {
                if (PulsarSession.this.closed) {
                    return;
                }
                PulsarSession.this.runListenersLoop();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a readable name for a numeric acknowledge/session mode.
     *
     * @param i the numeric mode
     * @return the mode name for values 0 to 4, otherwise "?" followed by the number
     */
    public static String ACKNOWLEDGE_MODE_TO_STRING(int i) {
        // Index in this table == numeric mode.
        final String[] modeNames = {
            "SESSION_TRANSACTED",
            "AUTO_ACKNOWLEDGE",
            "CLIENT_ACKNOWLEDGE",
            "DUPS_OK_ACKNOWLEDGE",
            "INDIVIDUAL_ACKNOWLEDGE"
        };
        if (i < 0 || i >= modeNames.length) {
            return "?" + i;
        }
        return modeNames[i];
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a session on the given connection.
     *
     * @param i the session mode; 0 requests a transacted session
     * @param pulsarConnection the owning connection
     * @param consumerConfiguration consumer configuration override kept for this session
     * @throws JMSException if a transacted session is requested while the factory has neither
     *     transactions nor transaction emulation enabled, or if the session mode is not valid
     */
    public PulsarSession(int i, PulsarConnection pulsarConnection, ConsumerConfiguration consumerConfiguration) throws JMSException {
        final boolean transactedMode = i == 0;
        if (transactedMode && !pulsarConnection.getFactory().isEnableTransaction()) {
            // Transacted session without real transactions: only acceptable when emulation is on.
            if (!pulsarConnection.getFactory().isEmulateTransactions()) {
                throw new JMSException("Please enable transactions on PulsarConnectionFactory with enableTransaction=true, you can configure jms.emulateTransactions if your Pulsar cluster does not support transactions");
            }
            this.emulateTransactions = true;
        } else {
            this.emulateTransactions = false;
        }
        this.jms20 = false;
        this.connection = pulsarConnection;
        this.sessionMode = i;
        this.transacted = transactedMode;
        this.overrideConsumerConfiguration = consumerConfiguration;
        PulsarConnectionFactory connectionFactory = getFactory();
        this.enableJMSPriority = connectionFactory.isEnableJMSPriority();
        int listenerThreads = connectionFactory.getSessionListenersThreads();
        this.useDedicatedListenerThread = listenerThreads <= 0;
        if (this.transacted) {
            if (connectionFactory.isTransactionsStickyPartitions()) {
                generateNewTransactionStickyKey();
            }
        }
        validateSessionMode(i);
    }

    /**
     * Returns the listeners thread pool, asking the factory for it on first use and caching it.
     *
     * @return the cached thread pool
     */
    private synchronized ScheduledExecutorService getThreadPool() {
        ScheduledExecutorService pool = this.threadPool;
        if (pool != null) {
            return pool;
        }
        pool = getFactory().getSessionListenersThreadPool();
        this.threadPool = pool;
        return pool;
    }

    /**
     * Creates a sibling session on the same connection.
     *
     * @param i the session mode; 0 makes the new session transacted
     * @param map optional settings; the value under "consumerConfig", when present, is used to
     *     build the consumer configuration of the new session. May be null.
     * @return the new session
     * @throws JMSException if the connection cannot create the session
     */
    public PulsarSession createSession(int i, Map<String, Object> map) throws JMSException {
        Map consumerConfig = null;
        if (map != null) {
            consumerConfig = (Map) map.get("consumerConfig");
        }
        boolean transactedSession = i == 0;
        return this.connection.createSession(transactedSession, i, ConsumerConfiguration.buildConsumerConfiguration(consumerConfig));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the consumer configuration that was passed to the constructor of this session
     */
    public ConsumerConfiguration getOverrideConsumerConfiguration() {
        return this.overrideConsumerConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the current transaction, starting one first when none exists, the session mode is 0
     * and transactions are not emulated.
     *
     * @return the current transaction, or null when none applies
     * @throws JMSException if a new transaction cannot be started
     */
    public Transaction getTransaction() throws JMSException {
        boolean mustStart = this.transaction == null && this.sessionMode == 0 && !this.emulateTransactions;
        if (mustStart) {
            this.transaction = startTransaction(this.connection);
        }
        return this.transaction;
    }

    /**
     * Starts a new transaction, making up to 10 attempts. An attempt that fails because the
     * transaction coordinator is not found is followed by a one second pause; any other failure is
     * converted with {@code Utils.handleException} and thrown immediately.
     *
     * @param pulsarConnection connection whose factory provides the Pulsar client
     * @return the new transaction
     * @throws JMSException if no transaction could be obtained
     */
    private Transaction startTransaction(PulsarConnection pulsarConnection) throws JMSException {
        Transaction created = null;
        for (int attemptsLeft = 10; attemptsLeft > 0; attemptsLeft--) {
            try {
                try {
                    created = pulsarConnection.getFactory().getPulsarClient().newTransaction().build().get();
                    break;
                } catch (ExecutionException failure) {
                    Throwable cause = failure.getCause();
                    if (cause instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException) {
                        log.info("Transaction service not available {}", cause.getMessage());
                        Thread.sleep(1000L);
                    } else {
                        throw Utils.handleException(cause);
                    }
                }
            } catch (Exception unexpected) {
                // Also receives the exception thrown just above, as well as an interrupted sleep.
                throw Utils.handleException(unexpected);
            }
        }
        if (created != null) {
            return created;
        }
        throw new JMSException("Cannot create a Transaction in time");
    }

    /**
     * Checks that the session mode is one of the supported values, 0 to 4.
     *
     * @param i the session mode to check
     * @throws JMSException if the mode is outside the supported range
     */
    private static void validateSessionMode(int i) throws JMSException {
        boolean supported = i >= 0 && i <= 4;
        if (!supported) {
            throw new JMSException("Invalid sessionMode " + i);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the connection factory of the connection that owns this session
     */
    public PulsarConnectionFactory getFactory() {
        return this.connection.getFactory();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the factory for the producer of the given destination, passing along whether this
     * session is transacted.
     *
     * @param destination the destination to produce to
     * @return the producer supplied by the factory
     * @throws JMSException if the factory cannot provide the producer
     */
    public Producer<byte[]> getProducerForDestination(Destination destination) throws JMSException {
        return getFactory().getProducerForDestination(destination, this.transacted);
    }

    /**
     * Creates an empty bytes message after the closed-session check.
     *
     * @return a new {@link PulsarBytesMessage}
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public PulsarBytesMessage createBytesMessage() throws JMSException {
        checkNotClosed();
        return new PulsarBytesMessage();
    }

    /**
     * Creates an empty map message after the closed-session check.
     *
     * @return a new {@link PulsarMapMessage}
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public MapMessage createMapMessage() throws JMSException {
        checkNotClosed();
        return new PulsarMapMessage();
    }

    /**
     * Creates a map message initialised from the given map.
     *
     * @param map the initial content, passed to the {@link PulsarMapMessage} constructor
     * @return a new {@link PulsarMapMessage}
     * @throws JMSException if the closed-session check fails or the message cannot be built
     */
    public MapMessage createMapMessage(Map<String, Object> map) throws JMSException {
        // Same guard as every other message factory of this session.
        checkNotClosed();
        return new PulsarMapMessage(map);
    }

    /**
     * Creates a message without a body after the closed-session check.
     *
     * @return a new {@link PulsarSimpleMessage}
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public Message createMessage() throws JMSException {
        checkNotClosed();
        return new PulsarSimpleMessage();
    }

    /**
     * Creates an object message with no payload after the closed-session check.
     *
     * @return a new {@link PulsarObjectMessage}
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public ObjectMessage createObjectMessage() throws JMSException {
        checkNotClosed();
        return new PulsarObjectMessage();
    }

    /**
     * Creates an object message and sets the given object as its payload.
     *
     * @param serializable the payload handed to {@code setObject}
     * @return the new message
     * @throws JMSException if the closed-session check fails or the payload cannot be set
     */
    @Override // javax.jms.Session
    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
        checkNotClosed();
        PulsarObjectMessage pulsarObjectMessage = new PulsarObjectMessage();
        pulsarObjectMessage.setObject(serializable);
        return pulsarObjectMessage;
    }

    /**
     * Creates an empty stream message after the closed-session check.
     *
     * @return a new {@link PulsarStreamMessage}
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public StreamMessage createStreamMessage() throws JMSException {
        checkNotClosed();
        return new PulsarStreamMessage();
    }

    /**
     * Creates a text message built with a null text after the closed-session check.
     *
     * @return a new {@link PulsarTextMessage}
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public TextMessage createTextMessage() throws JMSException {
        checkNotClosed();
        // The cast selects the String constructor overload.
        return new PulsarTextMessage((String) null);
    }

    /**
     * Creates a text message carrying the given text after the closed-session check.
     *
     * @param str the text passed to the {@link PulsarTextMessage} constructor
     * @return a new {@link PulsarTextMessage}
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public TextMessage createTextMessage(String str) throws JMSException {
        checkNotClosed();
        return new PulsarTextMessage(str);
    }

    /**
     * @return true when this session was created with session mode 0
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public boolean getTransacted() throws JMSException {
        checkNotClosed();
        return this.transacted;
    }

    /**
     * @return the session mode that was passed to the constructor
     * @throws JMSException if the closed-session check fails
     */
    @Override // javax.jms.Session
    public int getAcknowledgeMode() throws JMSException {
        checkNotClosed();
        return this.sessionMode;
    }

    /**
     * @return the sticky key currently stored for this session's transaction
     */
    public long getTransactionStickyKey() {
        return this.transactionStickyKey.get();
    }

    /** Stores the next value of the shared STICKY_KEY_GENERATOR counter as this session's sticky key. */
    private void generateNewTransactionStickyKey() {
        this.transactionStickyKey.set(STICKY_KEY_GENERATOR.incrementAndGet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers an activity that must finish before a commit/rollback may proceed. Waits first for
     * any commit/rollback in progress to end. Does nothing on a non-transacted session.
     *
     * @throws JMSException an IllegalStateException wrapping the interruption if the wait is
     *     interrupted
     */
    public void blockTransactionOperations() throws JMSException {
        if (!this.transacted) {
            return;
        }
        this.pendingActivitiesLock.lock();
        try {
            while (this.transactionOperationInProgress) {
                try {
                    this.pendingActivitiesLockCanDoActivity.await();
                } catch (InterruptedException interrupted) {
                    IllegalStateException failure = new IllegalStateException("commit/rollback interrupted");
                    failure.initCause(interrupted);
                    throw failure;
                }
            }
            this.activitesBlockingTransactionOperations++;
        } finally {
            this.pendingActivitiesLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unregisters an activity previously registered with blockTransactionOperations() and, when it
     * was the last one, wakes up the threads waiting to commit/rollback. Does nothing on a
     * non-transacted session.
     */
    public void unblockTransactionOperations() {
        if (!this.transacted) {
            return;
        }
        this.pendingActivitiesLock.lock();
        try {
            this.activitesBlockingTransactionOperations -= 1;
            if (this.activitesBlockingTransactionOperations == 0) {
                this.pendingActivitiesLockCanCommit.signalAll();
            }
        } finally {
            this.pendingActivitiesLock.unlock();
        }
    }

    /**
     * Marks the start of a commit/rollback. Waits until no other commit/rollback is in progress,
     * claims the operation, then waits until every activity registered through
     * blockTransactionOperations() has finished. The lock is held only for the duration of this
     * call; endTransactionOperation() takes it again on its own.
     *
     * @throws JMSException an IllegalStateException wrapping the interruption if either wait is
     *     interrupted; in that case the operation is not left marked as in progress
     */
    @SuppressFBWarnings({"UL_UNRELEASED_LOCK", "UL_UNRELEASED_LOCK_EXCEPTION_PATH"})
    void beginTransactionOperation() throws JMSException {
        this.pendingActivitiesLock.lock();
        try {
            while (this.transactionOperationInProgress) {
                this.pendingActivitiesLockCanDoActivity.await();
            }
            this.transactionOperationInProgress = true;
            try {
                while (this.activitesBlockingTransactionOperations > 0) {
                    this.pendingActivitiesLockCanCommit.await();
                }
            } catch (InterruptedException interrupted) {
                // The caller never reaches endTransactionOperation() when this method fails, so
                // release the claim here or every later operation would wait forever.
                this.transactionOperationInProgress = false;
                this.pendingActivitiesLockCanDoActivity.signalAll();
                throw interrupted;
            }
        } catch (InterruptedException interrupted) {
            IllegalStateException failure = new IllegalStateException("commit/rollback interrupted");
            failure.initCause(interrupted);
            throw failure;
        } finally {
            // Condition.await() requires the lock to be held, so it is released exactly once,
            // after both waits, on every path.
            this.pendingActivitiesLock.unlock();
        }
    }

    /**
     * Marks the end of a commit/rollback and wakes up the threads waiting for it to finish.
     *
     * @throws JMSException an IllegalStateException if no commit/rollback is in progress
     */
    void endTransactionOperation() throws JMSException {
        this.pendingActivitiesLock.lock();
        try {
            if (this.transactionOperationInProgress) {
                this.transactionOperationInProgress = false;
                this.pendingActivitiesLockCanDoActivity.signalAll();
            } else {
                throw new IllegalStateException("commit/rollback already in not progress");
            }
        } finally {
            this.pendingActivitiesLock.unlock();
        }
    }

    /**
     * Commits the work of this transacted session. With emulated transactions every
     * unacknowledged message is acknowledged directly. With a real transaction the
     * unacknowledged messages are acknowledged inside it, the transaction is committed, and a new
     * sticky key is generated.
     *
     * @throws JMSException if the session is closed or not transacted, or if acknowledging or
     *     committing fails
     */
    @Override // javax.jms.Session
    public void commit() throws JMSException {
        checkNotClosed();
        Utils.checkNotOnMessageProducer(this, null);
        if (!this.transacted) {
            throw new IllegalStateException("session is not transacted");
        }
        this.closeLock.readLock().lock();
        try {
            beginTransactionOperation();
            try {
                if (this.emulateTransactions) {
                    synchronized (this.unackedMessages) {
                        // Iterate over a snapshot, as the original does.
                        for (PulsarMessage pending : new ArrayList<>(this.unackedMessages)) {
                            pending.acknowledgeInternal();
                        }
                        this.unackedMessages.clear();
                    }
                }
                if (this.transaction != null) {
                    List<CompletableFuture<?>> handles = new ArrayList<>();
                    synchronized (this.unackedMessages) {
                        for (PulsarMessage pending : new ArrayList<>(this.unackedMessages)) {
                            handles.add(pending.acknowledgeInternalInTransaction(this.transaction));
                        }
                        this.unackedMessages.clear();
                    }
                    handles.add(this.transaction.commit());
                    // Wait for every acknowledgement and for the commit itself.
                    Utils.get(CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])));
                    this.transaction = null;
                    generateNewTransactionStickyKey();
                }
            } finally {
                endTransactionOperation();
            }
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Rolls back the work of this transacted session.
     *
     * @throws JMSException if the session is closed or not transacted, or if the rollback fails
     */
    @Override // javax.jms.Session
    public void rollback() throws JMSException {
        checkNotClosed();
        Utils.checkNotOnMessageProducer(this, null);
        this.closeLock.readLock().lock();
        try {
            beginTransactionOperation();
            try {
                if (this.transacted) {
                    rollbackInternal();
                } else {
                    throw new IllegalStateException("session is not transacted");
                }
            } finally {
                endTransactionOperation();
            }
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Performs the rollback itself: asks every consumer to redeliver its unacknowledged messages
     * (completing the close of consumers that were closed while the transaction was active),
     * forgets the unacknowledged messages, aborts the current transaction if there is one, and
     * generates a new sticky key.
     *
     * @throws JMSException if a consumer or the transaction abort fails
     */
    private void rollbackInternal() throws JMSException {
        Iterator<PulsarMessageConsumer> registered = this.consumers.iterator();
        while (registered.hasNext()) {
            PulsarMessageConsumer consumer = registered.next();
            consumer.redeliverUnacknowledgedMessages();
            if (consumer.isClosedWhileActiveTransaction()) {
                consumer.closeDuringRollback();
            }
        }
        synchronized (this.unackedMessages) {
            this.unackedMessages.clear();
        }
        Transaction ongoing = this.transaction;
        if (ongoing != null) {
            Utils.get(ongoing.abort());
        }
        this.transaction = null;
        generateNewTransactionStickyKey();
    }

    /**
     * Closes the session. The first call rolls back an active transaction, forgets the
     * unacknowledged messages, closes every consumer and browser, cancels the listeners cycle and
     * waits for the dedicated listener thread to finish. Later calls skip that work. In every case
     * the session is unregistered from its connection on the way out.
     *
     * <p>If the calling thread is interrupted while waiting for the listener thread, the wait is
     * abandoned and the interrupt status of the calling thread is restored.
     *
     * @throws JMSException if the rollback or the closing of a consumer or browser fails
     */
    @Override // javax.jms.Session, java.lang.AutoCloseable
    public void close() throws JMSException {
        Utils.checkNotOnSessionCallback(this);
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.transacted && this.transaction != null) {
                rollbackInternal();
            }
            synchronized (this.unackedMessages) {
                this.unackedMessages.clear();
            }
            for (PulsarMessageConsumer consumer : this.consumers) {
                consumer.closeInternal();
            }
            this.consumers.clear();
            for (PulsarQueueBrowser browser : this.browsers) {
                browser.close();
            }
            this.browsers.clear();
            Future<?> cycleHandle = this.listenersExecutorsCycleHandle;
            if (cycleHandle != null) {
                cycleHandle.cancel(false);
                this.listenersExecutorsCycleHandle = null;
            }
            ListenerThread listenerThread = this.dedicatedListenerThread;
            if (listenerThread != null) {
                try {
                    // The thread exits on its own once it sees the closed flag set above.
                    listenerThread.join();
                } catch (InterruptedException interrupted) {
                    // Still best effort, but do not hide the interruption from the caller.
                    Thread.currentThread().interrupt();
                } finally {
                    this.dedicatedListenerThread = null;
                }
            }
        } finally {
            this.closeLock.writeLock().unlock();
            this.connection.unregisterSession(this);
        }
    }

    /**
     * Negatively acknowledges every unacknowledged message of this session and forgets them.
     *
     * @throws JMSException if the session is closed or transacted, or a negative
     *     acknowledgement fails
     */
    @Override // javax.jms.Session
    public void recover() throws JMSException {
        checkNotClosed();
        if (this.transacted) {
            throw new IllegalStateException("cannot call this method inside a transacted session");
        }
        synchronized (this.unackedMessages) {
            for (PulsarMessage pending : this.unackedMessages) {
                pending.negativeAck();
            }
            this.unackedMessages.clear();
        }
    }

    /**
     * @return the message listener set on this session, or null when none was set
     */
    @Override // javax.jms.Session
    public MessageListener getMessageListener() throws JMSException {
        return this.messageListener;
    }

    /**
     * Sets the message listener that {@link #run()} delivers messages to.
     *
     * @param messageListener the listener; a null value is rejected with a NullPointerException
     */
    @Override // javax.jms.Session
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        Objects.requireNonNull(messageListener);
        this.messageListener = messageListener;
    }

    /**
     * Delivers every message queued by setupConnectionConsumerTask() to the session's message
     * listener. A message is acknowledged after the listener returns normally; when the listener
     * or the acknowledgement throws, the error is logged and the message is negatively
     * acknowledged. Afterwards the queue is emptied and the registered post-processing task, if
     * any, is run once.
     */
    @Override // javax.jms.Session, java.lang.Runnable
    public void run() {
        synchronized (this.connectionConsumerTasks) {
            try {
                for (Message message : this.connectionConsumerTasks) {
                    try {
                        this.messageListener.onMessage(message);
                        message.acknowledge();
                    } catch (Throwable th) {
                        // The value returned here is discarded.
                        // NOTE(review): presumably called for a side effect inside Utils — confirm.
                        Utils.handleException(th);
                        log.info("Error in ConsumerConnection task on message {}", message, th);
                        ((PulsarMessage) message).negativeAck();
                    }
                }
            } finally {
                // Runs even when the loop above is aborted by an exception.
                this.connectionConsumerTasks.clear();
                // getAndSet(null) guarantees the task is executed at most once.
                Runnable andSet = this.connectionConsumerPostProcessingTask.getAndSet(null);
                if (andSet != null) {
                    andSet.run();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Performs one pass over the consumers of this session, running the listener of each one
     * through the connection's paused lock. When there is no consumer or the connection is not
     * started, the pass is replaced by a 100 ms sleep. The pass stops early as soon as the
     * connection is found not started. Errors of a single consumer are logged and do not stop the
     * pass.
     */
    public void runListenersLoop() {
        boolean nothingToDo = this.consumers.isEmpty() || !this.connection.isStarted();
        if (nothingToDo) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException ignored) {
                // An interrupted sleep simply ends this pass, like a completed one.
            }
            return;
        }
        for (PulsarMessageConsumer consumer : this.consumers) {
            try {
                this.connection.executeInConnectionPausedLock(() -> {
                    consumer.runListener(100);
                    return null;
                }, 0);
            } catch (Throwable failure) {
                log.error("Error in Session Thread {}", this, failure);
            }
            boolean stillStarted = this.connection.isStarted();
            if (!stillStarted) {
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues messages for a later {@link #run()} and registers the task to execute once they have
     * been processed. A previously registered task is replaced.
     *
     * @param list the messages to append to the queue
     * @param runnable the post-processing task
     */
    public void setupConnectionConsumerTask(List<Message> list, Runnable runnable) {
        synchronized (this.connectionConsumerTasks) {
            this.connectionConsumerTasks.addAll(list);
            this.connectionConsumerPostProcessingTask.set(runnable);
        }
    }

    /**
     * Creates a producer for the given destination. As a side effect the connection is told that
     * the client id may no longer be set.
     *
     * @param destination the destination passed to the producer
     * @return a new {@link PulsarMessageProducer} bound to this session
     * @throws JMSException if the producer cannot be created
     */
    @Override // javax.jms.Session
    public PulsarMessageProducer createProducer(Destination destination) throws JMSException {
        this.connection.setAllowSetClientId(false);
        return new PulsarMessageProducer(this, destination);
    }

    /**
     * Creates a consumer without a message selector; delegates to
     * {@link #createConsumer(Destination, String)} with a null selector.
     *
     * @param destination the destination to consume from
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    @Override // javax.jms.Session
    public PulsarMessageConsumer createConsumer(Destination destination) throws JMSException {
        return createConsumer(destination, (String) null);
    }

    /**
     * Creates a consumer with the given second argument and the boolean option of the three
     * argument overload set to false.
     *
     * @param destination the destination to consume from; must not be null
     * @param str forwarded unchanged to the three argument overload
     * @return the new consumer
     * @throws JMSException an InvalidDestinationException when the destination is null, or any
     *     failure of the three argument overload
     */
    @Override // javax.jms.Session
    public PulsarMessageConsumer createConsumer(Destination destination, String str) throws JMSException {
        if (destination == null) {
            throw new InvalidDestinationException("null destination");
        }
        return createConsumer(destination, str, false);
    }

    /**
     * Creates a consumer on a non-durable subscription with a random name, using the subscription
     * type the factory prescribes for simple consumers of the destination.
     *
     * @param destination the destination to consume from; must not be null
     * @param str forwarded to buildConsumer as its fifth argument
     * @param z forwarded to buildConsumer as its last argument
     * @return the subscribed consumer
     * @throws JMSException an InvalidDestinationException when the destination is null, or any
     *     failure while building the consumer
     */
    @Override // javax.jms.Session
    public PulsarMessageConsumer createConsumer(Destination destination, String str, boolean z) throws JMSException {
        if (destination == null) {
            throw new InvalidDestinationException("null destination");
        }
        String randomSubscriptionName = UUID.randomUUID().toString();
        PulsarDestination target = PulsarConnectionFactory.toPulsarDestination(destination);
        SubscriptionType subscriptionType = getFactory().getExclusiveSubscriptionTypeForSimpleConsumers(destination);
        return buildConsumer(randomSubscriptionName, target, SubscriptionMode.NonDurable, subscriptionType, str, false, z);
    }

    /**
     * Instantiates a consumer with the given settings and subscribes it.
     *
     * @return the value returned by the consumer's {@code subscribe()}
     * @throws JMSException if the consumer cannot be created or subscribed
     */
    private PulsarMessageConsumer buildConsumer(String str, PulsarDestination pulsarDestination, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String str2, boolean z, boolean z2) throws JMSException {
        PulsarMessageConsumer consumer = new PulsarMessageConsumer(str, pulsarDestination, this, subscriptionMode, subscriptionType, str2, z, z2);
        return consumer.subscribe();
    }

    /**
     * Creates a shared consumer; delegates to the three argument overload with a null third
     * argument.
     *
     * @param topic the topic to consume from
     * @param str the subscription name
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    @Override // javax.jms.Session
    public MessageConsumer createSharedConsumer(Topic topic, String str) throws JMSException {
        return createSharedConsumer(topic, str, (String) null);
    }

    /**
     * Creates a consumer on a non-durable subscription using the factory's shared subscription
     * type for topics. The subscription name is derived from {@code str} through the connection's
     * {@code prependClientId} and is registered on this session as shared.
     *
     * @param topic the topic to consume from; must not be null
     * @param str the requested subscription name
     * @param str2 forwarded to buildConsumer as its fifth argument
     * @return the subscribed consumer
     * @throws JMSException an InvalidDestinationException when the topic is null, or any failure
     *     while checking, registering or building
     */
    @Override // javax.jms.Session
    public PulsarMessageConsumer createSharedConsumer(Topic topic, String str, String str2) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        checkTopicOperationEnabled();
        final String subscriptionName = this.connection.prependClientId(str, true);
        final PulsarDestination target = PulsarConnectionFactory.toPulsarDestination(topic);
        registerSubscriptionName(target, subscriptionName, true);
        SubscriptionType sharedType = getFactory().getTopicSharedSubscriptionType();
        return buildConsumer(subscriptionName, target, SubscriptionMode.NonDurable, sharedType, str2, true, false);
    }

    /**
     * Creates a queue destination object for the given name, after checking that the session is
     * open and that queue operations are enabled.
     *
     * @param str the name passed to the {@link PulsarQueue} constructor
     * @return a new {@link PulsarQueue}
     * @throws JMSException if one of the checks fails
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public PulsarQueue createQueue(String str) throws JMSException {
        checkNotClosed();
        checkQueueOperationEnabled();
        return new PulsarQueue(str);
    }

    /**
     * Creates a topic destination object for the given name, after checking that the session is
     * open and that topic operations are enabled.
     *
     * @param str the name passed to the {@link PulsarTopic} constructor
     * @return a new {@link PulsarTopic}
     * @throws JMSException if one of the checks fails
     */
    @Override // javax.jms.Session, javax.jms.TopicSession
    public PulsarTopic createTopic(String str) throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        return new PulsarTopic(str);
    }

    /**
     * Creates a durable subscriber; delegates to the four argument overload with a null third
     * argument and the boolean option set to false.
     *
     * @param topic the topic to subscribe to
     * @param str the subscription name
     * @return the new subscriber
     * @throws JMSException if the subscriber cannot be created
     */
    @Override // javax.jms.Session, javax.jms.TopicSession
    public TopicSubscriber createDurableSubscriber(Topic topic, String str) throws JMSException {
        return createDurableSubscriber(topic, str, null, false);
    }

    /**
     * Creates a durable subscriber; delegates to the private five argument variant with its last
     * argument set to false.
     *
     * @param topic the topic to subscribe to
     * @param str the subscription name
     * @param str2 forwarded unchanged
     * @param z forwarded unchanged
     * @return the new subscriber
     * @throws JMSException if the subscriber cannot be created
     */
    @Override // javax.jms.Session, javax.jms.TopicSession
    public TopicSubscriber createDurableSubscriber(Topic topic, String str, String str2, boolean z) throws JMSException {
        return createDurableSubscriber(topic, str, str2, z, false);
    }

    /**
     * Creates a consumer on a durable, exclusive subscription. The subscription name is derived
     * from {@code str} through the connection's {@code prependClientId} and is registered on this
     * session as not shared.
     *
     * @param topic the topic to subscribe to; must not be null
     * @param str the requested subscription name
     * @param str2 forwarded to buildConsumer as its fifth argument
     * @param z forwarded to buildConsumer as its last argument
     * @param z2 second argument of {@code prependClientId}
     * @return the subscribed consumer
     * @throws JMSException an InvalidDestinationException when the topic is null, or any failure
     *     while checking, registering or building
     */
    private PulsarMessageConsumer createDurableSubscriber(Topic topic, String str, String str2, boolean z, boolean z2) throws JMSException {
        checkTopicOperationEnabled();
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        final PulsarTopic target = (PulsarTopic) PulsarConnectionFactory.toPulsarDestination(topic);
        final String subscriptionName = this.connection.prependClientId(str, z2);
        registerSubscriptionName(target, subscriptionName, false);
        return buildConsumer(subscriptionName, target, SubscriptionMode.Durable, SubscriptionType.Exclusive, str2, true, z);
    }

    /**
     * Records the destination used by a subscription name on this session. The new mapping is
     * stored before the duplicate check is made.
     *
     * @param pulsarDestination the destination of the subscription
     * @param str the subscription name
     * @param z true when the subscription is shared, which allows registering it again
     * @throws JMSException an IllegalStateException when the name was already registered for an
     *     equal destination and {@code z} is false
     */
    private void registerSubscriptionName(PulsarDestination pulsarDestination, String str, boolean z) throws JMSException {
        PulsarDestination previous = this.destinationBySubscription.put(str, pulsarDestination);
        boolean sameAsBefore = previous != null && previous.equals(pulsarDestination);
        if (sameAsBefore && !z) {
            throw new IllegalStateException("a subscription with name " + str + " is already registered on this session");
        }
    }

    /**
     * Forgets a subscription name, but only when the destination recorded for it equals the given
     * topic.
     *
     * @param str the subscription name
     * @param topic the topic the subscription is expected to be registered for
     */
    private void unregisterSubscriptionName(String str, Topic topic) {
        PulsarDestination registered = this.destinationBySubscription.get(str);
        if (registered != null && registered.equals(topic)) {
            this.destinationBySubscription.remove(str);
        }
    }

    /**
     * Creates a durable consumer; delegates to the four argument overload with a null third
     * argument and the boolean option set to false.
     *
     * @param topic the topic to subscribe to
     * @param str the subscription name
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    @Override // javax.jms.Session
    public MessageConsumer createDurableConsumer(Topic topic, String str) throws JMSException {
        return createDurableConsumer(topic, str, (String) null, false);
    }

    /**
     * Creates a durable consumer; delegates to the private five argument createDurableSubscriber
     * with its last argument set to false.
     *
     * @param topic the topic to subscribe to
     * @param str the subscription name
     * @param str2 forwarded unchanged
     * @param z forwarded unchanged
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    @Override // javax.jms.Session
    public PulsarMessageConsumer createDurableConsumer(Topic topic, String str, String str2, boolean z) throws JMSException {
        return createDurableSubscriber(topic, str, str2, z, false);
    }

    /**
     * Creates a shared durable consumer; delegates to the three argument overload with a null
     * third argument.
     *
     * @param topic the topic to subscribe to
     * @param str the subscription name
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    @Override // javax.jms.Session
    public MessageConsumer createSharedDurableConsumer(Topic topic, String str) throws JMSException {
        return createSharedDurableConsumer(topic, str, (String) null);
    }

    /**
     * Creates a shared durable consumer on the given topic.
     *
     * <p>A {@code null} topic is rejected first, then the topic-operation check runs. The
     * subscription name is prefixed through {@code connection.prependClientId(str, true)},
     * registered on this session as a shared subscription, and a durable consumer is built using
     * the subscription type reported by the factory for shared topic subscriptions.
     *
     * @param topic the topic to subscribe to; must not be {@code null}
     * @param str the subscription name, before the client id is prepended
     * @param str2 forwarded unchanged to {@code buildConsumer} (presumably the message selector —
     *     TODO confirm)
     * @return the newly built consumer
     * @throws InvalidDestinationException if {@code topic} is {@code null}
     * @throws JMSException if topic operations are disabled or the consumer cannot be built
     */
    @Override // javax.jms.Session
    public PulsarMessageConsumer createSharedDurableConsumer(Topic topic, String str, String str2) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        checkTopicOperationEnabled();
        final PulsarTopic destination = (PulsarTopic) PulsarConnectionFactory.toPulsarDestination(topic);
        final String subscriptionName = this.connection.prependClientId(str, true);
        // true: shared subscriptions may be registered more than once on the same session.
        registerSubscriptionName(destination, subscriptionName, true);
        return buildConsumer(
                subscriptionName,
                destination,
                SubscriptionMode.Durable,
                getFactory().getTopicSharedSubscriptionType(),
                str2,
                true,
                false);
    }

    /**
     * Creates a browser for the queue by delegating to the two-argument overload with a
     * {@code null} second argument.
     *
     * @param queue the queue to browse
     * @return the browser created by the two-argument overload
     * @throws JMSException if the delegate fails
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        return createBrowser(queue, null);
    }

    /**
     * Creates a browser for the given queue and tracks it in this session's browser collection.
     *
     * @param queue the queue to browse; must not be {@code null}
     * @param str passed as the third constructor argument of {@link PulsarQueueBrowser}
     *     (presumably the message selector — TODO confirm)
     * @return the newly created browser
     * @throws InvalidDestinationException if {@code queue} is {@code null}
     * @throws JMSException if queue operations are disabled on this session
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public QueueBrowser createBrowser(Queue queue, String str) throws JMSException {
        if (queue == null) {
            throw new InvalidDestinationException("invalid null queue");
        }
        checkQueueOperationEnabled();
        final Queue resolvedQueue = (Queue) PulsarConnectionFactory.toPulsarDestination(queue);
        final PulsarQueueBrowser browser = new PulsarQueueBrowser(this, resolvedQueue, str);
        this.browsers.add(browser);
        return browser;
    }

    /**
     * Creates a temporary queue through the owning connection.
     *
     * <p>Fails if the session or its connection is closed, or if queue operations are disabled.
     *
     * @return the temporary queue returned by {@code connection.createTemporaryQueue(this)}
     * @throws JMSException if a precondition check or the connection call fails
     */
    @Override // javax.jms.Session, javax.jms.QueueSession
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        checkNotClosed();
        checkQueueOperationEnabled();
        return this.connection.createTemporaryQueue(this);
    }

    /**
     * Creates a temporary topic through the owning connection.
     *
     * <p>Fails if the session or its connection is closed, or if topic operations are disabled.
     *
     * @return the temporary topic returned by {@code connection.createTemporaryTopic(this)}
     * @throws JMSException if a precondition check or the connection call fails
     */
    @Override // javax.jms.Session, javax.jms.TopicSession
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        return this.connection.createTemporaryTopic(this);
    }

    /**
     * Deletes the durable subscription with the given name.
     *
     * <p>The name is prefixed through {@code connection.prependClientId(str, true)} and its
     * mapping is removed from this session. When the session has no destination recorded for the
     * name an error is logged, but deletion is still attempted with a {@code null} destination;
     * the factory decides whether the subscription can be found.
     *
     * @param str the subscription name, before the client id is prepended
     * @throws InvalidDestinationException if the factory reports that nothing was deleted
     * @throws JMSException if the session is closed or topic operations are disabled
     */
    @Override // javax.jms.Session, javax.jms.TopicSession
    public void unsubscribe(String str) throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        final String subscriptionName = this.connection.prependClientId(str, true);
        final PulsarDestination destination = this.destinationBySubscription.remove(subscriptionName);
        if (destination == null) {
            log.error("Cannot unsubscribe {}, please open and close the subscription within this session before unsubscribing, because in Pulsar you need to known the Destination for the subscription. Known subscription names {}", subscriptionName, this.destinationBySubscription);
        }
        // destination may be null here on purpose: the call is made regardless of the lookup result.
        final boolean deleted = getFactory().deleteSubscription(destination, subscriptionName);
        if (!deleted) {
            throw new InvalidDestinationException("Subscription " + subscriptionName + " not found");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Acknowledges every message currently tracked as unacknowledged and then empties the
     * tracking collection.
     *
     * <p>The whole operation runs while holding the monitor of {@code unackedMessages}. If
     * acknowledging a message throws, the remaining messages are not processed and the collection
     * is not cleared.
     *
     * @throws JMSException if the session is closed or an acknowledgement fails
     */
    public void acknowledgeAllMessages() throws JMSException {
        checkNotClosed();
        synchronized (this.unackedMessages) {
            // Iterate over a typed snapshot rather than the live collection (presumably because
            // acknowledgeInternal may call back into unregisterUnacknowledgedMessage — TODO confirm).
            final List<PulsarMessage> snapshot = new ArrayList<>(this.unackedMessages);
            for (PulsarMessage message : snapshot) {
                message.acknowledgeInternal();
            }
            this.unackedMessages.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds the message to the collection of unacknowledged messages, holding that collection's
     * monitor while doing so.
     *
     * @param pulsarMessage the message to track
     */
    public void registerUnacknowledgedMessage(PulsarMessage pulsarMessage) {
        synchronized (this.unackedMessages) {
            this.unackedMessages.add(pulsarMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the message from the collection of unacknowledged messages, holding that
     * collection's monitor while doing so.
     *
     * @param pulsarMessage the message to stop tracking
     */
    public void unregisterUnacknowledgedMessage(PulsarMessage pulsarMessage) {
        synchronized (this.unackedMessages) {
            this.unackedMessages.remove(pulsarMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Detaches a consumer from this session.
     *
     * <p>When the consumer has an internal Pulsar consumer, it is removed from this session's
     * consumer collection and from the factory, and every tracked unacknowledged message that was
     * received from it is dropped. Independently of that, when the consumer is flagged with
     * {@code unregisterSubscriptionOnClose}, its subscription name is unregistered.
     *
     * @param pulsarMessageConsumer the consumer being removed
     */
    public void removeConsumer(PulsarMessageConsumer pulsarMessageConsumer) {
        final Consumer<?> pulsarConsumer = pulsarMessageConsumer.getInternalConsumer();
        if (pulsarConsumer != null) {
            this.consumers.remove(pulsarMessageConsumer);
            getFactory().removeConsumer(pulsarConsumer);
            synchronized (this.unackedMessages) {
                for (Iterator<PulsarMessage> cursor = this.unackedMessages.iterator(); cursor.hasNext(); ) {
                    final PulsarMessage pending = cursor.next();
                    if (pending == null || !pending.isReceivedFromConsumer(pulsarMessageConsumer)) {
                        continue;
                    }
                    cursor.remove();
                }
            }
        }
        if (pulsarMessageConsumer.unregisterSubscriptionOnClose) {
            final Topic subscribedTopic = (Topic) pulsarMessageConsumer.getDestination();
            unregisterSubscriptionName(pulsarMessageConsumer.subscriptionName, subscribedTopic);
        }
    }

    /**
     * Reports an internal error by logging it at error level; the throwable is not rethrown.
     *
     * @param th the error to log
     */
    public void onError(Throwable th) {
        log.error("Internal error ", th);
    }

    /**
     * Adds the consumer to this session's consumer collection and then tells the connection that
     * setting the client id is no longer allowed.
     *
     * @param pulsarMessageConsumer the consumer to track
     */
    public void registerConsumer(PulsarMessageConsumer pulsarMessageConsumer) {
        this.consumers.add(pulsarMessageConsumer);
        this.connection.setAllowSetClientId(false);
    }

    /**
     * @return the current value of the {@code jms20} flag
     */
    public boolean isJms20() {
        return this.jms20;
    }

    /**
     * Sets the {@code jms20} flag.
     *
     * @param z the new value of the flag
     */
    public void setJms20(boolean z) {
        this.jms20 = z;
    }

    /**
     * @return the connection this session belongs to
     */
    public PulsarConnection getConnection() {
        return this.connection;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the browser from this session's browser collection.
     *
     * @param pulsarQueueBrowser the browser to stop tracking
     */
    public void removeBrowser(PulsarQueueBrowser pulsarQueueBrowser) {
        this.browsers.remove(pulsarQueueBrowser);
    }

    /**
     * @return {@code true} when the {@code transaction} field is non-null
     */
    public boolean isTransactionStarted() {
        return this.transaction != null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the operation through {@code connection.executeInConnectionPausedLock}, after checking
     * that the session is not closed.
     *
     * @param blockCLoseOperation the operation to run; must not be {@code null}
     * @param i forwarded as the second argument of {@code executeInConnectionPausedLock}
     *     (presumably a timeout — TODO confirm unit and meaning)
     * @param <T> the result type of the operation
     * @return whatever the connection call returns
     * @throws JMSException if the session is closed or the connection call fails
     * @throws NullPointerException if {@code blockCLoseOperation} is {@code null}
     */
    public <T> T executeOperationIfConnectionStarted(BlockCLoseOperation<T> blockCLoseOperation, int i) throws JMSException {
        checkNotClosed();
        Objects.requireNonNull(blockCLoseOperation);
        return (T) this.connection.executeInConnectionPausedLock(blockCLoseOperation::execute, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes the operation while holding the read lock of {@code closeLock}.
     *
     * <p>The closed check is performed twice: once before taking the lock and once again after
     * it has been acquired. The read lock is always released, even when the operation throws.
     *
     * @param blockCLoseOperation the operation to run under the read lock
     * @param <T> the result type of the operation
     * @return the value returned by the operation
     * @throws JMSException if the session or connection is closed, or the operation fails
     */
    public <T> T executeCriticalOperation(BlockCLoseOperation<T> blockCLoseOperation) throws JMSException {
        // Fast-fail before contending for the lock.
        checkNotClosed();
        this.closeLock.readLock().lock();
        try {
            // Re-check now that the lock is held (presumably close() takes the write lock, so the
            // state cannot change underneath the operation — TODO confirm).
            checkNotClosed();
            return blockCLoseOperation.execute();
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Creates a receiver for the queue by delegating to {@code createConsumer}, passing the queue
     * as a {@link Destination}.
     *
     * @param queue the queue to receive from
     * @return the consumer returned by {@code createConsumer}
     * @throws JMSException if the delegate fails
     */
    @Override // javax.jms.QueueSession
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        return createConsumer((Destination) queue);
    }

    /**
     * Creates a receiver for the queue by delegating to the two-argument {@code createConsumer},
     * passing the queue as a {@link Destination}.
     *
     * @param queue the queue to receive from
     * @param str forwarded unchanged to {@code createConsumer} (presumably the message selector —
     *     TODO confirm)
     * @return the consumer returned by {@code createConsumer}
     * @throws JMSException if the delegate fails
     */
    @Override // javax.jms.QueueSession
    public QueueReceiver createReceiver(Queue queue, String str) throws JMSException {
        return createConsumer((Destination) queue, str);
    }

    /**
     * Creates a sender for the queue by delegating to {@code createProducer}, passing the queue
     * as a {@link Destination}.
     *
     * @param queue the queue to send to
     * @return the producer returned by {@code createProducer}
     * @throws JMSException if the delegate fails
     */
    @Override // javax.jms.QueueSession
    public QueueSender createSender(Queue queue) throws JMSException {
        return createProducer((Destination) queue);
    }

    /**
     * Creates a subscriber for the topic by delegating to {@code createConsumer}, passing the
     * topic as a {@link Destination}.
     *
     * @param topic the topic to subscribe to
     * @return the consumer returned by {@code createConsumer}
     * @throws JMSException if the delegate fails
     */
    @Override // javax.jms.TopicSession
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        return createConsumer((Destination) topic);
    }

    /**
     * Creates a subscriber for the topic by delegating to the three-argument
     * {@code createConsumer}, passing the topic as a {@link Destination}.
     *
     * @param topic the topic to subscribe to
     * @param str forwarded unchanged to {@code createConsumer} (presumably the message selector —
     *     TODO confirm)
     * @param z forwarded unchanged to {@code createConsumer} (presumably the noLocal flag — TODO
     *     confirm)
     * @return the consumer returned by {@code createConsumer}
     * @throws JMSException if the delegate fails
     */
    @Override // javax.jms.TopicSession
    public TopicSubscriber createSubscriber(Topic topic, String str, boolean z) throws JMSException {
        return createConsumer((Destination) topic, str, z);
    }

    /**
     * Creates a publisher for the topic by delegating to {@code createProducer}, passing the
     * topic as a {@link Destination}.
     *
     * @param topic the topic to publish to
     * @return the producer returned by {@code createProducer}
     * @throws JMSException if the delegate fails
     */
    @Override // javax.jms.TopicSession
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        return createProducer((Destination) topic);
    }

    /**
     * Verifies that neither this session nor its connection is closed.
     *
     * <p>The session's own {@code closed} flag is read directly here, without taking
     * {@code closeLock}.
     *
     * @throws IllegalStateException if this session is closed
     * @throws JMSException if the connection's own check fails
     */
    public void checkNotClosed() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("Session is closed");
        }
        this.connection.checkNotClosed();
    }

    /**
     * Reads the {@code closed} flag while holding the read lock of {@code closeLock}.
     *
     * @return {@code true} if this session has been marked closed
     */
    public boolean isClosed() {
        this.closeLock.readLock().lock();
        try {
            return this.closed;
        } finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Creates and starts the dedicated listener thread, unless one has already been assigned to
     * {@code dedicatedListenerThread}, in which case this is a no-op.
     */
    void startDedicatedListenerThread() {
        if (this.dedicatedListenerThread == null) {
            this.dedicatedListenerThread = new ListenerThread();
            this.dedicatedListenerThread.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the value of the {@code useDedicatedListenerThread} flag
     */
    public boolean isDedicatedListenerThread() {
        return this.useDedicatedListenerThread;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules the next listener cycle of a consumer on the session's thread pool.
     *
     * <ul>
     *   <li>Closed session: nothing is scheduled.
     *   <li>Connection not started: this method re-schedules itself (with {@code z = true}) after
     *       100 ms.
     *   <li>Connection started and {@code z} is {@code true}: {@code runListenerNoWait} is
     *       submitted for immediate execution.
     *   <li>Connection started and {@code z} is {@code false}: {@code runListenerNoWait} is
     *       scheduled after 100 ms.
     * </ul>
     *
     * @param pulsarMessageConsumer the consumer whose listener should run
     * @param z {@code true} to run as soon as possible, {@code false} to wait 100 ms first
     */
    public void scheduleConsumerListenerCycle(PulsarMessageConsumer pulsarMessageConsumer, boolean z) {
        if (isClosed()) {
            return;
        }
        final ScheduledExecutorService executor = getThreadPool();
        final long delayMillis = 100L;
        if (!this.connection.isStarted()) {
            // Not started yet: poll again later instead of running the listener.
            final Runnable retry = () -> {
                scheduleConsumerListenerCycle(pulsarMessageConsumer, true);
            };
            executor.schedule(retry, delayMillis, TimeUnit.MILLISECONDS);
            return;
        }
        Objects.requireNonNull(pulsarMessageConsumer);
        if (z) {
            executor.submit(pulsarMessageConsumer::runListenerNoWait);
            return;
        }
        executor.schedule(pulsarMessageConsumer::runListenerNoWait, delayMillis, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Makes sure something will drive the consumer's listener: either the dedicated listener
     * thread is started, or an immediate listener cycle is scheduled for the consumer. Does
     * nothing when the session is closed.
     *
     * @param pulsarMessageConsumer the consumer whose listener needs to be serviced
     */
    public void ensureListenerThread(PulsarMessageConsumer pulsarMessageConsumer) {
        if (isClosed()) {
            return;
        }
        if (!isDedicatedListenerThread()) {
            scheduleConsumerListenerCycle(pulsarMessageConsumer, true);
            return;
        }
        startDedicatedListenerThread();
    }

    /**
     * Guards queue-specific operations.
     *
     * @throws IllegalStateException if {@code allowQueueOperations} is {@code false}
     */
    void checkQueueOperationEnabled() throws JMSException {
        if (this.allowQueueOperations) {
            return;
        }
        throw new IllegalStateException("This is not a QueueSession");
    }

    /**
     * Guards topic-specific operations.
     *
     * @throws IllegalStateException if {@code allowTopicOperations} is {@code false}
     */
    void checkTopicOperationEnabled() throws JMSException {
        if (this.allowTopicOperations) {
            return;
        }
        throw new IllegalStateException("This is not a TopicSession");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Configures which families of operations this session accepts and returns the session
     * itself so the call can be chained.
     *
     * @param z new value of {@code allowQueueOperations}
     * @param z2 new value of {@code allowTopicOperations}
     * @return this session
     */
    public PulsarSession emulateLegacySession(boolean z, boolean z2) {
        this.allowQueueOperations = z;
        this.allowTopicOperations = z2;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the consumer interceptor held by this session
     */
    public ConsumerInterceptor getConsumerInterceptor() {
        return this.consumerInterceptor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks every consumer tracked by this session to refresh its server-side selectors.
     */
    public void refreshServerSideSelectors() {
        this.consumers.forEach(PulsarMessageConsumer::refreshServerSideSelectors);
    }
}
