/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.ConsumerConfiguration;
import com.datastax.oss.pulsar.jms.PulsarConnection;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarDestination;
import com.datastax.oss.pulsar.jms.PulsarMessage;
import com.datastax.oss.pulsar.jms.PulsarMessageConsumer;
import com.datastax.oss.pulsar.jms.PulsarMessageProducer;
import com.datastax.oss.pulsar.jms.PulsarQueue;
import com.datastax.oss.pulsar.jms.PulsarQueueBrowser;
import com.datastax.oss.pulsar.jms.PulsarTopic;
import com.datastax.oss.pulsar.jms.Utils;
import com.datastax.oss.pulsar.jms.messages.PulsarBytesMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarMapMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarObjectMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarSimpleMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarStreamMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Consumer;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerInterceptor;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Message;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.MessageId;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Producer;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionMode;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionType;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.transaction.Transaction;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarSession
implements Session,
QueueSession,
TopicSession {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(PulsarSession.class);
    // Counter shared by all sessions; generateNewTransactionStickyKey() draws its values from here.
    private static final AtomicLong STICKY_KEY_GENERATOR = new AtomicLong();
    // Connection that owns this session; also the access path to the factory.
    private final PulsarConnection connection;
    // Initialised to false by the constructor; toggled through setJms20().
    private boolean jms20;
    // Consumer configuration handed to the constructor and exposed via getOverrideConsumerConfiguration().
    private final ConsumerConfiguration overrideConsumerConfiguration;
    // Numeric session/acknowledge mode as passed to the constructor (0..4).
    private final int sessionMode;
    // True when sessionMode == 0.
    private final boolean transacted;
    // True when a transacted session was requested, the factory has transactions disabled
    // and transaction emulation is enabled on the factory.
    private final boolean emulateTransactions;
    // Copied from the factory at construction time.
    private final boolean enableJMSPriority;
    // NOTE(review): presumably consulted by checkQueueOperationEnabled()/checkTopicOperationEnabled(),
    // which are defined outside this part of the file - confirm.
    private boolean allowQueueOperations = true;
    private boolean allowTopicOperations = true;
    // Current Pulsar transaction; created lazily by getTransaction() and reset to null by commit/rollback.
    Transaction transaction;
    // Listener invoked by run() for every queued connection-consumer message.
    private MessageListener messageListener;
    // commit()/rollback() hold the read lock, close() holds the write lock.
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    // Messages received but not yet acknowledged; every access synchronizes on the list itself.
    private final List<PulsarMessage> unackedMessages = new ArrayList<PulsarMessage>();
    // Subscription name -> destination, maintained by register/unregisterSubscriptionName() and unsubscribe().
    private final Map<String, PulsarDestination> destinationBySubscription = new HashMap<String, PulsarDestination>();
    // Set to true by close().
    private volatile boolean closed;
    // True when the factory reports getSessionListenersThreads() <= 0.
    private final boolean useDedicatedListenerThread;
    // Joined and cleared by close(); where it is started is not visible in this part of the file.
    private volatile ListenerThread dedicatedListenerThread;
    // Lazily fetched from the factory by getThreadPool().
    private volatile ScheduledExecutorService threadPool;
    // Cancelled and cleared by close(); scheduling happens outside this part of the file.
    private volatile Future<?> listenersExecutorsCycleHandle;
    // Consumers and browsers created by this session; both are closed by close().
    private final List<PulsarMessageConsumer> consumers = new CopyOnWriteArrayList<PulsarMessageConsumer>();
    private final List<PulsarQueueBrowser> browsers = new CopyOnWriteArrayList<PulsarQueueBrowser>();
    // Lock and conditions coordinating commit/rollback with activities that must not overlap them.
    private final ReentrantLock pendingActivitiesLock = new ReentrantLock();
    // Signalled when activitesBlockingTransactionOperations drops to zero.
    private final Condition pendingActivitiesLockCanCommit = this.pendingActivitiesLock.newCondition();
    // Signalled when a commit/rollback finishes (see endTransactionOperation()).
    private final Condition pendingActivitiesLockCanDoActivity = this.pendingActivitiesLock.newCondition();
    // Number of activities currently between blockTransactionOperations() and unblockTransactionOperations().
    private int activitesBlockingTransactionOperations = 0;
    // True between beginTransactionOperation() and endTransactionOperation().
    private boolean transactionOperationInProgress = false;
    // Sticky key of the current transaction; regenerated after every commit/rollback.
    private final AtomicLong transactionStickyKey = new AtomicLong();
    // Interceptor instance; ConsumersInterceptor is declared outside this part of the file.
    private final ConsumersInterceptor consumerInterceptor = new ConsumersInterceptor();
    // Messages queued by setupConnectionConsumerTask() and drained by run(); guarded by synchronizing on the list.
    private final List<javax.jms.Message> connectionConsumerTasks = new ArrayList<javax.jms.Message>();
    // Optional task executed once by run() after the queued messages have been processed.
    private final AtomicReference<Runnable> connectionConsumerPostProcessingTask = new AtomicReference();

    /**
     * Translates a numeric session/acknowledge mode into its symbolic name.
     *
     * @param mode the numeric mode
     * @return the symbolic name for modes 0 to 4, otherwise {@code "?"} followed by the number
     */
    static String ACKNOWLEDGE_MODE_TO_STRING(int mode) {
        if (mode == 0) {
            return "SESSION_TRANSACTED";
        }
        if (mode == 1) {
            return "AUTO_ACKNOWLEDGE";
        }
        if (mode == 2) {
            return "CLIENT_ACKNOWLEDGE";
        }
        if (mode == 3) {
            return "DUPS_OK_ACKNOWLEDGE";
        }
        if (mode == 4) {
            return "INDIVIDUAL_ACKNOWLEDGE";
        }
        // Unknown value: keep the number visible for diagnostics.
        return "?" + mode;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Creates a session bound to {@code connection}.
     *
     * <p>A transacted session (mode 0) needs either real transactions enabled on the factory or
     * transaction emulation; otherwise construction fails.
     *
     * @param sessionMode numeric session/acknowledge mode, 0 to 4
     * @param connection the owning connection
     * @param overrideConsumerConfiguration consumer configuration stored for this session's consumers
     * @throws JMSException if the mode is invalid, or if mode 0 is requested while the factory has
     *     neither transactions nor transaction emulation enabled
     */
    PulsarSession(int sessionMode, PulsarConnection connection, ConsumerConfiguration overrideConsumerConfiguration) throws JMSException {
        // Fail fast: reject an unknown mode before any state is touched.
        PulsarSession.validateSessionMode(sessionMode);
        if (sessionMode == 0 && !connection.getFactory().isEnableTransaction()) {
            if (!connection.getFactory().isEmulateTransactions()) {
                throw new JMSException("Please enable transactions on PulsarConnectionFactory with enableTransaction=true, you can configure jms.emulateTransactions if your Pulsar cluster does not support transactions");
            }
            this.emulateTransactions = true;
        } else {
            this.emulateTransactions = false;
        }
        this.jms20 = false;
        this.connection = connection;
        this.sessionMode = sessionMode;
        this.transacted = sessionMode == 0;
        this.overrideConsumerConfiguration = overrideConsumerConfiguration;
        PulsarConnectionFactory factory = this.getFactory();
        this.enableJMSPriority = factory.isEnableJMSPriority();
        // With no shared listener threads configured the session uses its own thread.
        this.useDedicatedListenerThread = factory.getSessionListenersThreads() <= 0;
        if (this.transacted && factory.isTransactionsStickyPartitions()) {
            this.generateNewTransactionStickyKey();
        }
    }

    /**
     * Returns the listeners thread pool, fetching it from the factory on first use.
     *
     * @return the thread pool used for session listeners
     */
    private synchronized ScheduledExecutorService getThreadPool() {
        ScheduledExecutorService pool = this.threadPool;
        if (pool != null) {
            return pool;
        }
        pool = this.getFactory().getSessionListenersThreadPool();
        this.threadPool = pool;
        return pool;
    }

    /**
     * Creates a sibling session on the same connection.
     *
     * @param sessionMode numeric session mode; 0 means transacted
     * @param customConfiguration optional settings; only the {@code "consumerConfig"} entry is read
     * @return the new session
     * @throws JMSException if the connection cannot create the session
     */
    public PulsarSession createSession(int sessionMode, Map<String, Object> customConfiguration) throws JMSException {
        PulsarConnection owner = this.connection;
        boolean transactedMode = sessionMode == 0;
        Map consumerConfig = null;
        if (customConfiguration != null) {
            consumerConfig = (Map)customConfiguration.get("consumerConfig");
        }
        return owner.createSession(transactedMode, sessionMode, ConsumerConfiguration.buildConsumerConfiguration(consumerConfig));
    }

    /**
     * @return the consumer configuration supplied when this session was constructed
     */
    ConsumerConfiguration getOverrideConsumerConfiguration() {
        return overrideConsumerConfiguration;
    }

    /**
     * Returns the current transaction, starting one on demand for a transacted session that does
     * not emulate transactions.
     *
     * @return the current transaction, or {@code null} when none applies
     * @throws JMSException if a new transaction cannot be started
     */
    Transaction getTransaction() throws JMSException {
        if (this.transaction != null || this.sessionMode != 0 || this.emulateTransactions) {
            return this.transaction;
        }
        this.transaction = this.startTransaction(this.connection);
        return this.transaction;
    }

    /**
     * Asks the Pulsar client for a new transaction, retrying up to ten times (one second apart)
     * while the transaction coordinator cannot be found.
     *
     * @param connection connection whose factory provides the Pulsar client
     * @return the new transaction
     * @throws JMSException on any other failure, or when no transaction was obtained after all attempts
     */
    private Transaction startTransaction(PulsarConnection connection) throws JMSException {
        Transaction created = null;
        for (int remainingAttempts = 10; remainingAttempts > 0; remainingAttempts--) {
            try {
                try {
                    created = connection.getFactory().getPulsarClient().newTransaction().build().get();
                    break;
                }
                catch (ExecutionException executionFailure) {
                    Throwable cause = executionFailure.getCause();
                    if (!(cause instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException)) {
                        throw Utils.handleException(cause);
                    }
                    // Coordinator not reachable yet: wait and try again.
                    log.info("Transaction service not available {}", (Object)cause.getMessage());
                    Thread.sleep(1000L);
                }
            }
            catch (Exception anyFailure) {
                throw Utils.handleException(anyFailure);
            }
        }
        if (created == null) {
            throw new JMSException("Cannot create a Transaction in time");
        }
        return created;
    }

    /**
     * Rejects session modes outside the supported range 0 to 4.
     *
     * @param sessionMode the numeric mode to check
     * @throws JMSException if the mode is not one of 0, 1, 2, 3, 4
     */
    private static void validateSessionMode(int sessionMode) throws JMSException {
        boolean supported = sessionMode >= 0 && sessionMode <= 4;
        if (!supported) {
            throw new JMSException("Invalid sessionMode " + sessionMode);
        }
    }

    /**
     * @return the connection factory of the owning connection
     */
    PulsarConnectionFactory getFactory() {
        PulsarConnection owner = this.connection;
        return owner.getFactory();
    }

    /**
     * Looks up the Pulsar producer for a destination through the factory.
     *
     * @param destination the target destination
     * @return the producer supplied by the factory
     * @throws JMSException if the factory cannot provide a producer
     */
    Producer<byte[]> getProducerForDestination(Destination destination) throws JMSException {
        PulsarConnectionFactory factory = this.getFactory();
        return factory.getProducerForDestination(destination, this.transacted);
    }

    /**
     * Creates an empty bytes message.
     *
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public PulsarBytesMessage createBytesMessage() throws JMSException {
        checkNotClosed();
        PulsarBytesMessage message = new PulsarBytesMessage();
        return message;
    }

    /**
     * Creates an empty map message.
     *
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public MapMessage createMapMessage() throws JMSException {
        checkNotClosed();
        MapMessage message = new PulsarMapMessage();
        return message;
    }

    /**
     * Creates a map message pre-populated with {@code body}.
     *
     * <p>Runs the same closed-session check as every other message factory method of this class.
     *
     * @param body initial content of the message
     * @return the new map message
     * @throws JMSException if the closed-session check fails or the message cannot be built
     */
    public MapMessage createMapMessage(Map<String, Object> body) throws JMSException {
        this.checkNotClosed();
        return new PulsarMapMessage(body);
    }

    /**
     * Creates a message without a body.
     *
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public javax.jms.Message createMessage() throws JMSException {
        checkNotClosed();
        javax.jms.Message message = new PulsarSimpleMessage();
        return message;
    }

    /**
     * Creates an object message with no payload.
     *
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public ObjectMessage createObjectMessage() throws JMSException {
        checkNotClosed();
        ObjectMessage message = new PulsarObjectMessage();
        return message;
    }

    /**
     * Creates an object message carrying {@code object}.
     *
     * @param object the payload to store in the message
     * @throws JMSException if the closed-session check fails or the payload cannot be set
     */
    @Override
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        checkNotClosed();
        PulsarObjectMessage message = new PulsarObjectMessage();
        message.setObject(object);
        return message;
    }

    /**
     * Creates an empty stream message.
     *
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public StreamMessage createStreamMessage() throws JMSException {
        checkNotClosed();
        StreamMessage message = new PulsarStreamMessage();
        return message;
    }

    /**
     * Creates a text message whose text is {@code null}.
     *
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public TextMessage createTextMessage() throws JMSException {
        checkNotClosed();
        String noText = null;
        return new PulsarTextMessage(noText);
    }

    /**
     * Creates a text message holding {@code text}.
     *
     * @param text the message text
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public TextMessage createTextMessage(String text) throws JMSException {
        checkNotClosed();
        TextMessage message = new PulsarTextMessage(text);
        return message;
    }

    /**
     * @return {@code true} when this session was created with session mode 0
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public boolean getTransacted() throws JMSException {
        checkNotClosed();
        return transacted;
    }

    /**
     * @return the numeric session mode given at construction
     * @throws JMSException if the closed-session check fails
     */
    @Override
    public int getAcknowledgeMode() throws JMSException {
        checkNotClosed();
        return sessionMode;
    }

    /**
     * @return the sticky key currently associated with this session's transaction
     */
    public long getTransactionStickyKey() {
        AtomicLong key = this.transactionStickyKey;
        return key.get();
    }

    /**
     * Replaces the session's sticky key with the next value of the shared generator.
     */
    private void generateNewTransactionStickyKey() {
        long nextKey = STICKY_KEY_GENERATOR.incrementAndGet();
        this.transactionStickyKey.set(nextKey);
    }

    /**
     * Registers an activity that must not overlap with commit/rollback.
     *
     * <p>Waits until no commit/rollback is in progress, then increments the count of blocking
     * activities. Does nothing for a non-transacted session. Must be paired with
     * {@link #unblockTransactionOperations()}.
     *
     * @throws JMSException an {@code IllegalStateException} if the wait is interrupted; the
     *     thread's interrupt flag is restored before throwing
     */
    void blockTransactionOperations() throws JMSException {
        if (!this.transacted) {
            return;
        }
        this.pendingActivitiesLock.lock();
        try {
            while (this.transactionOperationInProgress) {
                this.pendingActivitiesLockCanDoActivity.await();
            }
            ++this.activitesBlockingTransactionOperations;
        }
        catch (InterruptedException err) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            IllegalStateException e = new IllegalStateException("commit/rollback interrupted");
            e.initCause(err);
            throw e;
        }
        finally {
            this.pendingActivitiesLock.unlock();
        }
    }

    /**
     * Ends an activity registered with {@link #blockTransactionOperations()}; when the last one
     * finishes, threads waiting to commit or roll back are woken up. Does nothing for a
     * non-transacted session.
     */
    void unblockTransactionOperations() {
        if (this.transacted) {
            this.pendingActivitiesLock.lock();
            try {
                this.activitesBlockingTransactionOperations -= 1;
                boolean noMoreActivities = this.activitesBlockingTransactionOperations == 0;
                if (noMoreActivities) {
                    this.pendingActivitiesLockCanCommit.signalAll();
                }
            }
            finally {
                this.pendingActivitiesLock.unlock();
            }
        }
    }

    /**
     * Claims the exclusive right to run a commit/rollback.
     *
     * <p>Waits for any other commit/rollback to finish, marks a new one as in progress, then waits
     * until no blocking activity is pending. On success the caller must invoke
     * {@link #endTransactionOperation()}.
     *
     * <p>If the thread is interrupted after the operation was claimed, the claim is released and
     * waiters are signalled. Callers do not invoke {@code endTransactionOperation()} when this
     * method throws, so without that the session would stay blocked forever.
     *
     * @throws JMSException an {@code IllegalStateException} if interrupted while waiting; the
     *     thread's interrupt flag is restored before throwing
     */
    @SuppressFBWarnings(value={"UL_UNRELEASED_LOCK", "UL_UNRELEASED_LOCK_EXCEPTION_PATH"})
    void beginTransactionOperation() throws JMSException {
        this.pendingActivitiesLock.lock();
        boolean operationClaimed = false;
        try {
            while (this.transactionOperationInProgress) {
                this.pendingActivitiesLockCanDoActivity.await();
            }
            this.transactionOperationInProgress = true;
            operationClaimed = true;
            while (this.activitesBlockingTransactionOperations > 0) {
                this.pendingActivitiesLockCanCommit.await();
            }
        }
        catch (InterruptedException err) {
            if (operationClaimed) {
                // await() re-acquires the lock before throwing, so the state can be reset safely.
                this.transactionOperationInProgress = false;
                this.pendingActivitiesLockCanDoActivity.signalAll();
            }
            Thread.currentThread().interrupt();
            IllegalStateException e = new IllegalStateException("commit/rollback interrupted");
            e.initCause(err);
            throw e;
        }
        finally {
            this.pendingActivitiesLock.unlock();
        }
    }

    /**
     * Marks the current commit/rollback as finished and wakes up threads waiting to start an
     * activity or another commit/rollback.
     *
     * @throws JMSException an {@code IllegalStateException} if no commit/rollback is in progress
     */
    void endTransactionOperation() throws JMSException {
        this.pendingActivitiesLock.lock();
        try {
            if (!this.transactionOperationInProgress) {
                throw new IllegalStateException("commit/rollback not in progress");
            }
            this.transactionOperationInProgress = false;
            this.pendingActivitiesLockCanDoActivity.signalAll();
        }
        finally {
            this.pendingActivitiesLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Commits the work of this transacted session.
     *
     * <p>With emulated transactions the pending messages are simply acknowledged. With a real
     * transaction the pending messages are acknowledged inside it, the transaction is committed,
     * and a fresh sticky key is generated for the next one.
     *
     * @throws JMSException if the session is not transacted, if a precondition check fails, or if
     *     an acknowledgment or the commit itself fails
     */
    @Override
    public void commit() throws JMSException {
        this.checkNotClosed();
        Utils.checkNotOnMessageProducer(this, null);
        if (!this.transacted) {
            throw new IllegalStateException("session is not transacted");
        }
        // The read lock keeps close() from running concurrently with the commit.
        this.closeLock.readLock().lock();
        try {
            this.beginTransactionOperation();
            try {
                if (this.emulateTransactions) {
                    // Acknowledgments were postponed until now.
                    synchronized (this.unackedMessages) {
                        for (PulsarMessage msg : new ArrayList<PulsarMessage>(this.unackedMessages)) {
                            msg.acknowledgeInternal();
                        }
                        this.unackedMessages.clear();
                    }
                }
                if (this.transaction != null) {
                    List<CompletableFuture<?>> handles = new ArrayList<CompletableFuture<?>>();
                    synchronized (this.unackedMessages) {
                        for (PulsarMessage msg : new ArrayList<PulsarMessage>(this.unackedMessages)) {
                            handles.add(msg.acknowledgeInternalInTransaction(this.transaction));
                        }
                        this.unackedMessages.clear();
                    }
                    handles.add(this.transaction.commit());
                    // Wait for every acknowledgment and for the commit.
                    Utils.get(CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])));
                    this.transaction = null;
                    this.generateNewTransactionStickyKey();
                }
            }
            finally {
                this.endTransactionOperation();
            }
        }
        finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Rolls back the work of this transacted session.
     *
     * <p>As in {@link #commit()}, the transacted check runs before any lock is taken, so a call on
     * a non-transacted session fails immediately.
     *
     * @throws JMSException if the session is not transacted, if a precondition check fails, or if
     *     the rollback itself fails
     */
    @Override
    public void rollback() throws JMSException {
        this.checkNotClosed();
        Utils.checkNotOnMessageProducer(this, null);
        if (!this.transacted) {
            throw new IllegalStateException("session is not transacted");
        }
        // The read lock keeps close() from running concurrently with the rollback.
        this.closeLock.readLock().lock();
        try {
            this.beginTransactionOperation();
            try {
                this.rollbackInternal();
            }
            finally {
                this.endTransactionOperation();
            }
        }
        finally {
            this.closeLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Performs the rollback without taking any lock: asks every consumer to redeliver its
     * unacknowledged messages, finishes closing consumers that were closed during the transaction,
     * forgets pending messages, aborts the transaction if one exists, and generates a new sticky key.
     *
     * @throws JMSException if a consumer operation or the abort fails
     */
    private void rollbackInternal() throws JMSException {
        for (PulsarMessageConsumer each : this.consumers) {
            each.redeliverUnacknowledgedMessages();
            if (each.isClosedWhileActiveTransaction()) {
                each.closeDuringRollback();
            }
        }
        synchronized (this.unackedMessages) {
            this.unackedMessages.clear();
        }
        Transaction current = this.transaction;
        if (current != null) {
            Utils.get(current.abort());
        }
        this.transaction = null;
        this.generateNewTransactionStickyKey();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the session. A second call does nothing beyond unregistering from the connection again.
     *
     * <p>Under the close write lock: rolls back an open transaction, forgets pending messages,
     * closes all consumers and browsers, and cancels the listeners cycle. The session is then
     * unregistered from the connection and, if a dedicated listener thread exists, the caller waits
     * for it to finish.
     *
     * @throws JMSException if the call is not allowed from the current callback, or if the rollback
     *     or closing a consumer/browser fails
     */
    @Override
    public void close() throws JMSException {
        Utils.checkNotOnSessionCallback(this);
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.transacted && this.transaction != null) {
                this.rollbackInternal();
            }
            synchronized (this.unackedMessages) {
                this.unackedMessages.clear();
            }
            for (PulsarMessageConsumer consumer : this.consumers) {
                consumer.closeInternal();
            }
            this.consumers.clear();
            for (PulsarQueueBrowser browser : this.browsers) {
                browser.close();
            }
            this.browsers.clear();
            if (this.listenersExecutorsCycleHandle != null) {
                this.listenersExecutorsCycleHandle.cancel(false);
                this.listenersExecutorsCycleHandle = null;
            }
        }
        finally {
            this.closeLock.writeLock().unlock();
            this.connection.unregisterSession(this);
        }
        // Read the volatile field once so the null check and the join see the same thread.
        ListenerThread listenerThread = this.dedicatedListenerThread;
        if (listenerThread != null) {
            try {
                listenerThread.join();
            }
            catch (InterruptedException interrupted) {
                // Stop waiting, but keep the interruption visible to the caller.
                Thread.currentThread().interrupt();
            }
            finally {
                this.dedicatedListenerThread = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Negatively acknowledges every pending message and forgets them.
     *
     * @throws JMSException if the closed-session check fails or the session is transacted
     */
    @Override
    public void recover() throws JMSException {
        checkNotClosed();
        if (transacted) {
            throw new IllegalStateException("cannot call this method inside a transacted session");
        }
        synchronized (this.unackedMessages) {
            Iterator<PulsarMessage> pending = this.unackedMessages.iterator();
            while (pending.hasNext()) {
                pending.next().negativeAck();
            }
            this.unackedMessages.clear();
        }
    }

    /**
     * @return the session-level message listener, or {@code null} if none was set
     */
    @Override
    public MessageListener getMessageListener() throws JMSException {
        return messageListener;
    }

    /**
     * Sets the session-level message listener.
     *
     * @param listener the listener; must not be {@code null}
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    @Override
    public void setMessageListener(MessageListener listener) throws JMSException {
        this.messageListener = Objects.requireNonNull(listener);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delivers every queued connection-consumer message to the session listener.
     *
     * <p>A message is acknowledged after the listener returns; if the listener or the
     * acknowledgment throws, the error is logged and the message is negatively acknowledged.
     * The queue is always emptied afterwards and the post-processing task, if any, runs once.
     */
    @Override
    public void run() {
        synchronized (this.connectionConsumerTasks) {
            try {
                Iterator<javax.jms.Message> queued = this.connectionConsumerTasks.iterator();
                while (queued.hasNext()) {
                    javax.jms.Message current = queued.next();
                    try {
                        this.messageListener.onMessage(current);
                        current.acknowledge();
                    }
                    catch (Throwable failure) {
                        Utils.handleException(failure);
                        log.info("Error in ConsumerConnection task on message {}", (Object)current, (Object)failure);
                        ((PulsarMessage)current).negativeAck();
                    }
                }
            }
            finally {
                this.connectionConsumerTasks.clear();
                Runnable postProcessing = this.connectionConsumerPostProcessingTask.getAndSet(null);
                if (postProcessing != null) {
                    postProcessing.run();
                }
            }
        }
    }

    /**
     * Runs one pass over the session's consumers, letting each run its listener inside the
     * connection-paused lock. When there is nothing to do (no consumers, or the connection is not
     * started) it sleeps 100 ms instead. The pass stops early if the connection is no longer started.
     */
    private void runListenersLoop() {
        boolean nothingToDo = this.consumers.isEmpty() || !this.connection.isStarted();
        if (nothingToDo) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            return;
        }
        for (PulsarMessageConsumer consumer : this.consumers) {
            try {
                this.connection.executeInConnectionPausedLock(() -> {
                    consumer.runListener(100);
                    return null;
                }, 0);
            }
            catch (Throwable failure) {
                log.error("Error in Session Thread {}", (Object)this, (Object)failure);
            }
            if (!this.connection.isStarted()) {
                return;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues messages for the next {@link #run()} call and records the task to execute afterwards.
     *
     * @param foreignMessages messages to append to the queue
     * @param postExecutionTask task stored for execution after the queue has been processed
     */
    void setupConnectionConsumerTask(List<javax.jms.Message> foreignMessages, Runnable postExecutionTask) {
        synchronized (this.connectionConsumerTasks) {
            this.connectionConsumerTasks.addAll(foreignMessages);
            this.connectionConsumerPostProcessingTask.set(postExecutionTask);
        }
    }

    /**
     * Creates a producer for {@code destination}; from then on the connection no longer allows
     * setting the client id.
     *
     * @param destination the target destination
     * @throws JMSException if the producer cannot be created
     */
    @Override
    public PulsarMessageProducer createProducer(Destination destination) throws JMSException {
        connection.setAllowSetClientId(false);
        PulsarMessageProducer producer = new PulsarMessageProducer(this, destination);
        return producer;
    }

    /**
     * Creates a consumer on {@code destination} without a message selector.
     *
     * @throws JMSException if the destination is {@code null} or the consumer cannot be created
     */
    @Override
    public PulsarMessageConsumer createConsumer(Destination destination) throws JMSException {
        final String noSelector = null;
        return createConsumer(destination, noSelector);
    }

    /**
     * Creates a consumer on {@code destination} with an optional selector and {@code noLocal} off.
     *
     * @throws JMSException an {@code InvalidDestinationException} if {@code destination} is
     *     {@code null}, or any error raised while creating the consumer
     */
    @Override
    public PulsarMessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        if (destination != null) {
            return createConsumer(destination, messageSelector, false);
        }
        throw new InvalidDestinationException("null destination");
    }

    /**
     * Creates a non-durable consumer under a random subscription name, using the subscription
     * type the factory selects for simple consumers.
     *
     * @throws JMSException an {@code InvalidDestinationException} if {@code destination} is
     *     {@code null}, or any error raised while subscribing
     */
    @Override
    public PulsarMessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        if (destination == null) {
            throw new InvalidDestinationException("null destination");
        }
        PulsarDestination target = PulsarConnectionFactory.toPulsarDestination(destination);
        String randomSubscription = UUID.randomUUID().toString();
        SubscriptionType type = this.getFactory().getExclusiveSubscriptionTypeForSimpleConsumers(destination);
        return this.buildConsumer(randomSubscription, target, SubscriptionMode.NonDurable, type, messageSelector, false, noLocal);
    }

    /**
     * Builds a consumer from the given settings and subscribes it.
     *
     * @return the result of {@code subscribe()} on the new consumer
     * @throws JMSException if the consumer cannot be created or subscribed
     */
    private PulsarMessageConsumer buildConsumer(String subscriptionName, PulsarDestination destination, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String selector, boolean unregisterSubscriptionOnClose, boolean noLocal) throws JMSException {
        PulsarMessageConsumer unsubscribed = new PulsarMessageConsumer(subscriptionName, destination, this, subscriptionMode, subscriptionType, selector, unregisterSubscriptionOnClose, noLocal);
        return unsubscribed.subscribe();
    }

    /**
     * Creates a shared non-durable consumer without a message selector.
     *
     * @throws JMSException if the topic is {@code null} or the consumer cannot be created
     */
    @Override
    public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
        final String noSelector = null;
        return createSharedConsumer(topic, sharedSubscriptionName, noSelector);
    }

    /**
     * Creates a shared non-durable consumer. The subscription name is prefixed by the connection,
     * registered on this session and unregistered again when the consumer is closed.
     *
     * @throws JMSException an {@code InvalidDestinationException} if {@code topic} is {@code null},
     *     or any error raised by the checks or while subscribing
     */
    @Override
    public PulsarMessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        checkTopicOperationEnabled();
        String subscription = this.connection.prependClientId(sharedSubscriptionName, true);
        PulsarDestination target = PulsarConnectionFactory.toPulsarDestination(topic);
        registerSubscriptionName(target, subscription, true);
        SubscriptionType sharedType = this.getFactory().getTopicSharedSubscriptionType();
        return buildConsumer(subscription, target, SubscriptionMode.NonDurable, sharedType, messageSelector, true, false);
    }

    /**
     * Creates a queue identity for {@code queueName}.
     *
     * @throws JMSException if the closed-session or queue-operation check fails
     */
    @Override
    public PulsarQueue createQueue(String queueName) throws JMSException {
        checkNotClosed();
        checkQueueOperationEnabled();
        PulsarQueue queue = new PulsarQueue(queueName);
        return queue;
    }

    /**
     * Creates a topic identity for {@code topicName}.
     *
     * @throws JMSException if the closed-session or topic-operation check fails
     */
    @Override
    public PulsarTopic createTopic(String topicName) throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        PulsarTopic topic = new PulsarTopic(topicName);
        return topic;
    }

    /**
     * Creates a durable subscriber without a selector and with {@code noLocal} off.
     *
     * @throws JMSException if the subscriber cannot be created
     */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        final String noSelector = null;
        final boolean noLocal = false;
        return createDurableSubscriber(topic, name, noSelector, noLocal);
    }

    /**
     * Creates a durable subscriber; an unset client id is not allowed here.
     *
     * @throws JMSException if the subscriber cannot be created
     */
    @Override
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        final boolean allowUnsetClientId = false;
        return createDurableSubscriber(topic, name, messageSelector, noLocal, allowUnsetClientId);
    }

    /**
     * Shared implementation for durable, exclusive topic consumers. The subscription name is
     * prefixed by the connection and registered as non-shared on this session.
     *
     * @param allowUnsetClientId forwarded to the connection when prefixing the subscription name
     * @throws JMSException an {@code InvalidDestinationException} if {@code topic} is {@code null},
     *     or any error raised by the checks, the registration or while subscribing
     */
    private PulsarMessageConsumer createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal, boolean allowUnsetClientId) throws JMSException {
        checkTopicOperationEnabled();
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        PulsarTopic target = (PulsarTopic)PulsarConnectionFactory.toPulsarDestination(topic);
        String subscription = this.connection.prependClientId(name, allowUnsetClientId);
        registerSubscriptionName(target, subscription, false);
        return buildConsumer(subscription, target, SubscriptionMode.Durable, SubscriptionType.Exclusive, messageSelector, true, noLocal);
    }

    /**
     * Records that {@code name} is subscribed to {@code topic} on this session.
     *
     * @param shared when {@code false}, a second registration of the same name on an equal
     *     destination is rejected
     * @throws JMSException an {@code IllegalStateException} for such a duplicate registration
     */
    private void registerSubscriptionName(PulsarDestination topic, String name, boolean shared) throws JMSException {
        PulsarDestination previous = this.destinationBySubscription.put(name, topic);
        if (shared || previous == null || !previous.equals(topic)) {
            return;
        }
        throw new IllegalStateException("a subscription with name " + name + " is already registered on this session");
    }

    /**
     * Removes the registration of {@code name}, but only if it currently points to a destination
     * equal to {@code topic}.
     */
    private void unregisterSubscriptionName(String name, Topic topic) {
        PulsarDestination registered = this.destinationBySubscription.get(name);
        if (registered == null) {
            return;
        }
        if (registered.equals(topic)) {
            this.destinationBySubscription.remove(name);
        }
    }

    /**
     * Creates a durable consumer without a selector and with {@code noLocal} off.
     *
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
        final String noSelector = null;
        final boolean noLocal = false;
        return createDurableConsumer(topic, name, noSelector, noLocal);
    }

    /**
     * Creates a durable consumer; an unset client id is not allowed here.
     *
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    public PulsarMessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        final boolean allowUnsetClientId = false;
        return createDurableSubscriber(topic, name, messageSelector, noLocal, allowUnsetClientId);
    }

    /**
     * Creates a shared durable consumer without a message selector.
     *
     * @throws JMSException if the topic is {@code null} or the consumer cannot be created
     */
    @Override
    public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
        final String noSelector = null;
        return createSharedDurableConsumer(topic, name, noSelector);
    }

    /**
     * Creates a shared durable consumer. The subscription name is prefixed by the connection and
     * registered as shared on this session.
     *
     * @throws JMSException an {@code InvalidDestinationException} if {@code topic} is {@code null},
     *     or any error raised by the checks or while subscribing
     */
    @Override
    public PulsarMessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        checkTopicOperationEnabled();
        PulsarTopic target = (PulsarTopic)PulsarConnectionFactory.toPulsarDestination(topic);
        String subscription = this.connection.prependClientId(name, true);
        registerSubscriptionName(target, subscription, true);
        SubscriptionType sharedType = this.getFactory().getTopicSharedSubscriptionType();
        return buildConsumer(subscription, target, SubscriptionMode.Durable, sharedType, messageSelector, true, false);
    }

    /**
     * Creates a browser on {@code queue} without a message selector.
     *
     * @throws JMSException if the queue is {@code null} or the browser cannot be created
     */
    @Override
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        final String noSelector = null;
        return createBrowser(queue, noSelector);
    }

    /**
     * Creates a browser on {@code queue} and tracks it so that closing the session closes it too.
     *
     * @throws JMSException an {@code InvalidDestinationException} if {@code queue} is {@code null},
     *     or any error raised by the queue-operation check or while creating the browser
     */
    @Override
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        if (queue == null) {
            throw new InvalidDestinationException("invalid null queue");
        }
        checkQueueOperationEnabled();
        Queue pulsarQueue = (Queue)((Object)PulsarConnectionFactory.toPulsarDestination(queue));
        PulsarQueueBrowser browser = new PulsarQueueBrowser(this, pulsarQueue, messageSelector);
        this.browsers.add(browser);
        return browser;
    }

    /**
     * Asks the connection to create a temporary queue for this session.
     *
     * @throws JMSException if a precondition check fails or the connection cannot create the queue
     */
    @Override
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        checkNotClosed();
        checkQueueOperationEnabled();
        PulsarConnection owner = this.connection;
        return owner.createTemporaryQueue(this);
    }

    /**
     * Asks the connection to create a temporary topic for this session.
     *
     * @throws JMSException if a precondition check fails or the connection cannot create the topic
     */
    @Override
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        PulsarConnection owner = this.connection;
        return owner.createTemporaryTopic(this);
    }

    /**
     * Deletes the subscription {@code name} (after the connection has prefixed it).
     *
     * <p>The destination is taken from this session's registry. If it is unknown an error is
     * logged, and the factory is still asked to delete the subscription with a {@code null} destination.
     *
     * @throws JMSException an {@code InvalidDestinationException} when the factory reports that
     *     nothing was deleted, or any error raised by the precondition checks
     */
    @Override
    public void unsubscribe(String name) throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        name = this.connection.prependClientId(name, true);
        PulsarDestination destination = this.destinationBySubscription.remove(name);
        if (destination == null) {
            log.error("Cannot unsubscribe {}, please open and close the subscription within this session before unsubscribing, because in Pulsar you need to known the Destination for the subscription. Known subscription names {}", (Object)name, (Object)this.destinationBySubscription);
        }
        boolean deleted = this.getFactory().deleteSubscription(destination, name);
        if (deleted) {
            return;
        }
        throw new InvalidDestinationException("Subscription " + name + " not found");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Acknowledges every pending message and forgets them.
     *
     * @throws JMSException if the closed-session check or an acknowledgment fails
     */
    void acknowledgeAllMessages() throws JMSException {
        checkNotClosed();
        synchronized (this.unackedMessages) {
            // Iterate over a snapshot rather than the live list.
            List<PulsarMessage> snapshot = new ArrayList<PulsarMessage>(this.unackedMessages);
            for (PulsarMessage pending : snapshot) {
                pending.acknowledgeInternal();
            }
            this.unackedMessages.clear();
        }
    }

    /**
     * Starts tracking a message as unacknowledged.
     *
     * <p>The message is appended to the tracking list while holding that list's monitor.
     *
     * @param result the message to track
     */
    void registerUnacknowledgedMessage(PulsarMessage result) {
        synchronized (unackedMessages) {
            unackedMessages.add(result);
        }
    }

    /**
     * Stops tracking a message as unacknowledged.
     *
     * <p>The message is removed from the tracking list while holding that list's monitor.
     *
     * @param result the message to stop tracking
     */
    void unregisterUnacknowledgedMessage(PulsarMessage result) {
        synchronized (unackedMessages) {
            unackedMessages.remove(result);
        }
    }

    /**
     * Detaches a consumer from this session.
     *
     * <p>When the consumer has an internal Pulsar consumer, it is removed from this session's
     * consumer collection, the factory is told to remove the Pulsar consumer, and every tracked
     * unacknowledged message that was received from this consumer is dropped from the tracking
     * list. Independently of that, when the consumer is flagged with
     * {@code unregisterSubscriptionOnClose}, its subscription name is unregistered.
     *
     * @param consumer the consumer being removed
     */
    void removeConsumer(PulsarMessageConsumer consumer) {
        Consumer<?> internalConsumer = consumer.getInternalConsumer();
        if (internalConsumer != null) {
            consumers.remove(consumer);
            getFactory().removeConsumer(internalConsumer);
            synchronized (unackedMessages) {
                for (Iterator<PulsarMessage> pending = unackedMessages.iterator(); pending.hasNext(); ) {
                    PulsarMessage candidate = pending.next();
                    // Null entries are left in place, exactly as before.
                    if (candidate != null && candidate.isReceivedFromConsumer(consumer)) {
                        pending.remove();
                    }
                }
            }
        }
        if (consumer.unregisterSubscriptionOnClose) {
            unregisterSubscriptionName(consumer.subscriptionName, (Topic)((Object)consumer.getDestination()));
        }
    }

    /**
     * Reports an internal error by writing it to the session log at error level.
     *
     * @param err the error to log
     */
    public void onError(Throwable err) {
        log.error("Internal error ", err);
    }

    /**
     * Adds a consumer to this session's consumer collection and then calls
     * {@code setAllowSetClientId(false)} on the connection.
     *
     * @param consumer the consumer to register
     */
    public void registerConsumer(PulsarMessageConsumer consumer) {
        consumers.add(consumer);
        connection.setAllowSetClientId(false);
    }

    /**
     * Returns the current value of this session's {@code jms20} flag.
     *
     * @return the flag as last set through {@link #setJms20(boolean)} or at construction
     */
    public boolean isJms20() {
        return jms20;
    }

    /**
     * Updates this session's {@code jms20} flag.
     *
     * @param jms20 the new flag value
     */
    public void setJms20(boolean jms20) {
        this.jms20 = jms20;
    }

    /**
     * Returns the connection this session belongs to.
     *
     * @return the owning {@link PulsarConnection}
     */
    public PulsarConnection getConnection() {
        return connection;
    }

    /**
     * Removes a queue browser from this session's browser collection.
     *
     * @param pulsarQueueBrowser the browser to forget
     */
    void removeBrowser(PulsarQueueBrowser pulsarQueueBrowser) {
        browsers.remove(pulsarQueueBrowser);
    }

    /**
     * Tells whether this session currently holds a transaction object.
     *
     * @return {@code true} when the session's {@code transaction} field is non-null
     */
    public boolean isTransactionStarted() {
        return transaction != null;
    }

    /**
     * Runs an operation through {@code connection.executeInConnectionPausedLock}, after verifying
     * that the session is not closed.
     *
     * @param operation the operation to run
     * @param timeoutMillis timeout, in milliseconds, forwarded to the connection
     * @param <T> the operation's result type
     * @return whatever the connection returns for the operation
     * @throws JMSException if the session is closed or the connection/operation fails
     */
    <T> T executeOperationIfConnectionStarted(BlockCLoseOperation<T> operation, int timeoutMillis) throws JMSException {
        checkNotClosed();
        T outcome = (T)connection.executeInConnectionPausedLock(operation::execute, timeoutMillis);
        return outcome;
    }

    /**
     * Runs an operation while holding the read side of the session's close lock.
     *
     * <p>The closed state is checked once before taking the lock and once more after acquiring
     * it; the lock is always released, whether the operation returns or throws.
     *
     * @param operation the operation to run under the read lock
     * @param <T> the operation's result type
     * @return the operation's result
     * @throws JMSException if the session is closed or the operation fails
     */
    <T> T executeCriticalOperation(BlockCLoseOperation<T> operation) throws JMSException {
        checkNotClosed();
        closeLock.readLock().lock();
        try {
            // Re-check now that the read lock is held.
            checkNotClosed();
            return operation.execute();
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /**
     * Creates a receiver for the given queue; equivalent to {@code createConsumer(queue)}.
     *
     * @param queue the queue to receive from
     * @return the consumer created for the queue
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        return createConsumer(queue);
    }

    /**
     * Creates a receiver for the given queue; equivalent to {@code createConsumer(queue, s2)}.
     *
     * @param queue the queue to receive from
     * @param s2 second argument forwarded unchanged to {@code createConsumer}
     * @return the consumer created for the queue
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    public QueueReceiver createReceiver(Queue queue, String s2) throws JMSException {
        return createConsumer(queue, s2);
    }

    /**
     * Creates a sender for the given queue; equivalent to {@code createProducer(queue)}.
     *
     * @param queue the queue to send to
     * @return the producer created for the queue
     * @throws JMSException if the producer cannot be created
     */
    @Override
    public QueueSender createSender(Queue queue) throws JMSException {
        return createProducer(queue);
    }

    /**
     * Creates a subscriber for the given topic; equivalent to {@code createConsumer(topic)}.
     *
     * @param topic the topic to subscribe to
     * @return the consumer created for the topic
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        return createConsumer(topic);
    }

    /**
     * Creates a subscriber for the given topic; equivalent to
     * {@code createConsumer(topic, s2, b)}.
     *
     * @param topic the topic to subscribe to
     * @param s2 second argument forwarded unchanged to {@code createConsumer}
     * @param b third argument forwarded unchanged to {@code createConsumer}
     * @return the consumer created for the topic
     * @throws JMSException if the consumer cannot be created
     */
    @Override
    public TopicSubscriber createSubscriber(Topic topic, String s2, boolean b) throws JMSException {
        return createConsumer(topic, s2, b);
    }

    /**
     * Creates a publisher for the given topic; equivalent to {@code createProducer(topic)}.
     *
     * @param topic the topic to publish to
     * @return the producer created for the topic
     * @throws JMSException if the producer cannot be created
     */
    @Override
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        return createProducer(topic);
    }

    /**
     * Fails when this session has been closed; otherwise asks the connection to run its own
     * closed check.
     *
     * @throws JMSException an {@code IllegalStateException} with message "Session is closed" when
     *     the session's {@code closed} flag is set, or whatever the connection's check throws
     */
    public void checkNotClosed() throws JMSException {
        if (!closed) {
            connection.checkNotClosed();
            return;
        }
        throw new IllegalStateException("Session is closed");
    }

    /**
     * Reads the session's {@code closed} flag while holding the read side of the close lock.
     *
     * @return {@code true} if the session is marked closed
     */
    public boolean isClosed() {
        closeLock.readLock().lock();
        try {
            return closed;
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /**
     * Creates and starts the session's {@link ListenerThread} unless one has already been
     * assigned.
     *
     * <p>NOTE(review): the null check and the assignment are not guarded by any lock in this
     * method; confirm callers never invoke it concurrently.
     */
    void startDedicatedListenerThread() {
        if (dedicatedListenerThread == null) {
            dedicatedListenerThread = new ListenerThread();
            dedicatedListenerThread.start();
        }
    }

    /**
     * Returns the session's {@code useDedicatedListenerThread} setting.
     *
     * @return {@code true} when listeners are to be driven by the dedicated listener thread
     */
    boolean isDedicatedListenerThread() {
        return useDedicatedListenerThread;
    }

    /**
     * Schedules the next listener run for a consumer on the session's thread pool.
     *
     * <p>Nothing is scheduled when the session is closed. While the connection is not started,
     * this method re-schedules itself (with {@code immediate = true}) after 100 ms. Once the
     * connection is started, {@code consumer.runListenerNoWait} is either submitted right away or
     * scheduled with a 100 ms delay, depending on {@code immediate}.
     *
     * @param consumer the consumer whose listener should run
     * @param immediate {@code true} to submit without delay, {@code false} to wait 100 ms
     */
    void scheduleConsumerListenerCycle(PulsarMessageConsumer consumer, boolean immediate) {
        if (isClosed()) {
            return;
        }
        ScheduledExecutorService executor = getThreadPool();
        if (connection.isStarted()) {
            if (immediate) {
                executor.submit(consumer::runListenerNoWait);
            } else {
                executor.schedule(consumer::runListenerNoWait, 100L, TimeUnit.MILLISECONDS);
            }
            return;
        }
        // Connection not started yet: try again shortly.
        executor.schedule(() -> this.scheduleConsumerListenerCycle(consumer, true), 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Makes sure something will drive the given consumer's listener.
     *
     * <p>Does nothing when the session is closed. With a dedicated listener thread configured,
     * that thread is started if needed; otherwise an immediate listener cycle is scheduled for
     * the consumer.
     *
     * @param consumer the consumer whose listener must be serviced
     */
    void ensureListenerThread(PulsarMessageConsumer consumer) {
        if (isClosed()) {
            return;
        }
        if (!isDedicatedListenerThread()) {
            scheduleConsumerListenerCycle(consumer, true);
            return;
        }
        startDedicatedListenerThread();
    }

    /**
     * Rejects the call when queue operations are not allowed on this session.
     *
     * @throws JMSException an {@code IllegalStateException} with message
     *     "This is not a QueueSession" when {@code allowQueueOperations} is {@code false}
     */
    void checkQueueOperationEnabled() throws JMSException {
        if (allowQueueOperations) {
            return;
        }
        throw new IllegalStateException("This is not a QueueSession");
    }

    /**
     * Rejects the call when topic operations are not allowed on this session.
     *
     * @throws JMSException an {@code IllegalStateException} with message
     *     "This is not a TopicSession" when {@code allowTopicOperations} is {@code false}
     */
    void checkTopicOperationEnabled() throws JMSException {
        if (allowTopicOperations) {
            return;
        }
        throw new IllegalStateException("This is not a TopicSession");
    }

    /**
     * Sets which families of operations this session accepts and returns the session itself.
     *
     * <p>The two flags are the ones consulted by {@code checkQueueOperationEnabled()} and
     * {@code checkTopicOperationEnabled()}.
     *
     * @param queue whether queue operations are allowed
     * @param topic whether topic operations are allowed
     * @return this session
     */
    PulsarSession emulateLegacySession(boolean queue, boolean topic) {
        allowQueueOperations = queue;
        allowTopicOperations = topic;
        return this;
    }

    /**
     * Returns the consumer interceptor held by this session.
     *
     * @return the session's {@code consumerInterceptor} field
     */
    ConsumerInterceptor getConsumerInterceptor() {
        return consumerInterceptor;
    }

    /**
     * Invokes {@code refreshServerSideSelectors()} on every consumer registered with this
     * session.
     */
    void refreshServerSideSelectors() {
        consumers.forEach(PulsarMessageConsumer::refreshServerSideSelectors);
    }

    /**
     * Pulsar {@link ConsumerInterceptor} attached to this session.
     *
     * <p>Its only active callback is {@link #onAckTimeoutSend}, which drops entries from the
     * enclosing session's list of unacknowledged messages; every other callback is a no-op (or,
     * for {@link #beforeConsume}, a pass-through).
     */
    class ConsumersInterceptor
    implements ConsumerInterceptor<Object> {
        ConsumersInterceptor() {
        }

        /** No-op: this interceptor holds no resources of its own. */
        @Override
        public void close() {
        }

        /**
         * Returns the message unchanged.
         *
         * @param consumer the consumer about to deliver the message (unused)
         * @param message the message being consumed
         * @return the same {@code message} instance
         */
        @Override
        public Message<Object> beforeConsume(Consumer<Object> consumer, Message<Object> message) {
            return message;
        }

        /** No-op. */
        @Override
        public void onAcknowledge(Consumer<Object> consumer, MessageId messageId, Throwable exception) {
        }

        /** No-op. */
        @Override
        public void onAcknowledgeCumulative(Consumer<Object> consumer, MessageId messageId, Throwable exception) {
        }

        /** No-op. */
        @Override
        public void onNegativeAcksSend(Consumer<Object> consumer, Set<MessageId> messageIds) {
        }

        /**
         * Stops tracking the unacknowledged messages whose ids are in {@code messageIds}.
         *
         * <p>While holding the monitor of the session's unacknowledged-message list, every
         * tracked message whose received Pulsar message id matches one of the given ids
         * (according to {@code Utils.sameEntryId}) is removed from that list.
         *
         * @param consumer the consumer the notification is about (unused)
         * @param messageIds ids reported by the callback; a {@code null} or empty set is ignored
         */
        @Override
        public void onAckTimeoutSend(Consumer<Object> consumer, Set<MessageId> messageIds) {
            if (log.isDebugEnabled()) {
                log.debug("onAckTimeoutSend {}", (Object)messageIds);
            }
            if (messageIds == null || messageIds.isEmpty()) {
                // Nothing can match, so avoid taking the lock at all.
                return;
            }
            synchronized (PulsarSession.this.unackedMessages) {
                Iterator<PulsarMessage> it = PulsarSession.this.unackedMessages.iterator();
                while (it.hasNext()) {
                    PulsarMessage msg = it.next();
                    if (msg == null) {
                        // Same tolerance for null entries as removeConsumer applies to this list.
                        continue;
                    }
                    MessageId messageId = msg.getReceivedPulsarMessage().getMessageId();
                    if (this.matchesAny(messageIds, messageId)) {
                        it.remove();
                    }
                }
            }
        }

        /** Tells whether any id in {@code candidates} designates the same entry as {@code messageId}. */
        private boolean matchesAny(Set<MessageId> candidates, MessageId messageId) {
            for (MessageId id : candidates) {
                if (Utils.sameEntryId(id, messageId)) {
                    return true;
                }
            }
            return false;
        }

        /** No-op. */
        @Override
        public void onPartitionsChange(String topicName, int partitions) {
        }
    }

    /**
     * Daemon thread named "jms-session-thread" that keeps calling the enclosing session's
     * {@code runListenersLoop()} until the session's {@code closed} flag is set.
     */
    private class ListenerThread
    extends Thread {
        private ListenerThread() {
            super("jms-session-thread");
            setDaemon(true);
        }

        /** Runs listener loops back to back; the closed flag is checked before each one. */
        @Override
        public void run() {
            while (true) {
                if (PulsarSession.this.closed) {
                    return;
                }
                PulsarSession.this.runListenersLoop();
            }
        }
    }

    static interface BlockCLoseOperation<T> {
        public T execute() throws JMSException;
    }
}

