/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.ConsumerConfiguration;
import com.datastax.oss.pulsar.jms.PulsarConnection;
import com.datastax.oss.pulsar.jms.PulsarDestination;
import com.datastax.oss.pulsar.jms.PulsarJMSAdminImpl;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import com.datastax.oss.pulsar.jms.PulsarQueue;
import com.datastax.oss.pulsar.jms.PulsarSession;
import com.datastax.oss.pulsar.jms.PulsarTopic;
import com.datastax.oss.pulsar.jms.TopicDiscoveryUtils;
import com.datastax.oss.pulsar.jms.Utils;
import com.datastax.oss.pulsar.jms.api.JMSAdmin;
import com.datastax.oss.pulsar.jms.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.admin.PulsarAdmin;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.admin.PulsarAdminException;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Authentication;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.AuthenticationFactory;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.BatcherBuilder;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ClientBuilder;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Consumer;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ConsumerBuilder;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Message;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.MessageId;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.MessageRouter;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Producer;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ProducerBuilder;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.PulsarClient;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.PulsarClientException;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Reader;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.ReaderBuilder;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.Schema;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionInitialPosition;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionMode;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.SubscriptionType;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.TopicMetadata;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.ConsumerBase;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.PulsarClientImpl;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.auth.AuthenticationToken;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.util.MathUtils;
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.IllegalStateRuntimeException;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarConnectionFactory
implements ConnectionFactory,
QueueConnectionFactory,
TopicConnectionFactory,
AutoCloseable,
Serializable {
    private static final Logger log = LoggerFactory.getLogger(PulsarConnectionFactory.class);
    private static final long serialVersionUID = 1231231L;
    private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
    // Package prefix prepended to relocated Pulsar class names found in configuration values.
    private static final String SHADED_PREFIX = "com.datastax.oss.pulsar.jms.shaded.";
    // True when the PulsarClient class visible to this class lives under the shaded package.
    private static final boolean NEEDS_RELOCATION = PulsarClient.class.getName().startsWith("com.datastax.oss.pulsar.jms.shaded.");
    private static final Set<String> clientIdentifiers = new ConcurrentSkipListSet<String>();
    // Producers cached per topic key; populated by getProducerForDestination and closed by close().
    private final transient Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<String, Producer<byte[]>>();
    // Connections handed out by createConnection; closed by close().
    private final transient Set<PulsarConnection> connections = Collections.synchronizedSet(new HashSet());
    private final transient List<Consumer<?>> consumers = new CopyOnWriteArrayList();
    private final transient List<Reader<?>> readers = new CopyOnWriteArrayList();
    // Assigned by ensureInitialized; unset until the first successful initialization.
    private transient PulsarClient pulsarClient;
    private transient PulsarAdmin pulsarAdmin;
    private transient Map<String, Object> producerConfiguration;
    private transient ConsumerConfiguration defaultConsumerConfiguration;
    // The values below are defaults; ensureInitialized overwrites them from the configuration map.
    private transient String systemNamespace = "public/default";
    private transient String defaultClientId = null;
    private transient boolean enableTransaction = false;
    private transient boolean emulateTransactions = false;
    private transient boolean enableClientSideEmulation = false;
    private transient boolean transactionsStickyPartitions = false;
    private transient boolean useServerSideFiltering = false;
    private transient boolean enableJMSPriority = false;
    private transient boolean priorityUseLinearMapping = true;
    private transient boolean forceDeleteTemporaryDestinations = false;
    private transient boolean useExclusiveSubscriptionsForSimpleConsumers = false;
    private transient boolean acknowledgeRejectedMessages = false;
    private transient String tckUsername = "";
    private transient String tckPassword = "";
    private transient boolean useCredentialsFromCreateConnection = false;
    // Credentials recorded by ensureInitialized when useCredentialsFromCreateConnection is set.
    private transient String lastConnectUsername = null;
    private transient String lastConnectPassword = null;
    private transient String queueSubscriptionName = "jms-queue";
    private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared;
    private transient long waitForServerStartupTimeout = 60000L;
    private transient boolean usePulsarAdmin = true;
    private transient boolean precreateQueueSubscription = true;
    private transient int precreateQueueSubscriptionConsumerQueueSize = 0;
    // Lifecycle flags; read and written under the instance monitor in ensureInitialized and close().
    private transient boolean initialized;
    private transient boolean closed;
    // Passed to scheduleWithFixedDelay with TimeUnit.SECONDS in ensureInitialized.
    private transient int refreshServerSideFiltersPeriod = 300;
    private transient boolean maxMessagesLimitsParallelism = false;
    private transient int connectionConsumerStopTimeout = 20000;
    // Raw user configuration as stored by setConfiguration; getConfiguration returns a deep copy.
    private transient Map<String, Object> configuration = Collections.emptyMap();
    private transient ScheduledExecutorService sessionListenersThreadPool;
    private transient int sessionListenersThreads;
    private transient int connectionConsumerParallelism = 1;

    /**
     * Creates a factory backed by a new, empty configuration map.
     *
     * @throws JMSException declared by this constructor's signature
     */
    public PulsarConnectionFactory() throws JMSException {
        this(new HashMap<>());
    }

    /**
     * Creates a factory and stores the given configuration via {@link #setConfiguration(Map)}.
     *
     * @param properties configuration entries for this factory
     */
    public PulsarConnectionFactory(Map<String, Object> properties) {
        setConfiguration(properties);
    }

    /**
     * Creates a factory and then applies a JSON-encoded configuration through
     * {@link #setJsonConfiguration(String)}.
     *
     * @param configuration JSON document describing the configuration; null or empty yields an
     *     empty configuration
     * @throws JMSException declared by the delegated no-argument constructor
     */
    public PulsarConnectionFactory(String configuration) throws JMSException {
        this();
        setJsonConfiguration(configuration);
    }

    /**
     * Serializes a copy of the current configuration to a JSON string.
     *
     * @return the configuration rendered by a Jackson {@code ObjectMapper}
     */
    public String getJsonConfiguration() {
        return Utils.runtimeException(() -> {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.writeValueAsString(getConfiguration());
        });
    }

    /**
     * Replaces the configuration with the content of a JSON document.
     *
     * @param json JSON object to parse into a map; a null or empty string installs an empty
     *     configuration
     */
    public void setJsonConfiguration(String json) {
        boolean hasContent = json != null && !json.isEmpty();
        if (hasContent) {
            setConfiguration(Utils.runtimeException(() -> new ObjectMapper().readValue(json, Map.class)));
        } else {
            setConfiguration(Collections.emptyMap());
        }
    }

    /**
     * Returns a deep copy of the stored configuration, produced by {@code Utils.deepCopyMap}.
     *
     * @return a copy of the configuration map
     */
    public synchronized Map<String, Object> getConfiguration() {
        Map<String, Object> snapshot = Utils.deepCopyMap(configuration);
        return snapshot;
    }

    /**
     * Stores a copy of the given configuration, with Pulsar class names relocated to the shaded
     * package when this build requires it.
     *
     * @param configuration the new configuration entries
     */
    public synchronized void setConfiguration(Map<String, Object> configuration) {
        Map<String, Object> relocated = copyAndApplyShadedPrefix(configuration);
        this.configuration = relocated;
    }

    /**
     * Copies a configuration map, prefixing String values that look like Pulsar class names with
     * the shaded package prefix when the Pulsar client classes have been relocated.
     *
     * <p>Nested maps are copied recursively in the relocating path; when no relocation is needed a
     * shallow copy is returned. Only values that were actually rewritten are logged, so unrelated
     * String settings (which may include passwords or tokens) never reach the debug log.
     *
     * @param configuration the map to copy, may be null
     * @return the copy, or null when {@code configuration} is null
     */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> copyAndApplyShadedPrefix(Map<String, Object> configuration) {
        if (configuration == null) {
            return null;
        }
        if (!NEEDS_RELOCATION) {
            return new HashMap<String, Object>(configuration);
        }
        Map<String, Object> copy = new HashMap<String, Object>();
        for (Map.Entry<String, Object> entry : configuration.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                copy.put(key, copyAndApplyShadedPrefix((Map<String, Object>) value));
            } else if (value instanceof String) {
                String original = (String) value;
                String result = original;
                // The package name is matched without its first character so that the literal is not
                // itself rewritten when this class is relocated.
                if (original.length() > 17 && original.substring(1).startsWith("rg.apache.pulsar")) {
                    result = SHADED_PREFIX + original;
                    if (log.isDebugEnabled()) {
                        log.debug("Relocating {} = {} -> {}", key, original, result);
                    }
                }
                copy.put(key, result);
            } else {
                copy.put(key, value);
            }
        }
        return copy;
    }

    /**
     * Resolves the consumer configuration to use for a destination.
     *
     * <p>Starts from the factory default, layers the caller-supplied override on top of it, and
     * finally layers any override computed from the destination itself.
     *
     * @param overrideConsumerConfiguration caller-supplied override, may be null
     * @param destination the destination the consumer is created for
     * @return the resulting configuration
     * @throws InvalidDestinationException if computing the destination override fails
     */
    synchronized ConsumerConfiguration getConsumerConfiguration(ConsumerConfiguration overrideConsumerConfiguration, PulsarDestination destination) throws InvalidDestinationException {
        ConsumerConfiguration effective = defaultConsumerConfiguration;
        if (overrideConsumerConfiguration != null) {
            effective = overrideConsumerConfiguration.applyDefaults(effective);
        }
        ConsumerConfiguration perDestination = Utils.computeConsumerOverrideConfiguration(destination);
        if (perDestination != null) {
            effective = perDestination.applyDefaults(effective);
        }
        return effective;
    }

    /**
     * Returns the producer configuration map built during initialization.
     *
     * @return the producer settings; unset until the factory has been initialized
     */
    private synchronized Map<String, Object> getProducerConfiguration() {
        return producerConfiguration;
    }

    /**
     * Initializes the factory on first use: parses the {@code jms.*} settings from a copy of the
     * configuration, then builds the {@link PulsarAdmin} and {@link PulsarClient}.
     *
     * <p>Does nothing when the factory is already initialized. Entries consumed here are removed
     * from the configuration copy; whatever remains is handed to {@code ClientBuilder.loadConf}.
     * Any failure is converted through {@code Utils.handleException}.
     *
     * @param connectUsername username given to createConnection/createContext, may be null
     * @param connectPassword password given to createConnection/createContext, may be null; used
     *     as the authentication parameters when {@code jms.useCredentialsFromCreateConnection} is
     *     enabled
     * @throws JMSException if the factory is closed or initialization fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private synchronized void ensureInitialized(String connectUsername, String connectPassword) throws JMSException {
        if (this.initialized) {
            return;
        }
        if (this.closed) {
            throw new javax.jms.IllegalStateException("This ConnectionFactory is closed");
        }
        Map<String, Object> configurationCopy = Utils.deepCopyMap(this.configuration);
        try {
            Map producerConfiguration = (Map) configurationCopy.remove("producerConfig");
            if (producerConfiguration != null) {
                // A textual batcherBuilder is replaced by the corresponding BatcherBuilder constant.
                Object batcherBuilder = producerConfiguration.get("batcherBuilder");
                if (batcherBuilder instanceof String) {
                    String batcherBuilderString = (String) batcherBuilder;
                    BatcherBuilder builder;
                    switch (batcherBuilderString) {
                        case "KEY_BASED": {
                            builder = BatcherBuilder.KEY_BASED;
                            break;
                        }
                        case "DEFAULT": {
                            builder = BatcherBuilder.DEFAULT;
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException("Unsupported batcherBuilder " + batcherBuilderString);
                        }
                    }
                    producerConfiguration.put("batcherBuilder", builder);
                }
                this.producerConfiguration = new HashMap<String, Object>(producerConfiguration);
            } else {
                this.producerConfiguration = Collections.emptyMap();
            }
            Map consumerConfigurationM = (Map) configurationCopy.remove("consumerConfig");
            this.defaultConsumerConfiguration = ConsumerConfiguration.buildConsumerConfiguration(consumerConfigurationM);
            this.systemNamespace = Utils.getAndRemoveString("jms.systemNamespace", "public/default", configurationCopy);
            this.tckUsername = Utils.getAndRemoveString("jms.tckUsername", "", configurationCopy);
            this.tckPassword = Utils.getAndRemoveString("jms.tckPassword", "", configurationCopy);
            this.useCredentialsFromCreateConnection = Boolean.parseBoolean(Utils.getAndRemoveString("jms.useCredentialsFromCreateConnection", "false", configurationCopy));
            this.defaultClientId = Utils.getAndRemoveString("jms.clientId", null, configurationCopy);
            this.queueSubscriptionName = Utils.getAndRemoveString("jms.queueSubscriptionName", "jms-queue", configurationCopy);
            this.usePulsarAdmin = Boolean.parseBoolean(Utils.getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy));
            this.precreateQueueSubscription = Boolean.parseBoolean(Utils.getAndRemoveString("jms.precreateQueueSubscription", "true", configurationCopy));
            this.precreateQueueSubscriptionConsumerQueueSize = Integer.parseInt(Utils.getAndRemoveString("jms.precreateQueueSubscriptionConsumerQueueSize", "0", configurationCopy));
            this.refreshServerSideFiltersPeriod = Integer.parseInt(Utils.getAndRemoveString("jms.refreshServerSideFiltersPeriod", "300", configurationCopy));
            this.maxMessagesLimitsParallelism = Boolean.parseBoolean(Utils.getAndRemoveString("jms.maxMessagesLimitsParallelism", "false", configurationCopy));
            this.connectionConsumerStopTimeout = Integer.parseInt(Utils.getAndRemoveString("jms.connectionConsumerStopTimeout", "20000", configurationCopy));
            this.sessionListenersThreads = Integer.parseInt(Utils.getAndRemoveString("jms.sessionListenersThreads", Runtime.getRuntime().availableProcessors() * 2 + "", configurationCopy));
            String rawTopicSharedSubscriptionType = Utils.getAndRemoveString("jms.topicSharedSubscriptionType", SubscriptionType.Shared.name(), configurationCopy);
            // Any subscription type except Exclusive is accepted, matched case-insensitively.
            this.topicSharedSubscriptionType = Stream.of(SubscriptionType.values())
                    .filter(t -> t.name().equalsIgnoreCase(rawTopicSharedSubscriptionType) && t != SubscriptionType.Exclusive)
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Invalid jms.topicSharedSubscriptionType: " + rawTopicSharedSubscriptionType + ", only " + SubscriptionType.Shared + ", " + SubscriptionType.Key_Shared + " and " + SubscriptionType.Failover + " are supported"));
            this.waitForServerStartupTimeout = Long.parseLong(Utils.getAndRemoveString("jms.waitForServerStartupTimeout", "60000", configurationCopy));
            this.enableClientSideEmulation = Boolean.parseBoolean(Utils.getAndRemoveString("jms.enableClientSideEmulation", "false", configurationCopy));
            this.transactionsStickyPartitions = Boolean.parseBoolean(Utils.getAndRemoveString("jms.transactionsStickyPartitions", "false", configurationCopy));
            this.useServerSideFiltering = Boolean.parseBoolean(Utils.getAndRemoveString("jms.useServerSideFiltering", "false", configurationCopy));
            this.enableJMSPriority = Boolean.parseBoolean(Utils.getAndRemoveString("jms.enableJMSPriority", "false", configurationCopy));
            String priorityMapping = Utils.getAndRemoveString("jms.priorityMapping", "linear", configurationCopy);
            switch (priorityMapping) {
                case "linear": {
                    this.priorityUseLinearMapping = true;
                    break;
                }
                case "non-linear": {
                    this.priorityUseLinearMapping = false;
                    break;
                }
                default: {
                    throw new IllegalArgumentException("jms.priorityMapping value '" + priorityMapping + "' is not valid, only 'linear' and 'non-linear'");
                }
            }
            this.useExclusiveSubscriptionsForSimpleConsumers = Boolean.parseBoolean(Utils.getAndRemoveString("jms.useExclusiveSubscriptionsForSimpleConsumers", "true", configurationCopy));
            this.acknowledgeRejectedMessages = Boolean.parseBoolean(Utils.getAndRemoveString("jms.acknowledgeRejectedMessages", "false", configurationCopy));
            this.forceDeleteTemporaryDestinations = Boolean.parseBoolean(Utils.getAndRemoveString("jms.forceDeleteTemporaryDestinations", "false", configurationCopy));
            // enableTransaction is read but left in the map, so it is also passed on to loadConf below.
            this.enableTransaction = Boolean.parseBoolean(configurationCopy.getOrDefault("enableTransaction", "false").toString());
            this.emulateTransactions = Boolean.parseBoolean(Utils.getAndRemoveString("jms.emulateTransactions", "false", configurationCopy));
            if (this.emulateTransactions && this.enableTransaction) {
                throw new javax.jms.IllegalStateException("You cannot set both enableTransaction and jms.emulateTransactions");
            }
            String webServiceUrl = Utils.getAndRemoveString("webServiceUrl", "http://localhost:8080", configurationCopy);
            String brokerServiceUrl = Utils.getAndRemoveString("brokerServiceUrl", "", configurationCopy);
            PulsarClient newClient = null;
            PulsarAdmin newAdmin = null;
            try {
                String authPluginClassName = Utils.getAndRemoveString("authPlugin", "", configurationCopy);
                String authParamsString = Utils.getAndRemoveString("authParams", "", configurationCopy);
                if (this.useCredentialsFromCreateConnection) {
                    if (connectUsername == null) {
                        connectUsername = "";
                    }
                    if (connectPassword == null) {
                        connectPassword = "";
                    }
                    if (authPluginClassName.equals(AuthenticationToken.class.getName())) {
                        // The password supplied by the caller becomes the token parameters.
                        authParamsString = connectPassword;
                    } else {
                        throw new IllegalStateRuntimeException("With jms.useCredentialsFromCreateConnection:true only JWT (AuthenticationToken) authentication is currently supported");
                    }
                }
                Authentication authentication = AuthenticationFactory.create(authPluginClassName, authParamsString);
                if (log.isDebugEnabled()) {
                    log.debug("Authentication {}", (Object) authentication);
                }
                boolean tlsAllowInsecureConnection = Boolean.parseBoolean(Utils.getAndRemoveString("tlsAllowInsecureConnection", "false", configurationCopy));
                boolean tlsEnableHostnameVerification = Boolean.parseBoolean(Utils.getAndRemoveString("tlsEnableHostnameVerification", "true", configurationCopy));
                String tlsTrustCertsFilePath = Utils.getAndRemoveString("tlsTrustCertsFilePath", "", configurationCopy);
                boolean useKeyStoreTls = Boolean.parseBoolean(Utils.getAndRemoveString("useKeyStoreTls", "false", configurationCopy));
                String tlsTrustStoreType = Utils.getAndRemoveString("tlsTrustStoreType", "JKS", configurationCopy);
                String tlsTrustStorePath = Utils.getAndRemoveString("tlsTrustStorePath", "", configurationCopy);
                String tlsTrustStorePassword = Utils.getAndRemoveString("tlsTrustStorePassword", "", configurationCopy);
                newAdmin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl).allowTlsInsecureConnection(tlsAllowInsecureConnection).enableTlsHostnameVerification(tlsEnableHostnameVerification).tlsTrustCertsFilePath(tlsTrustCertsFilePath).useKeyStoreTls(useKeyStoreTls).tlsTrustStoreType(tlsTrustStoreType).tlsTrustStorePath(tlsTrustStorePath).tlsTrustStorePassword(tlsTrustStorePassword).authentication(authentication).build();
                ClientBuilder clientBuilder = PulsarClient.builder().loadConf(configurationCopy).tlsTrustStorePassword(tlsTrustStorePassword).tlsTrustStorePath(tlsTrustStorePath).tlsTrustCertsFilePath(tlsTrustCertsFilePath).tlsTrustStoreType(tlsTrustStoreType).useKeyStoreTls(useKeyStoreTls).enableTlsHostnameVerification(tlsEnableHostnameVerification).allowTlsInsecureConnection(tlsAllowInsecureConnection).serviceUrl(webServiceUrl).authentication(authentication);
                // An explicit brokerServiceUrl takes precedence over webServiceUrl for the client.
                if (!brokerServiceUrl.isEmpty()) {
                    clientBuilder.serviceUrl(brokerServiceUrl);
                }
                newClient = this.buildPulsarClient(clientBuilder);
                if (newClient != null && this.refreshServerSideFiltersPeriod > 0 && this.useServerSideFiltering) {
                    PulsarClientImpl impl = (PulsarClientImpl) newClient;
                    ScheduledExecutorService timer = (ScheduledExecutorService) impl.getScheduledExecutorProvider().getExecutor();
                    timer.scheduleWithFixedDelay(this::refreshServerSideSelectors, this.refreshServerSideFiltersPeriod, this.refreshServerSideFiltersPeriod, TimeUnit.SECONDS);
                }
            }
            catch (PulsarClientException | RuntimeException err) {
                // Release whatever was already built. Runtime failures after construction (for
                // example the PulsarClientImpl cast or the timer scheduling) would otherwise leak
                // the admin and the client.
                if (newAdmin != null) {
                    newAdmin.close();
                }
                if (newClient != null) {
                    try {
                        newClient.close();
                    }
                    catch (PulsarClientException closeError) {
                        err.addSuppressed(closeError);
                    }
                }
                throw err;
            }
            this.pulsarClient = newClient;
            this.pulsarAdmin = newAdmin;
            if (this.useCredentialsFromCreateConnection && this.lastConnectUsername == null) {
                // Remember the credentials so later calls can be checked against them.
                this.lastConnectUsername = connectUsername;
                this.lastConnectPassword = connectPassword;
            }
            this.initialized = true;
        }
        catch (Throwable t2) {
            throw Utils.handleException(t2);
        }
    }

    /**
     * Builds the Pulsar client from a fully configured builder. Protected so that subclasses can
     * supply a different client.
     *
     * @param builder the configured client builder
     * @return the client produced by {@code builder.build()}
     * @throws PulsarClientException if the builder fails to create the client
     */
    protected PulsarClient buildPulsarClient(ClientBuilder builder) throws PulsarClientException {
        PulsarClient client = builder.build();
        return client;
    }

    /**
     * Verifies that the supplied credentials match the ones recorded when the factory was
     * initialized. Does nothing when no credentials have been recorded.
     *
     * <p>Null values are treated as empty strings, mirroring the normalization applied when the
     * credentials are recorded during initialization; without it a null username or an anonymous
     * connection would be rejected even on the very first call.
     *
     * @param connectUsername username supplied by the caller, may be null
     * @param connectPassword password supplied by the caller, may be null
     * @throws javax.jms.IllegalStateException if either value differs from the recorded one
     */
    private void validateConnectUsernamePasswordReused(String connectUsername, String connectPassword) throws javax.jms.IllegalStateException {
        if (this.lastConnectUsername == null) {
            return;
        }
        String username = connectUsername == null ? "" : connectUsername;
        String password = connectPassword == null ? "" : connectPassword;
        if (!Objects.equals(username, this.lastConnectUsername)) {
            throw new javax.jms.IllegalStateException("With jms.useCredentialsFromCreateConnection:true once you call createConnection(username,password) you must always use the same credentials, bad username " + connectUsername + ", expecting " + this.lastConnectUsername);
        }
        if (!Objects.equals(password, this.lastConnectPassword)) {
            // The password itself is deliberately kept out of the message.
            throw new javax.jms.IllegalStateException("With jms.useCredentialsFromCreateConnection:true once you call createConnection(username,password) you must always use the same credentials, password does not match");
        }
    }

    /**
     * @return the flag parsed from {@code jms.enableClientSideEmulation} during initialization
     */
    public synchronized boolean isEnableClientSideEmulation() {
        return enableClientSideEmulation;
    }

    /**
     * @return the flag parsed from {@code jms.transactionsStickyPartitions} during initialization
     */
    public synchronized boolean isTransactionsStickyPartitions() {
        return transactionsStickyPartitions;
    }

    /**
     * @return the flag parsed from {@code jms.useServerSideFiltering} during initialization
     */
    public synchronized boolean isUseServerSideFiltering() {
        return useServerSideFiltering;
    }

    /**
     * @return the flag parsed from {@code jms.enableJMSPriority} during initialization
     */
    public synchronized boolean isEnableJMSPriority() {
        return enableJMSPriority;
    }

    /**
     * @return true when {@code jms.priorityMapping} is {@code linear}, false when it is
     *     {@code non-linear}
     */
    public synchronized boolean isPriorityUseLinearMapping() {
        return priorityUseLinearMapping;
    }

    /**
     * @return the value of {@code jms.clientId}, or null when it is not configured
     */
    synchronized String getDefaultClientId() {
        return defaultClientId;
    }

    /**
     * @return the flag parsed from the {@code enableTransaction} entry during initialization
     */
    public synchronized boolean isEnableTransaction() {
        return enableTransaction;
    }

    /**
     * @return the flag parsed from {@code jms.emulateTransactions} during initialization
     */
    public synchronized boolean isEmulateTransactions() {
        return emulateTransactions;
    }

    /**
     * @return the Pulsar client created during initialization; unset before the factory has been
     *     initialized
     */
    public synchronized PulsarClient getPulsarClient() {
        return pulsarClient;
    }

    /**
     * Returns the Pulsar admin client created during initialization.
     *
     * @return the admin client
     * @throws javax.jms.IllegalStateException when {@code jms.usePulsarAdmin} is false
     */
    public synchronized PulsarAdmin getPulsarAdmin() throws javax.jms.IllegalStateException {
        if (usePulsarAdmin) {
            return pulsarAdmin;
        }
        throw new javax.jms.IllegalStateException("jms.usePulsarAdmin is set to false, this feature is not available");
    }

    /**
     * @return the namespace configured by {@code jms.systemNamespace}; {@code public/default}
     *     unless overridden
     */
    public synchronized String getSystemNamespace() {
        return systemNamespace;
    }

    /**
     * Creates an anonymous connection, initializing the factory first if needed, and registers it
     * so that {@link #close()} can close it.
     *
     * @return the new connection
     * @throws JMSException if initialization or credential validation fails
     */
    @Override
    public PulsarConnection createConnection() throws JMSException {
        ensureInitialized(null, null);
        validateUserNamePassword(true, null, null);
        PulsarConnection connection = new PulsarConnection(this);
        connections.add(connection);
        return connection;
    }

    /**
     * Creates a connection for the given credentials, initializing the factory first if needed,
     * and registers it so that {@link #close()} can close it.
     *
     * @param userName the caller's username
     * @param password the caller's password
     * @return the new connection
     * @throws JMSException if initialization or credential validation fails
     */
    @Override
    public PulsarConnection createConnection(String userName, String password) throws JMSException {
        ensureInitialized(userName, password);
        validateUserNamePassword(false, userName, password);
        PulsarConnection connection = new PulsarConnection(this);
        connections.add(connection);
        return connection;
    }

    /**
     * Validates the credentials supplied when creating a connection or context.
     *
     * <p>When credentials are taken from createConnection, they must match the recorded ones.
     * Independently, a non-anonymous caller must match the configured TCK username and password
     * whenever a TCK username is set.
     *
     * @param anonymous true when the caller supplied no credentials
     * @param userName the supplied username, may be null
     * @param password the supplied password, may be null
     * @throws JMSException if the credentials are rejected
     */
    private synchronized void validateUserNamePassword(boolean anonymous, String userName, String password) throws JMSException {
        if (useCredentialsFromCreateConnection) {
            validateConnectUsernamePasswordReused(userName, password);
        }
        if (anonymous) {
            return;
        }
        if (tckUsername == null || tckUsername.isEmpty()) {
            return;
        }
        boolean credentialsMatch = Objects.equals(tckUsername, userName) && Objects.equals(tckPassword, password);
        if (!credentialsMatch) {
            throw new JMSSecurityException("Unauthorized");
        }
    }

    /**
     * Creates an anonymous context using the auto-acknowledge session mode.
     *
     * @return the new context
     */
    @Override
    public JMSContext createContext() {
        return createContext(JMSContext.AUTO_ACKNOWLEDGE);
    }

    /**
     * Creates a context for the given credentials using the auto-acknowledge session mode.
     *
     * @param userName the caller's username
     * @param password the caller's password
     * @return the new context
     */
    @Override
    public JMSContext createContext(String userName, String password) {
        return createContext(userName, password, JMSContext.AUTO_ACKNOWLEDGE);
    }

    /**
     * Creates a context for the given credentials and session mode, initializing the factory
     * first if needed. Checked failures are rethrown through {@code Utils.runtimeException}.
     *
     * @param userName the caller's username
     * @param password the caller's password
     * @param sessionMode the JMS session mode for the context
     * @return the new context
     */
    @Override
    public JMSContext createContext(String userName, String password, int sessionMode) {
        Utils.runtimeException(() -> ensureInitialized(userName, password));
        Utils.runtimeException(() -> validateUserNamePassword(false, userName, password));
        PulsarJMSContext context = new PulsarJMSContext(this, sessionMode, false, userName, password);
        return context;
    }

    /**
     * Creates an anonymous context with the given session mode, initializing the factory first if
     * needed. Checked failures are rethrown through {@code Utils.runtimeException}.
     *
     * @param sessionMode the JMS session mode for the context
     * @return the new context
     */
    @Override
    public JMSContext createContext(int sessionMode) {
        Utils.runtimeException(() -> ensureInitialized(null, null));
        Utils.runtimeException(() -> validateUserNamePassword(true, null, null));
        PulsarJMSContext context = new PulsarJMSContext(this, sessionMode, true, null, null);
        return context;
    }

    /**
     * Closes the factory and everything it created: connections, cached producers, the admin
     * client, the Pulsar client and the session listener thread pool.
     *
     * <p>The factory is marked closed under the instance lock. When it was never initialized
     * there is nothing else to release. Failures while closing individual resources do not stop
     * the remaining resources from being closed.
     */
    @Override
    public void close() {
        synchronized (this) {
            this.closed = true;
            if (!this.initialized) {
                return;
            }
        }
        // Iterate over a snapshot of the registered connections.
        for (PulsarConnection con : new ArrayList<PulsarConnection>(this.connections)) {
            try {
                con.close();
            }
            catch (Exception ignore) {
                // Best effort: the result of handleException is discarded on purpose.
                Utils.handleException(ignore);
            }
        }
        for (Producer<byte[]> producer : this.producers.values()) {
            try {
                producer.close();
            }
            catch (PulsarClientException ignore) {
                // Best effort: the result of handleException is discarded on purpose.
                Utils.handleException(ignore);
            }
        }
        if (this.pulsarAdmin != null) {
            try {
                this.pulsarAdmin.close();
            }
            catch (RuntimeException err) {
                // Keep going so the client and the thread pool are still released.
                log.info("Error closing PulsarAdmin", err);
            }
        }
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        }
        catch (PulsarClientException err) {
            log.info("Error closing PulsarClient", err);
        }
        if (this.sessionListenersThreadPool != null) {
            this.sessionListenersThreadPool.shutdown();
            this.sessionListenersThreadPool = null;
        }
    }

    /**
     * Converts a JMS destination into a {@link PulsarDestination}.
     *
     * <p>Instances that already are Pulsar destinations are returned unchanged. Other queues and
     * topics are wrapped by name, with the queue check taking precedence.
     *
     * @param destination the destination to convert
     * @return the equivalent Pulsar destination
     * @throws JMSException if the destination is neither a queue nor a topic
     */
    public static PulsarDestination toPulsarDestination(Destination destination) throws JMSException {
        if (destination instanceof PulsarDestination) {
            return (PulsarDestination) destination;
        }
        if (destination instanceof Queue) {
            Queue queue = (Queue) destination;
            return new PulsarQueue(queue.getQueueName());
        }
        if (destination instanceof Topic) {
            Topic topic = (Topic) destination;
            return new PulsarTopic(topic.getTopicName());
        }
        throw new javax.jms.IllegalStateException("Cannot convert " + destination + " to a PulsarDestination");
    }

    /**
     * Resolves the Pulsar topic name for a JMS destination by taking the destination's internal
     * topic name and passing it through {@code applySystemNamespace}.
     *
     * @param defaultDestination the JMS destination
     * @return the resulting topic name
     * @throws JMSException if the destination cannot be converted or resolved
     */
    public String getPulsarTopicName(Destination defaultDestination) throws JMSException {
        PulsarDestination pulsarDestination = toPulsarDestination(defaultDestination);
        return applySystemNamespace(pulsarDestination.getInternalTopicName());
    }

    /**
     * Returns the cached producer for a destination, creating it on first use.
     *
     * <p>Transactional and non-transactional producers are cached under different keys. The
     * producer is tagged with {@code jms.transactions} and, when JMS priority is enabled, with
     * {@code jms.priority} and {@code jms.priorityMapping}. A custom message router is installed
     * when JMS priority is enabled, or otherwise when sticky partitions are enabled for
     * transactions.
     *
     * @param defaultDestination the destination to produce to
     * @param transactions whether the producer is used for transactional sends
     * @return the producer for the destination
     * @throws JMSException if the topic name cannot be resolved or the producer cannot be created
     */
    Producer<byte[]> getProducerForDestination(Destination defaultDestination, boolean transactions) throws JMSException {
        try {
            String fullQualifiedTopicName = this.getPulsarTopicName(defaultDestination);
            String key = transactions ? fullQualifiedTopicName + "-tx" : fullQualifiedTopicName;
            final boolean stickyPartitions = transactions && this.isTransactionsStickyPartitions();
            final boolean enableJMSPriority = this.isEnableJMSPriority();
            final boolean producerJMSPriorityUseLinearMapping = enableJMSPriority && this.isPriorityUseLinearMapping();
            return this.producers.computeIfAbsent(key, d -> {
                try {
                    return Utils.invoke(() -> {
                        Map<String, Object> producerConfiguration = this.getProducerConfiguration();
                        ProducerBuilder<byte[]> producerBuilder = this.pulsarClient.newProducer().topic(this.applySystemNamespace(fullQualifiedTopicName)).loadConf(producerConfiguration);
                        if (producerConfiguration.containsKey("batcherBuilder")) {
                            producerBuilder.batcherBuilder((BatcherBuilder) producerConfiguration.get("batcherBuilder"));
                        }
                        HashMap<String, String> properties = new HashMap<String, String>();
                        properties.put("jms.transactions", transactions ? "enabled" : "disabled");
                        if (enableJMSPriority) {
                            properties.put("jms.priority", "enabled");
                            properties.put("jms.priorityMapping", producerJMSPriorityUseLinearMapping ? "linear" : "non-linear");
                            producerBuilder.messageRouter(new MessageRouter(){

                                @Override
                                public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                                    // Messages without a JMSPriority property are routed as priority 4.
                                    String priority = msg.getProperty("JMSPriority");
                                    int priorityValue = priority == null ? 4 : Integer.parseInt(priority);
                                    return Utils.mapPriorityToPartition(priorityValue, metadata.numPartitions(), producerJMSPriorityUseLinearMapping);
                                }
                            });
                        } else if (stickyPartitions) {
                            producerBuilder.messageRouter(new MessageRouter(){

                                @Override
                                public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                                    // The partition is derived from the JMSTX property of the message.
                                    long txKey = Long.parseLong(msg.getProperty("JMSTX"));
                                    return MathUtils.signSafeMod(txKey, metadata.numPartitions());
                                }
                            });
                        }
                        producerBuilder.properties(properties);
                        return producerBuilder.create();
                    });
                }
                catch (JMSException err) {
                    // computeIfAbsent cannot propagate checked exceptions; unwrapped below.
                    throw new RuntimeException(err);
                }
            });
        }
        catch (RuntimeException err) {
            Throwable cause = err.getCause();
            if (cause instanceof JMSException) {
                throw (JMSException) cause;
            }
            // Not a wrapped JMSException: convert it instead of failing on a blind cast.
            throw Utils.handleException(err);
        }
    }

    /**
     * Reads the {@code usePulsarAdmin} flag while holding this factory's monitor.
     *
     * @return the current value of the {@code usePulsarAdmin} field
     */
    synchronized boolean isUsePulsarAdmin() {
        final boolean adminEnabled = this.usePulsarAdmin;
        return adminEnabled;
    }

    /**
     * Reads the {@code precreateQueueSubscription} flag while holding this factory's monitor.
     *
     * @return the current value of the {@code precreateQueueSubscription} field
     */
    synchronized boolean isPrecreateQueueSubscription() {
        final boolean precreate = this.precreateQueueSubscription;
        return precreate;
    }

    /**
     * Creates the queue subscription for a destination ahead of time.
     *
     * <p>Nothing happens when pre-creation is disabled or the destination is a regular
     * expression. Multi-topic destinations are handled by recursing into each sub-destination.
     * For a single topic the subscription is created either through the admin API or by opening
     * and immediately closing a consumer, depending on {@link #isUsePulsarAdmin()}.
     *
     * <p>Failures are retried once per second until {@link #getWaitForServerStartupTimeout()}
     * milliseconds have elapsed since the method started; an admin "conflict" answer is treated
     * as "subscription already exists".
     *
     * @param destination the destination whose queue subscription must exist
     * @throws JMSException when the timeout expires or the waiting thread is interrupted
     */
    public void ensureQueueSubscription(PulsarDestination destination) throws JMSException {
        if (!this.isPrecreateQueueSubscription() || destination.isRegExp()) {
            return;
        }
        if (destination.isMultiTopic()) {
            for (PulsarDestination child : destination.getDestinations()) {
                this.ensureQueueSubscription(child);
            }
            return;
        }
        final long startedAt = System.currentTimeMillis();
        final String topicName = this.getPulsarTopicName(destination);
        boolean subscriptionReady = false;
        while (!subscriptionReady) {
            final String subscriptionName = this.getQueueSubscriptionName(destination);
            try {
                if (this.isUsePulsarAdmin()) {
                    this.getPulsarAdmin().topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
                } else {
                    // Subscribing and closing right away leaves the subscription behind on the broker.
                    this.getPulsarClient().newConsumer()
                            .subscriptionType(this.getTopicSharedSubscriptionType())
                            .subscriptionName(subscriptionName)
                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                            .receiverQueueSize(this.getPrecreateQueueSubscriptionConsumerQueueSize(destination.isRegExp()))
                            .topic(topicName)
                            .subscribe()
                            .close();
                }
                subscriptionReady = true;
            } catch (PulsarAdminException.ConflictException exists) {
                log.debug("Subscription {} already exists for {}", subscriptionName, topicName);
                subscriptionReady = true;
            } catch (PulsarAdminException | PulsarClientException err) {
                final long now = System.currentTimeMillis();
                if (now - startedAt > this.getWaitForServerStartupTimeout()) {
                    throw Utils.handleException(err);
                }
                log.info("Got {} error while setting up subscription for queue {}, maybe the namespace/broker is still starting", err.toString(), topicName);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt flag set and report the original failure.
                    Thread.currentThread().interrupt();
                    throw Utils.handleException(err);
                }
            }
        }
    }

    /**
     * Builds and subscribes a Pulsar consumer for a JMS destination.
     *
     * <p>Queues only accept durable, non-exclusive subscriptions. The consumer is registered in
     * this factory's {@code consumers} collection. When JMS priority is enabled the consumer is
     * created paused, its incoming message buffer is replaced by a priority-aware one, and it is
     * then resumed.
     *
     * @param destination the destination to consume from (single topic, multi-topic or pattern)
     * @param consumerName subscription name used for topic destinations
     * @param subscriptionMode Pulsar subscription mode; must be {@code Durable} for queues
     * @param subscriptionType Pulsar subscription type; must not be {@code Exclusive} for queues
     * @param messageSelector optional selector, sent to the broker when server-side filtering is on
     * @param noLocal when server-side filtering is on, adds the connection id as a filter property
     * @param session the owning session (acknowledge mode, overrides and interceptor are read from it)
     * @return the subscribed consumer
     * @throws JMSException on invalid queue settings or when the Pulsar client fails to subscribe
     */
    public ConsumerBase<?> createConsumer(PulsarDestination destination, String consumerName, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String messageSelector, boolean noLocal, PulsarSession session) throws JMSException {
        if (destination.isQueue() && subscriptionMode != SubscriptionMode.Durable) {
            throw new javax.jms.IllegalStateException("only durable mode for queues");
        }
        if (destination.isQueue() && subscriptionType == SubscriptionType.Exclusive) {
            throw new javax.jms.IllegalStateException("only Shared SubscriptionType for queues");
        }
        // Five placeholders for five arguments: the selector used to be silently dropped.
        log.debug("createConsumer {} {} {} {} {}", new Object[]{destination, consumerName, subscriptionMode, subscriptionType, messageSelector});
        HashMap<String, String> subscriptionProperties = new HashMap<String, String>();
        HashMap<String, String> consumerMetadata = new HashMap<String, String>();
        consumerMetadata.put("jms.destination.type", destination.isQueue() ? "queue" : "topic");
        consumerMetadata.put("jms.acknowledgeMode", PulsarSession.ACKNOWLEDGE_MODE_TO_STRING(session.getAcknowledgeMode()));
        // Read the setting once so that all filtering-related metadata is consistent.
        final boolean serverSideFiltering = this.isUseServerSideFiltering();
        if (serverSideFiltering) {
            consumerMetadata.put("jms.filtering", "true");
            subscriptionProperties.put("jms.destination.type", destination.isQueue() ? "queue" : "topic");
            if (noLocal) {
                consumerMetadata.put("jms.filter.JMSConnectionID", session.getConnection().getConnectionId());
            }
            if (messageSelector != null) {
                consumerMetadata.put("jms.selector", messageSelector);
            }
            consumerMetadata.put("jms.selector.reject.action", destination.isTopic() ? "drop" : "reschedule");
        }
        if (this.isAcknowledgeRejectedMessages()) {
            consumerMetadata.put("jms.force.drop.rejected", "true");
        }
        // Captured once: the same value decides both startPaused(true) and the later resume(),
        // so a consumer that was started paused is always resumed.
        final boolean enablePriority = this.isEnableJMSPriority();
        if (enablePriority) {
            consumerMetadata.put("jms.priority", "enabled");
        }
        try {
            ConsumerConfiguration consumerConfiguration = this.getConsumerConfiguration(session.getOverrideConsumerConfiguration(), destination);
            Schema<Object> schema = consumerConfiguration.getConsumerSchema();
            if (schema == null) {
                schema = Schema.BYTES;
            }
            String subscriptionName = destination.isQueue() ? this.getQueueSubscriptionName(destination) : consumerName;
            SubscriptionInitialPosition initialPosition = destination.isTopic() ? SubscriptionInitialPosition.Latest : SubscriptionInitialPosition.Earliest;
            ConsumerBuilder<?> builder = this.pulsarClient.newConsumer(schema).negativeAckRedeliveryDelay(1L, TimeUnit.SECONDS).loadConf(consumerConfiguration.getConsumerConfiguration()).properties(consumerMetadata).subscriptionInitialPosition(initialPosition).subscriptionMode(subscriptionMode).subscriptionProperties(subscriptionProperties).subscriptionType(subscriptionType).subscriptionName(subscriptionName);
            if (enablePriority) {
                builder.startPaused(true);
            }
            if (destination.isRegExp()) {
                builder.topicsPattern(this.getPulsarTopicName(destination));
            } else if (destination.isMultiTopic()) {
                List<PulsarDestination> destinations = destination.getDestinations();
                ArrayList<String> fullQualifiedTopicNames = new ArrayList<String>(destinations.size());
                for (PulsarDestination d : destinations) {
                    fullQualifiedTopicNames.add(this.getPulsarTopicName(d));
                }
                builder.topics(fullQualifiedTopicNames);
            } else {
                builder.topic(this.getPulsarTopicName(destination));
            }
            if (consumerConfiguration.getDeadLetterPolicy() != null) {
                builder.deadLetterPolicy(consumerConfiguration.getDeadLetterPolicy());
            }
            if (consumerConfiguration.getNegativeAckRedeliveryBackoff() != null) {
                builder.negativeAckRedeliveryBackoff(consumerConfiguration.getNegativeAckRedeliveryBackoff());
            }
            if (consumerConfiguration.getAckTimeoutRedeliveryBackoff() != null) {
                builder.ackTimeoutRedeliveryBackoff(consumerConfiguration.getAckTimeoutRedeliveryBackoff());
            }
            builder.intercept(session.getConsumerInterceptor());
            Consumer<?> newConsumer = builder.subscribe();
            if (log.isDebugEnabled() && newConsumer instanceof MultiTopicsConsumerImpl) {
                MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl)newConsumer;
                log.debug("Destinations {}", (Object)multiTopicsConsumer.getPartitions());
            }
            this.consumers.add(newConsumer);
            if (enablePriority) {
                PulsarConnectionFactory.replaceIncomingMessageList(newConsumer);
                newConsumer.resume();
            }
            return (ConsumerBase)newConsumer;
        }
        catch (PulsarClientException err) {
            throw Utils.handleException(err);
        }
    }

    /**
     * Swaps the {@code incomingMessages} queue of a consumer, via reflection, for a
     * {@link PriorityBlockingQueue} ordered by descending JMS priority.
     *
     * <p>The swap only happens when the current queue's class passes the
     * {@code isAssignableFrom(PriorityBlockingQueue.class)} check; otherwise a debug line is
     * logged and the consumer is left untouched. Messages already in the old queue are drained
     * into the new one before the field is overwritten.
     *
     * @param c the consumer to patch; it is cast to {@code ConsumerBase}
     * @throws RuntimeException wrapping any failure (reflection, cast, ...)
     */
    private static void replaceIncomingMessageList(Consumer c) {
        try {
            ConsumerBase target = (ConsumerBase)c;
            Field queueField = ConsumerBase.class.getDeclaredField("incomingMessages");
            queueField.setAccessible(true);
            Object currentQueue = queueField.get(target);
            if (!currentQueue.getClass().isAssignableFrom(PriorityBlockingQueue.class)) {
                log.debug("Field incomingMessages is not a PriorityBlockingQueue, it is a {}.We cannot apply priority to the messages in the local buffer.", (Object)currentQueue.getClass().getName());
                return;
            }
            // Higher priority first: the comparison is made with the operands swapped.
            Comparator<Message> byDescendingPriority = (first, second) -> {
                int firstPriority = PulsarConnectionFactory.getPriority(first);
                int secondPriority = PulsarConnectionFactory.getPriority(second);
                return Integer.compare(secondPriority, firstPriority);
            };
            PriorityBlockingQueue<Message> prioritized = new PriorityBlockingQueue<Message>(10, byDescendingPriority);
            ((BlockingQueue)currentQueue).drainTo(prioritized);
            queueField.set(c, prioritized);
        } catch (Exception err) {
            throw new RuntimeException(err);
        }
    }

    /**
     * Extracts the JMS priority of a message from its {@code JMSPriority} property.
     *
     * @param m3 the message to inspect
     * @return the parsed priority, or 4 when the property is missing, empty or not an integer
     */
    private static int getPriority(Message m3) {
        int priority = 4;
        final String rawPriority = m3.getProperty("JMSPriority");
        if (rawPriority != null && !rawPriority.isEmpty()) {
            try {
                priority = Integer.parseInt(rawPriority);
            } catch (NumberFormatException malformed) {
                // Unparseable values fall back to the same default as a missing property.
                priority = 4;
            }
        }
        return priority;
    }

    /*
     * Loose catch block
     */
    public String downloadServerSideFilter(String fullQualifiedTopicName, String subscriptionName, SubscriptionMode subscriptionMode) throws JMSException {
        if (!this.isUseServerSideFiltering() || subscriptionMode != SubscriptionMode.Durable) {
            return null;
        }
        log.info("downloadServerSideFilter {} {} {}", new Object[]{fullQualifiedTopicName, subscriptionName, subscriptionMode});
        long start = System.currentTimeMillis();
        while (true) {
            try {
                Map<String, String> subscriptionPropertiesFromBroker = this.pulsarAdmin.topics().getSubscriptionProperties(fullQualifiedTopicName, subscriptionName);
                if (subscriptionPropertiesFromBroker != null) {
                    String selectorOnSubscription;
                    log.debug("subscriptionPropertiesFromBroker {}", (Object)subscriptionPropertiesFromBroker);
                    boolean filtering = "true".equals(subscriptionPropertiesFromBroker.get("jms.filtering"));
                    if (filtering && !(selectorOnSubscription = subscriptionPropertiesFromBroker.getOrDefault("jms.selector", "")).isEmpty()) {
                        log.info("Detected selector {} on Subscription {} on topic {}", selectorOnSubscription, subscriptionName, fullQualifiedTopicName);
                        return selectorOnSubscription;
                    }
                }
                return null;
            }
            catch (PulsarAdminException.PreconditionFailedException notReady) {
                long now = System.currentTimeMillis();
                if (now - start > this.getWaitForServerStartupTimeout()) {
                    throw Utils.handleException(notReady);
                }
                log.info("Temporary error, cannot download server-side filters {}: {}", (Object)fullQualifiedTopicName, (Object)(notReady + ""));
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {
                    Thread.currentThread().interrupt();
                    throw Utils.handleException(notReady);
                }
            }
        }
        catch (PulsarAdminException err) {
            throw Utils.handleException(err);
        }
    }

    /**
     * Creates the readers backing a queue browser.
     *
     * <ul>
     *   <li>Pattern destinations: topics are discovered with
     *       {@code TopicDiscoveryUtils.discoverTopicsByPattern}, and the method recurses on a
     *       {@code PulsarQueue} named {@code topic + ":" + subscription} for each of them.</li>
     *   <li>Multi-topic destinations: the method recurses on every sub-destination, each cast
     *       to {@code PulsarQueue}.</li>
     *   <li>Single topics: one reader is created when the reported partition count is 0,
     *       otherwise one reader per {@code -partition-N} topic.</li>
     * </ul>
     *
     * @param destination the destination to browse
     * @param overrideConsumerConfiguration configuration overrides forwarded to reader creation
     * @return the created readers; an empty list when the admin API reports the topic as not found
     * @throws JMSException when discovery, metadata lookup or reader creation fails
     */
    public List<Reader<?>> createReadersForBrowser(PulsarDestination destination, ConsumerConfiguration overrideConsumerConfiguration) throws JMSException {
        if (destination.isRegExp()) {
            try {
                final String pattern = this.getPulsarTopicName(destination);
                final List<String> matchingTopics = TopicDiscoveryUtils.discoverTopicsByPattern(pattern, this.getPulsarClient(), 1000);
                log.info("createReadersForBrowser {} - {} - {}", destination, pattern, matchingTopics);
                final List<Reader<?>> collected = new ArrayList<>();
                for (String matchingTopic : matchingTopics) {
                    final PulsarQueue perTopicQueue = new PulsarQueue(matchingTopic + ":" + this.getQueueSubscriptionName(destination));
                    collected.addAll(this.createReadersForBrowser(perTopicQueue, overrideConsumerConfiguration));
                }
                return collected;
            } catch (Exception err) {
                throw Utils.handleException(err);
            }
        }
        if (destination.isMultiTopic()) {
            final List<Reader<?>> collected = new ArrayList<>();
            for (PulsarDestination child : destination.getDestinations()) {
                collected.addAll(this.createReadersForBrowser((PulsarQueue)child, overrideConsumerConfiguration));
            }
            return collected;
        }
        final String topicName = this.getPulsarTopicName(destination);
        final String subscriptionName = this.getQueueSubscriptionName(destination);
        try {
            final PartitionedTopicMetadata metadata = this.getPulsarAdmin().topics().getPartitionedTopicMetadata(topicName);
            final List<Reader<?>> readers = new ArrayList<>();
            if (metadata.partitions == 0) {
                readers.add(this.createReaderForBrowserForNonPartitionedTopic(subscriptionName, topicName, overrideConsumerConfiguration, destination));
                return readers;
            }
            for (int partition = 0; partition < metadata.partitions; ++partition) {
                final String partitionTopic = topicName + "-partition-" + partition;
                readers.add(this.createReaderForBrowserForNonPartitionedTopic(subscriptionName, partitionTopic, overrideConsumerConfiguration, destination));
            }
            return readers;
        } catch (PulsarAdminException.NotFoundException err) {
            return Collections.emptyList();
        } catch (PulsarAdminException err) {
            throw Utils.handleException(err);
        }
    }

    /**
     * Creates one reader positioned where the given subscription currently stands on a topic.
     *
     * <p>The admin API is asked to peek a single message on the subscription. The reader starts
     * (inclusively) at that message's id, or at {@code MessageId.latest} when nothing was
     * returned. The reader configuration is a deep copy of the consumer configuration without
     * the {@code batchIndexAckEnabled} entry, and the new reader is recorded in this factory's
     * {@code readers} collection.
     *
     * @param queueSubscriptionName subscription used to locate the starting position
     * @param fullQualifiedTopicName topic (or partition) to read
     * @param overrideConsumerConfiguration configuration overrides
     * @param destination destination used to resolve the consumer configuration
     * @return the newly created reader
     * @throws JMSException when the admin call or the reader creation fails
     */
    private Reader<?> createReaderForBrowserForNonPartitionedTopic(String queueSubscriptionName, String fullQualifiedTopicName, ConsumerConfiguration overrideConsumerConfiguration, PulsarDestination destination) throws JMSException {
        try {
            List<Message<byte[]>> peeked = this.getPulsarAdmin().topics().peekMessages(fullQualifiedTopicName, queueSubscriptionName, 1);
            MessageId startPosition;
            if (peeked.isEmpty()) {
                startPosition = MessageId.latest;
            } else {
                startPosition = peeked.get(0).getMessageId();
            }
            if (log.isDebugEnabled()) {
                log.debug("createBrowser {} at {}", (Object)fullQualifiedTopicName, (Object)startPosition);
            }
            log.info("createBrowser {} at {}", (Object)fullQualifiedTopicName, (Object)startPosition);
            ConsumerConfiguration effectiveConfiguration = this.getConsumerConfiguration(overrideConsumerConfiguration, destination);
            Schema<Object> schema = effectiveConfiguration.getConsumerSchema();
            if (schema == null) {
                schema = Schema.BYTES;
            }
            Map<String, Object> readerSettings = Utils.deepCopyMap(effectiveConfiguration.getConsumerConfiguration());
            readerSettings.remove("batchIndexAckEnabled");
            String readerName = "jms-queue-browser-" + UUID.randomUUID();
            ReaderBuilder<?> builder = this.pulsarClient.newReader(schema)
                    .loadConf(readerSettings)
                    .readerName(readerName)
                    .startMessageId(startPosition)
                    .startMessageIdInclusive()
                    .topic(fullQualifiedTopicName);
            Reader<?> created = builder.create();
            this.readers.add(created);
            return created;
        } catch (PulsarAdminException | PulsarClientException err) {
            throw Utils.handleException(err);
        }
    }

    /**
     * Removes the given consumer from this factory's {@code consumers} collection.
     *
     * @param consumer the consumer to forget
     */
    public void removeConsumer(Consumer<?> consumer) {
        this.consumers.remove(consumer);
    }

    /**
     * Removes the given reader from this factory's {@code readers} collection.
     *
     * @param reader the reader to forget
     */
    public void removeReader(Reader<?> reader) {
        this.readers.remove(reader);
    }

    /**
     * Force-deletes a subscription by name.
     *
     * <p>When a destination is given, the subscription is first deleted on that destination's
     * topic; a "not found" answer is logged and ignored. If nothing was deleted that way (or no
     * destination was given), every topic listed for the system namespace is scanned, skipping
     * those ending with {@code PENDING_ACK_STORE_SUFFIX}, and each subscription whose name
     * equals {@code name} is deleted.
     *
     * @param destination optional destination to try first; virtual destinations are rejected
     * @param name the subscription name to delete
     * @return {@code true} when at least one subscription was deleted
     * @throws JMSException wrapping any failure, including the virtual-destination rejection
     */
    @SuppressFBWarnings(value={"REC_CATCH_EXCEPTION"})
    public boolean deleteSubscription(PulsarDestination destination, String name) throws JMSException {
        final String systemNamespace = this.getSystemNamespace();
        boolean deleted = false;
        try {
            if (destination != null) {
                if (destination.isVirtualDestination()) {
                    throw new InvalidDestinationException("Virtual destinations are not supported for unsubscribe");
                }
                final String topicName = this.getPulsarTopicName(destination);
                log.info("deleteSubscription topic {} name {}", (Object)topicName, (Object)name);
                try {
                    this.pulsarAdmin.topics().deleteSubscription(topicName, name, true);
                    deleted = true;
                } catch (PulsarAdminException.NotFoundException notFound) {
                    log.error("Cannot unsubscribe {} from {}: not found", (Object)name, (Object)topicName);
                }
            }
            if (!deleted) {
                // Fallback: look for the subscription on every topic of the system namespace.
                final List<String> namespaceTopics = this.pulsarAdmin.topics().getList(systemNamespace);
                for (String topic : namespaceTopics) {
                    if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) {
                        log.info("Ignoring system topic {}", (Object)topic);
                        continue;
                    }
                    log.info("Scanning topic {}", (Object)topic);
                    List<String> subscriptions;
                    try {
                        subscriptions = this.pulsarAdmin.topics().getSubscriptions(topic);
                        log.info("Subscriptions {}", (Object)subscriptions);
                    } catch (PulsarAdminException.NotFoundException notFound) {
                        log.error("Skipping topic {}", (Object)topic);
                        subscriptions = Collections.emptyList();
                    }
                    for (String subscription : subscriptions) {
                        log.info("Found subscription {} ", (Object)subscription);
                        if (subscription.equals(name)) {
                            log.info("deleteSubscription topic {} name {}", (Object)topic, (Object)name);
                            this.pulsarAdmin.topics().deleteSubscription(topic, name, true);
                            deleted = true;
                        }
                    }
                }
            }
        } catch (Exception err) {
            throw Utils.handleException(err);
        }
        return deleted;
    }

    /**
     * Records a client id in the shared {@code clientIdentifiers} set.
     *
     * @param clientID the client id to reserve
     * @throws InvalidClientIDException when the set already contains this client id
     */
    public void registerClientId(String clientID) throws InvalidClientIDException {
        log.debug("registerClientId {}, existing {}", (Object)clientID, (Object)clientIdentifiers);
        if (!clientIdentifiers.add(clientID)) {
            // A space was missing after the closing quote, gluing the id to the next word.
            throw new InvalidClientIDException("A connection with this client id '" + clientID + "' is already opened locally");
        }
    }

    /**
     * Forgets a connection: its client id, when set, is removed from {@code clientIdentifiers},
     * and the connection itself is removed from this factory's {@code connections}.
     *
     * @param connection the connection being closed
     */
    public void unregisterConnection(PulsarConnection connection) {
        final boolean hasClientId = connection.clientId != null;
        if (hasClientId) {
            clientIdentifiers.remove(connection.clientId);
            log.debug("unregisterClientId {} {}", (Object)connection.clientId, (Object)clientIdentifiers);
        }
        this.connections.remove(connection);
    }

    /**
     * Creates a queue connection by delegating to {@code createConnection()}.
     *
     * @return the connection returned by {@code createConnection()}
     * @throws JMSException when the delegate fails
     */
    @Override
    public QueueConnection createQueueConnection() throws JMSException {
        final QueueConnection connection = this.createConnection();
        return connection;
    }

    /**
     * Creates a queue connection by delegating to {@code createConnection(String, String)}.
     *
     * @param s2 first argument forwarded to the delegate (presumably the user name of the JMS API)
     * @param s1 second argument forwarded to the delegate (presumably the password of the JMS API)
     * @return the connection returned by the delegate
     * @throws JMSException when the delegate fails
     */
    @Override
    public QueueConnection createQueueConnection(String s2, String s1) throws JMSException {
        final QueueConnection connection = this.createConnection(s2, s1);
        return connection;
    }

    /**
     * Creates a topic connection by delegating to {@code createConnection()}.
     *
     * @return the connection returned by {@code createConnection()}
     * @throws JMSException when the delegate fails
     */
    @Override
    public TopicConnection createTopicConnection() throws JMSException {
        final TopicConnection connection = this.createConnection();
        return connection;
    }

    /**
     * Creates a topic connection by delegating to {@code createConnection(String, String)}.
     *
     * @param s2 first argument forwarded to the delegate (presumably the user name of the JMS API)
     * @param s1 second argument forwarded to the delegate (presumably the password of the JMS API)
     * @return the connection returned by the delegate
     * @throws JMSException when the delegate fails
     */
    @Override
    public TopicConnection createTopicConnection(String s2, String s1) throws JMSException {
        final TopicConnection connection = this.createConnection(s2, s1);
        return connection;
    }

    /**
     * Reads the {@code forceDeleteTemporaryDestinations} flag while holding this factory's monitor.
     *
     * @return the current value of the {@code forceDeleteTemporaryDestinations} field
     */
    public synchronized boolean isForceDeleteTemporaryDestinations() {
        final boolean forceDelete = this.forceDeleteTemporaryDestinations;
        return forceDelete;
    }

    /**
     * Resolves the subscription name to use for a queue destination.
     *
     * @param destination the destination, asked for a custom name via {@code extractSubscriptionName()}
     * @return the destination's own subscription name when it is non-null, otherwise this
     *     factory's {@code queueSubscriptionName}
     * @throws InvalidDestinationException declared for failures of the name extraction
     */
    public synchronized String getQueueSubscriptionName(PulsarDestination destination) throws InvalidDestinationException {
        final String fromDestination = destination.extractSubscriptionName();
        return fromDestination != null ? fromDestination : this.queueSubscriptionName;
    }

    /**
     * Reads the {@code waitForServerStartupTimeout} setting while holding this factory's monitor.
     *
     * @return the configured value; callers in this class compare it with millisecond deltas
     */
    public synchronized long getWaitForServerStartupTimeout() {
        final long timeout = this.waitForServerStartupTimeout;
        return timeout;
    }

    /**
     * Chooses the subscription type for a simple consumer.
     *
     * @param destination the destination being consumed
     * @return {@code Exclusive} when {@code useExclusiveSubscriptionsForSimpleConsumers} is set;
     *     otherwise {@code Shared} for a {@link Queue}, and the value of
     *     {@link #getTopicSharedSubscriptionType()} for anything else
     */
    public synchronized SubscriptionType getExclusiveSubscriptionTypeForSimpleConsumers(Destination destination) {
        if (this.useExclusiveSubscriptionsForSimpleConsumers) {
            return SubscriptionType.Exclusive;
        }
        if (destination instanceof Queue) {
            return SubscriptionType.Shared;
        }
        return this.getTopicSharedSubscriptionType();
    }

    /**
     * Reads the {@code topicSharedSubscriptionType} setting while holding this factory's monitor.
     *
     * @return the current value of the {@code topicSharedSubscriptionType} field
     */
    public synchronized SubscriptionType getTopicSharedSubscriptionType() {
        final SubscriptionType sharedType = this.topicSharedSubscriptionType;
        return sharedType;
    }

    /**
     * Turns a short destination name into a fully qualified persistent topic name.
     *
     * @param destination the destination name, possibly {@code null}
     * @return {@code null} for a {@code null} input; the input unchanged when it already starts
     *     with {@code persistent://} or {@code non-persistent://}; otherwise
     *     {@code persistent://<system namespace>/<destination>}
     */
    public String applySystemNamespace(String destination) {
        if (destination == null) {
            return null;
        }
        final boolean alreadyQualified = destination.startsWith("persistent://") || destination.startsWith("non-persistent://");
        return alreadyQualified ? destination : "persistent://" + this.getSystemNamespace() + "/" + destination;
    }

    /**
     * Reads the {@code acknowledgeRejectedMessages} flag.
     *
     * <p>NOTE(review): unlike the neighbouring getters this one is not {@code synchronized};
     * confirm against the field declaration whether that is intentional.
     *
     * @return the current value of the {@code acknowledgeRejectedMessages} field
     */
    public boolean isAcknowledgeRejectedMessages() {
        final boolean acknowledgeRejected = this.acknowledgeRejectedMessages;
        return acknowledgeRejected;
    }

    /**
     * Reads the {@code closed} flag while holding this factory's monitor.
     *
     * @return the current value of the {@code closed} field
     */
    public synchronized boolean isClosed() {
        final boolean closedNow = this.closed;
        return closedNow;
    }

    /**
     * Returns the receiver queue size for the short-lived consumer that pre-creates a queue
     * subscription.
     *
     * @param regExp whether the destination is a regular expression
     * @return the configured {@code precreateQueueSubscriptionConsumerQueueSize}, raised to at
     *     least 1 when {@code regExp} is {@code true}
     */
    private synchronized int getPrecreateQueueSubscriptionConsumerQueueSize(boolean regExp) {
        final int configuredSize = this.precreateQueueSubscriptionConsumerQueueSize;
        return regExp ? Math.max(configuredSize, 1) : configuredSize;
    }

    /**
     * Serialization hook: writes this factory as the JSON form of its {@code configuration},
     * encoded with {@code writeUTF}.
     *
     * @param out the stream to write to
     * @throws IOException when the JSON encoding or the write fails
     */
    private synchronized void writeObject(ObjectOutputStream out) throws IOException {
        final ObjectMapper mapper = new ObjectMapper();
        final String json = mapper.writeValueAsString(this.configuration);
        if (log.isDebugEnabled()) {
            log.debug("Serializing this PulsarConnectionFactory as {}", (Object)json);
        }
        out.writeUTF(json);
    }

    /**
     * Deserialization hook: resets the internal collections, reads the JSON configuration
     * written by {@code writeObject} and applies it through {@code setJsonConfiguration}.
     *
     * @param in the stream to read from
     * @throws IOException when the stream cannot be read or the JSON cannot be applied; in the
     *     latter case the original failure is attached as the cause
     * @throws ClassNotFoundException declared by the serialization contract
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        this.resetDefaultValues();
        String readConfiguration = in.readUTF();
        if (log.isDebugEnabled()) {
            // Log what was actually read; this.configuration has not been populated yet.
            log.debug("Deserialize configuration as {}", (Object)readConfiguration);
        }
        try {
            this.setJsonConfiguration(readConfiguration);
        }
        catch (Exception err) {
            // Report the text that failed to decode and keep the root cause for diagnosis.
            throw new IOException("Cannot decode JSON configuration " + readConfiguration, err);
        }
    }

    /**
     * Assigns a value to a field declared on this object's runtime class, via reflection.
     *
     * <p>When the field is not already accessible, it is made accessible and the
     * {@code final} bit (0x10) is cleared in its {@code modifiers} before the assignment; both
     * changes are reverted afterwards, even when the assignment fails.
     *
     * @param name name of the declared field
     * @param value value to assign
     * @throws RuntimeException wrapping any failure, after logging it
     */
    private void setFinalField(String name, Object value) {
        try {
            final Field target = this.getClass().getDeclaredField(name);
            final boolean wasAccessible = target.isAccessible();
            if (!wasAccessible) {
                target.setAccessible(true);
                PulsarConnectionFactory.getModifiersField().setInt(target, target.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
            }
            try {
                target.set(this, value);
            } finally {
                if (!wasAccessible) {
                    target.setAccessible(false);
                    PulsarConnectionFactory.getModifiersField().setInt(target, target.getModifiers() | java.lang.reflect.Modifier.FINAL);
                }
            }
        } catch (Exception err) {
            log.error("Error while setting final field {}", (Object)name, (Object)err);
            throw new RuntimeException(err);
        }
    }

    /**
     * Locates the {@code modifiers} field of {@link Field} by reflectively invoking
     * {@code Class.getDeclaredFields0(false)} on {@code Field.class}, and makes it accessible.
     *
     * @return the accessible {@code modifiers} field
     * @throws NoSuchMethodException when {@code getDeclaredFields0} cannot be found
     * @throws IllegalAccessException when the reflective call is denied
     * @throws InvocationTargetException when the reflective call itself throws
     * @throws RuntimeException when no field named {@code modifiers} is returned
     */
    private static Field getModifiersField() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        final Method rawFieldLookup = Class.class.getDeclaredMethod("getDeclaredFields0", Boolean.TYPE);
        rawFieldLookup.setAccessible(true);
        final Field[] declared = (Field[])rawFieldLookup.invoke(Field.class, false);
        for (int i = 0; i < declared.length; i++) {
            final Field candidate = declared[i];
            if ("modifiers".equals(candidate.getName())) {
                candidate.setAccessible(true);
                return candidate;
            }
        }
        throw new RuntimeException("Cannot find modifiers field");
    }

    /**
     * Re-creates the {@code producers}, {@code connections}, {@code consumers} and
     * {@code readers} collections through {@code setFinalField} and clears the
     * {@code initialized} and {@code closed} flags.
     *
     * @throws IllegalStateException when this factory is already initialized
     */
    private synchronized void resetDefaultValues() {
        final boolean alreadyInitialized = this.initialized;
        if (alreadyInitialized) {
            throw new IllegalStateException();
        }
        this.setFinalField("producers", new ConcurrentHashMap<>());
        this.setFinalField("connections", Collections.synchronizedSet(new HashSet<>()));
        this.setFinalField("consumers", new CopyOnWriteArrayList<>());
        this.setFinalField("readers", new CopyOnWriteArrayList<>());
        this.initialized = false;
        this.closed = false;
    }

    /**
     * Invokes {@code refreshServerSideSelectors()} on every element of {@code connections}.
     */
    private void refreshServerSideSelectors() {
        this.connections.forEach(connection -> {
            connection.refreshServerSideSelectors();
        });
    }

    /**
     * Creates a new {@link PulsarJMSAdminImpl} bound to this factory.
     *
     * @return a fresh admin facade on every call
     */
    public JMSAdmin getAdmin() {
        final JMSAdmin admin = new PulsarJMSAdminImpl(this);
        return admin;
    }

    /**
     * Returns the Pulsar client after opening and closing a connection.
     *
     * <p>Presumably the throw-away connection triggers the factory's lazy start-up — confirm
     * against {@code createConnection()}.
     *
     * @return the non-null {@code pulsarClient}
     * @throws JMSException when the connection cannot be created, or when no client is
     *     available afterwards
     */
    PulsarClient ensureClient() throws JMSException {
        this.createConnection().close();
        final PulsarClient client = this.pulsarClient;
        if (client != null) {
            return client;
        }
        throw new javax.jms.IllegalStateException("This PulsarConnectionFactory is not configured to bootstrap a PulsarClient");
    }

    /**
     * Returns the Pulsar admin client after opening and closing a connection.
     *
     * <p>Presumably the throw-away connection triggers the factory's lazy start-up — confirm
     * against {@code createConnection()}.
     *
     * @return the non-null {@code pulsarAdmin}
     * @throws JMSException when the connection cannot be created, or when no admin client is
     *     available afterwards
     */
    PulsarAdmin ensurePulsarAdmin() throws JMSException {
        this.createConnection().close();
        final PulsarAdmin admin = this.pulsarAdmin;
        if (admin != null) {
            return admin;
        }
        throw new javax.jms.IllegalStateException("This PulsarConnectionFactory is not configured to bootstrap a PulsarAdmin");
    }

    /**
     * Reads the {@code sessionListenersThreads} setting while holding this factory's monitor.
     *
     * @return the current value of the {@code sessionListenersThreads} field
     */
    public synchronized int getSessionListenersThreads() {
        final int threads = this.sessionListenersThreads;
        return threads;
    }

    /**
     * Returns the thread pool used for session listeners, creating it on first use.
     *
     * <p>The pool is only created when {@code sessionListenersThreads} is positive; it then has
     * that many core threads, uses {@code SessionListenersThreadFactory} and an
     * {@code AbortPolicy} rejection handler.
     *
     * @return the pool, or {@code null} when none exists and {@code sessionListenersThreads}
     *     is not positive
     */
    public synchronized ScheduledExecutorService getSessionListenersThreadPool() {
        final boolean poolRequested = this.sessionListenersThreads > 0;
        if (poolRequested && this.sessionListenersThreadPool == null) {
            log.info("{} Starting MessageListeners thread pool, size is jms.sessionListenersThreads={}", (Object)this, (Object)this.sessionListenersThreads);
            final ThreadFactory threadFactory = new SessionListenersThreadFactory();
            final ThreadPoolExecutor.AbortPolicy rejectionPolicy = new ThreadPoolExecutor.AbortPolicy();
            this.sessionListenersThreadPool = new ScheduledThreadPoolExecutor(this.sessionListenersThreads, threadFactory, rejectionPolicy);
        }
        return this.sessionListenersThreadPool;
    }

    /**
     * Reads the {@code maxMessagesLimitsParallelism} flag while holding this factory's monitor.
     *
     * @return the current value of the {@code maxMessagesLimitsParallelism} field
     */
    public synchronized boolean isMaxMessagesLimitsParallelism() {
        final boolean limitsParallelism = this.maxMessagesLimitsParallelism;
        return limitsParallelism;
    }

    /**
     * Reads the {@code connectionConsumerStopTimeout} setting while holding this factory's monitor.
     *
     * @return the current value of the {@code connectionConsumerStopTimeout} field
     */
    public synchronized int getConnectionConsumerStopTimeout() {
        final int stopTimeout = this.connectionConsumerStopTimeout;
        return stopTimeout;
    }

    private static class SessionListenersThreadFactory
    implements ThreadFactory {
        private static final AtomicInteger sessionThreadNumber = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            String name = "jms-session-thread-" + sessionThreadNumber.getAndIncrement();
            Thread thread = new Thread(r, name);
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    log.error("Internal error in JMS Session thread {}", (Object)t, (Object)e);
                }
            });
            return thread;
        }
    }
}

