package com.datastax.oss.pulsar.jms;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarConnectionFactory.class */
public class PulsarConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, AutoCloseable {
    // Topics whose name ends with this suffix are skipped as system topics by deleteSubscription.
    private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
    // Producer cache; the key is the Pulsar topic name, with a "-tx" suffix for transactional producers.
    private final Map<String, Producer<byte[]>> producers;
    // Connections created by this factory that have not been unregistered yet; closed by close().
    private final Set<PulsarConnection> connections;
    // Consumers created through createConsumer and not yet removed through removeConsumer.
    private final List<Consumer<?>> consumers;
    // Readers created through createReaderForBrowser and not yet removed through removeReader.
    private final List<Reader<?>> readers;
    // Both handles are created lazily by ensureInitialized() and released by close().
    private PulsarClient pulsarClient;
    private PulsarAdmin pulsarAdmin;
    // Extracted from the "producerConfig" / "consumerConfig" entries of the configuration.
    private Map<String, Object> producerConfiguration;
    private Map<String, Object> consumerConfiguration;
    // Schema.BYTES unless consumerConfig contains useSchema=true, in which case AUTO_CONSUME is used.
    private Schema<?> consumerSchema;
    // The fields below get defaults in the constructor and are overwritten by ensureInitialized()
    // from the corresponding "jms.*" configuration entries.
    private String systemNamespace;
    private String defaultClientId;
    private boolean enableTransaction;
    private boolean emulateTransactions;
    private boolean enableClientSideEmulation;
    private boolean useServerSideFiltering;
    private boolean forceDeleteTemporaryDestinations;
    private boolean useExclusiveSubscriptionsForSimpleConsumers;
    private boolean acknowledgeRejectedMessages;
    // Credentials checked by validateDummyUserNamePassword (keys jms.tckUsername / jms.tckPassword).
    private String tckUsername;
    private String tckPassword;
    // Subscription name used for queues when the destination does not carry its own.
    private String queueSubscriptionName;
    private SubscriptionType topicSharedSubscriptionType;
    // Milliseconds ensureQueueSubscription keeps retrying before giving up.
    private long waitForServerStartupTimeout;
    private boolean usePulsarAdmin;
    private int precreateQueueSubscriptionConsumerQueueSize;
    // Lifecycle flags, read and written under the instance monitor.
    private boolean initialized;
    private boolean closed;
    // Current configuration; ensureInitialized() removes the entries it consumes from this map.
    private Map<String, Object> configuration;
    private static final Logger log = LoggerFactory.getLogger(PulsarConnectionFactory.class);
    private static final String SHADED_PREFIX = "com.datastax.oss.pulsar.jms.shaded.";
    // True when the Pulsar client classes visible to this class live under SHADED_PREFIX.
    private static final boolean NEEDS_RELOCATION = PulsarClient.class.getName().startsWith(SHADED_PREFIX);
    // Client ids currently registered; static, so shared by every factory loaded by this class loader.
    private static final Set<String> clientIdentifiers = new ConcurrentSkipListSet();

    /**
     * Creates a factory with an empty configuration, so every setting keeps its default
     * until {@link #setConfiguration(Map)} or {@link #setJsonConfiguration(String)} is called.
     *
     * @throws JMSException declared for callers; the visible body does not throw it itself
     */
    public PulsarConnectionFactory() throws JMSException {
        this(new HashMap<>());
    }

    /**
     * Creates a factory from a configuration map.
     *
     * <p>Only defaults are assigned and the configuration is stored here; the Pulsar client and
     * admin handles are not created in this constructor.
     *
     * @param map configuration entries; copied by {@link #setConfiguration(Map)}
     */
    public PulsarConnectionFactory(Map<String, Object> map) {
        // Thread-safe bookkeeping of the resources handed out by this factory.
        this.producers = new ConcurrentHashMap<>();
        this.connections = Collections.synchronizedSet(new HashSet<>());
        this.consumers = new CopyOnWriteArrayList<>();
        this.readers = new CopyOnWriteArrayList<>();

        // String and numeric defaults.
        this.systemNamespace = "public/default";
        this.defaultClientId = null;
        this.tckUsername = "";
        this.tckPassword = "";
        this.queueSubscriptionName = "jms-queue";
        this.topicSharedSubscriptionType = SubscriptionType.Shared;
        this.waitForServerStartupTimeout = 60000L;
        this.precreateQueueSubscriptionConsumerQueueSize = 0;

        // Feature flags.
        this.enableTransaction = false;
        this.emulateTransactions = false;
        this.enableClientSideEmulation = false;
        this.useServerSideFiltering = false;
        this.forceDeleteTemporaryDestinations = false;
        this.useExclusiveSubscriptionsForSimpleConsumers = false;
        this.acknowledgeRejectedMessages = false;
        this.usePulsarAdmin = true;

        setConfiguration(map);
    }

    /**
     * Creates a factory from a JSON-encoded configuration.
     *
     * @param str JSON object with the configuration entries; {@code null} or empty means
     *     "no configuration" (see {@link #setJsonConfiguration(String)})
     * @throws JMSException propagated from the no-argument constructor
     */
    public PulsarConnectionFactory(String str) throws JMSException {
        this();
        this.setJsonConfiguration(str);
    }

    /**
     * Serializes the current configuration as JSON.
     *
     * @return the JSON form of {@link #getConfiguration()}; serialization failures are routed
     *     through {@code Utils.runtimeException}
     */
    public String getJsonConfiguration() {
        return (String) Utils.runtimeException(() -> {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.writeValueAsString(getConfiguration());
        });
    }

    /**
     * Replaces the configuration with the content of a JSON object.
     *
     * @param str JSON text; {@code null} or an empty string resets the configuration to an empty map
     */
    public void setJsonConfiguration(String str) {
        if (str != null && !str.isEmpty()) {
            Map parsed = (Map) Utils.runtimeException(() -> {
                return (Map) new ObjectMapper().readValue(str, Map.class);
            });
            setConfiguration(parsed);
            return;
        }
        setConfiguration(Collections.emptyMap());
    }

    /**
     * Returns a shallow copy of the current configuration, so callers cannot modify the
     * factory's own map (nested maps are still shared).
     *
     * @return a new mutable map holding the current entries
     */
    public synchronized Map<String, Object> getConfiguration() {
        Map<String, Object> snapshot = new HashMap<>(this.configuration);
        return snapshot;
    }

    /**
     * Replaces the configuration of this factory with a copy of the given map.
     *
     * <p>A {@code null} argument is treated as an empty configuration, matching what
     * {@link #setJsonConfiguration(String)} does for a {@code null} string. Storing {@code null}
     * would make {@link #getConfiguration()} and the lazy initialization fail later with a
     * {@link NullPointerException}.
     *
     * @param map the new configuration entries, may be {@code null}
     */
    public synchronized void setConfiguration(Map<String, Object> map) {
        if (map == null) {
            this.configuration = new HashMap<>();
            return;
        }
        this.configuration = copyAndApplyShadedPrefix(map);
    }

    /**
     * Copies a configuration map, relocating Pulsar class names when the client is shaded.
     *
     * <p>When {@code NEEDS_RELOCATION} is false the result is a plain shallow copy. Otherwise
     * nested maps are copied recursively and every String value that looks like a class name
     * in the Pulsar package gets {@code SHADED_PREFIX} prepended; other values are copied as is.
     *
     * @param map the source configuration, may be {@code null}
     * @return a new mutable map, or {@code null} when {@code map} is {@code null}
     */
    private static Map<String, Object> copyAndApplyShadedPrefix(Map<String, Object> map) {
        if (map == null) {
            return null;
        }
        if (!NEEDS_RELOCATION) {
            return new HashMap<>(map);
        }
        Map<String, Object> relocated = new HashMap<>();
        map.forEach((key, value) -> {
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                relocated.put(key, copyAndApplyShadedPrefix(nested));
            } else if (value instanceof String) {
                String original = (String) value;
                String result = original;
                // The package name is matched without its first character; presumably this keeps
                // the literal itself from being rewritten by the shading step - TODO confirm.
                if (original.length() > 17 && original.substring(1).startsWith("rg.apache.pulsar")) {
                    result = SHADED_PREFIX + original;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Relocating {} = {} -> {}", new Object[]{key, value, result});
                }
                relocated.put(key, result);
            } else {
                relocated.put(key, value);
            }
        });
        return relocated;
    }

    /** @return the consumer settings extracted from the "consumerConfig" configuration entry */
    private synchronized Map<String, Object> getConsumerConfiguration() {
        return consumerConfiguration;
    }

    /** @return the schema used when creating consumers and readers */
    private synchronized Schema<?> getConsumerSchema() {
        return consumerSchema;
    }

    /** @return the producer settings extracted from the "producerConfig" configuration entry */
    private synchronized Map<String, Object> getProducerConfiguration() {
        return producerConfiguration;
    }

    /**
     * Lazily parses the configuration and creates the Pulsar client and admin handles.
     *
     * <p>The method is idempotent once it has succeeded. Every entry it consumes is removed from
     * {@code configuration}; what remains is handed to {@code ClientBuilder.loadConf}. The
     * "enableTransaction" entry is only read, not removed, so the client builder sees it too.
     *
     * <p>The closed check comes first, so that a factory that was initialized and then closed
     * refuses to hand out new connections instead of silently reusing a closed client.
     *
     * @throws JMSException if the factory is closed, or if the configuration is invalid or the
     *     Pulsar handles cannot be created (translated by {@code Utils.handleException})
     */
    private synchronized void ensureInitialized() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("This ConnectionFactory is closed");
        }
        if (this.initialized) {
            return;
        }
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> producerConfig = (Map<String, Object>) this.configuration.remove("producerConfig");
            if (producerConfig != null) {
                // Work on a copy: the nested map may still be owned by the caller.
                Map<String, Object> producerCopy = new HashMap<>(producerConfig);
                Object batcher = producerCopy.get("batcherBuilder");
                if (batcher instanceof String) {
                    producerCopy.put("batcherBuilder", parseBatcherBuilder((String) batcher));
                }
                this.producerConfiguration = producerCopy;
            } else {
                this.producerConfiguration = Collections.emptyMap();
            }

            this.consumerSchema = Schema.BYTES;
            @SuppressWarnings("unchecked")
            Map<String, Object> consumerConfig = (Map<String, Object>) this.configuration.remove("consumerConfig");
            if (consumerConfig != null) {
                this.consumerConfiguration = new HashMap<>(consumerConfig);
                if (Boolean.parseBoolean(getAndRemoveString("useSchema", "false", this.consumerConfiguration))) {
                    this.consumerSchema = Schema.AUTO_CONSUME();
                }
            } else {
                this.consumerConfiguration = Collections.emptyMap();
            }

            this.systemNamespace = getAndRemoveString("jms.systemNamespace", "public/default", this.configuration);
            this.tckUsername = getAndRemoveString("jms.tckUsername", "", this.configuration);
            this.tckPassword = getAndRemoveString("jms.tckPassword", "", this.configuration);
            this.defaultClientId = getAndRemoveString("jms.clientId", null, this.configuration);
            this.queueSubscriptionName = getAndRemoveString("jms.queueSubscriptionName", "jms-queue", this.configuration);
            this.usePulsarAdmin = Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", this.configuration));
            this.precreateQueueSubscriptionConsumerQueueSize = Integer.parseInt(getAndRemoveString("jms.precreateQueueSubscriptionConsumerQueueSize", "0", this.configuration));

            final String sharedTypeName = getAndRemoveString("jms.topicSharedSubscriptionType", SubscriptionType.Shared.name(), this.configuration);
            // Exclusive is never accepted here: this setting selects among the shared modes only.
            this.topicSharedSubscriptionType = Stream.of(SubscriptionType.values())
                    .filter(type -> type.name().equalsIgnoreCase(sharedTypeName) && type != SubscriptionType.Exclusive)
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Invalid jms.topicSharedSubscriptionType: " + sharedTypeName + ", only " + SubscriptionType.Shared + ", " + SubscriptionType.Key_Shared + " and " + SubscriptionType.Failover + " are supported"));

            this.waitForServerStartupTimeout = Long.parseLong(getAndRemoveString("jms.waitForServerStartupTimeout", "60000", this.configuration));
            this.enableClientSideEmulation = Boolean.parseBoolean(getAndRemoveString("jms.enableClientSideEmulation", "false", this.configuration));
            this.useServerSideFiltering = Boolean.parseBoolean(getAndRemoveString("jms.useServerSideFiltering", "false", this.configuration));
            this.useExclusiveSubscriptionsForSimpleConsumers = Boolean.parseBoolean(getAndRemoveString("jms.useExclusiveSubscriptionsForSimpleConsumers", "true", this.configuration));
            this.acknowledgeRejectedMessages = Boolean.parseBoolean(getAndRemoveString("jms.acknowledgeRejectedMessages", "false", this.configuration));
            this.forceDeleteTemporaryDestinations = Boolean.parseBoolean(getAndRemoveString("jms.forceDeleteTemporaryDestinations", "false", this.configuration));
            // Read without removing, and tolerate an explicit null value.
            this.enableTransaction = Boolean.parseBoolean(String.valueOf(this.configuration.getOrDefault("enableTransaction", "false")));
            this.emulateTransactions = Boolean.parseBoolean(getAndRemoveString("jms.emulateTransactions", "false", this.configuration));
            if (this.emulateTransactions && this.enableTransaction) {
                throw new IllegalStateException("You cannot set both enableTransaction and jms.emulateTransactions");
            }

            String webServiceUrl = getAndRemoveString("webServiceUrl", "http://localhost:8080", this.configuration);
            String brokerServiceUrl = getAndRemoveString("brokerServiceUrl", "", this.configuration);

            PulsarAdmin admin = null;
            boolean ready = false;
            try {
                String authPlugin = getAndRemoveString("authPlugin", "", this.configuration);
                String authParams = getAndRemoveString("authParams", "", this.configuration);
                Authentication authentication = AuthenticationFactory.create(authPlugin, authParams);
                if (log.isDebugEnabled()) {
                    log.debug("Authentication {}", authentication);
                }
                boolean allowInsecure = Boolean.parseBoolean(getAndRemoveString("tlsAllowInsecureConnection", "false", this.configuration));
                boolean verifyHostname = Boolean.parseBoolean(getAndRemoveString("tlsEnableHostnameVerification", "false", this.configuration));
                String trustCertsFilePath = getAndRemoveString("tlsTrustCertsFilePath", "", this.configuration);
                boolean useKeyStoreTls = Boolean.parseBoolean(getAndRemoveString("useKeyStoreTls", "false", this.configuration));
                String trustStoreType = getAndRemoveString("tlsTrustStoreType", "JKS", this.configuration);
                String trustStorePath = getAndRemoveString("tlsTrustStorePath", "", this.configuration);
                String trustStorePassword = getAndRemoveString("tlsTrustStorePassword", "", this.configuration);

                admin = PulsarAdmin.builder()
                        .serviceHttpUrl(webServiceUrl)
                        .allowTlsInsecureConnection(allowInsecure)
                        .enableTlsHostnameVerification(verifyHostname)
                        .tlsTrustCertsFilePath(trustCertsFilePath)
                        .useKeyStoreTls(useKeyStoreTls)
                        .tlsTrustStoreType(trustStoreType)
                        .tlsTrustStorePath(trustStorePath)
                        .tlsTrustStorePassword(trustStorePassword)
                        .authentication(authentication)
                        .build();

                // loadConf must run after all the removals above so it only sees client settings.
                ClientBuilder clientBuilder = PulsarClient.builder()
                        .loadConf(this.configuration)
                        .tlsTrustStorePassword(trustStorePassword)
                        .tlsTrustStorePath(trustStorePath)
                        .tlsTrustCertsFilePath(trustCertsFilePath)
                        .tlsTrustStoreType(trustStoreType)
                        .useKeyStoreTls(useKeyStoreTls)
                        .enableTlsHostnameVerification(verifyHostname)
                        .allowTlsInsecureConnection(allowInsecure)
                        .serviceUrl(webServiceUrl)
                        .authentication(authentication);
                // An explicit brokerServiceUrl overrides the web service URL for the client.
                if (!brokerServiceUrl.isEmpty()) {
                    clientBuilder.serviceUrl(brokerServiceUrl);
                }
                PulsarClient client = clientBuilder.build();

                this.pulsarClient = client;
                this.pulsarAdmin = admin;
                this.initialized = true;
                ready = true;
            } finally {
                // Release the admin handle on any failure, not only on PulsarClientException.
                if (!ready && admin != null) {
                    admin.close();
                }
            }
        } catch (Throwable th) {
            throw Utils.handleException(th);
        }
    }

    /** Maps the textual "batcherBuilder" producer setting to the corresponding Pulsar constant. */
    private static BatcherBuilder parseBatcherBuilder(String name) {
        switch (name) {
            case "KEY_BASED":
                return BatcherBuilder.KEY_BASED;
            case "DEFAULT":
                return BatcherBuilder.DEFAULT;
            default:
                throw new IllegalArgumentException("Unsupported batcherBuilder " + name);
        }
    }

    /**
     * Removes an entry from a configuration map and returns it as a String.
     *
     * @param str the key to remove
     * @param str2 the value to return when the key is absent or mapped to {@code null}
     * @param map the map to remove the key from
     * @return {@code toString()} of the removed value, or {@code str2}
     */
    private static String getAndRemoveString(String str, String str2, Map<String, Object> map) {
        Object removed = map.remove(str);
        if (removed == null) {
            return str2;
        }
        return removed.toString();
    }

    /** @return the value parsed from {@code jms.enableClientSideEmulation} (default false) */
    public synchronized boolean isEnableClientSideEmulation() {
        return enableClientSideEmulation;
    }

    /** @return the value parsed from {@code jms.useServerSideFiltering} (default false) */
    public synchronized boolean isUseServerSideFiltering() {
        return useServerSideFiltering;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the client id configured with {@code jms.clientId}, or {@code null} if unset */
    public synchronized String getDefaultClientId() {
        return defaultClientId;
    }

    /** @return the value parsed from the {@code enableTransaction} entry (default false) */
    public synchronized boolean isEnableTransaction() {
        return enableTransaction;
    }

    /** @return the value parsed from {@code jms.emulateTransactions} (default false) */
    public synchronized boolean isEmulateTransactions() {
        return emulateTransactions;
    }

    /** @return the shared Pulsar client, or {@code null} while the factory is not initialized */
    public synchronized PulsarClient getPulsarClient() {
        return pulsarClient;
    }

    /**
     * Returns the Pulsar admin handle.
     *
     * @return the admin handle, or {@code null} while the factory is not initialized
     * @throws IllegalStateException if {@code jms.usePulsarAdmin} is set to false
     */
    public synchronized PulsarAdmin getPulsarAdmin() throws IllegalStateException {
        if (!this.usePulsarAdmin) {
            throw new IllegalStateException("jms.usePulsarAdmin is set to false, this feature is not available");
        }
        return this.pulsarAdmin;
    }

    /** @return the namespace used to qualify short topic names (default "public/default") */
    public synchronized String getSystemNamespace() {
        return systemNamespace;
    }

    /* renamed from: createConnection, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a new connection and registers it with this factory.
     *
     * @return the new connection
     * @throws JMSException if the factory cannot be initialized or the connection cannot be created
     */
    public PulsarConnection m6createConnection() throws JMSException {
        ensureInitialized();
        final PulsarConnection created = new PulsarConnection(this);
        // Tracked so that close() can shut the connection down.
        connections.add(created);
        return created;
    }

    /* renamed from: createConnection, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a new connection after checking the supplied credentials against the configured
     * TCK user name and password.
     *
     * @param str the user name
     * @param str2 the password
     * @return the new connection
     * @throws JMSException if initialization fails or the credentials are rejected
     */
    public PulsarConnection m5createConnection(String str, String str2) throws JMSException {
        // Initialize first: the expected credentials are read from the configuration.
        ensureInitialized();
        validateDummyUserNamePassword(str, str2);
        PulsarConnection connection = m6createConnection();
        return connection;
    }

    /**
     * Checks the supplied credentials against {@code jms.tckUsername} / {@code jms.tckPassword}.
     *
     * <p>No check is performed when no TCK user name is configured. Otherwise the credentials
     * are rejected when either the user name or the password differs. The previous condition
     * only rejected when both differed, so one correct value was enough to pass.
     *
     * @param str the user name supplied by the caller
     * @param str2 the password supplied by the caller
     * @throws JMSSecurityException if the credentials do not match the configured ones
     */
    private synchronized void validateDummyUserNamePassword(String str, String str2) throws JMSSecurityException {
        if (this.tckUsername == null || this.tckUsername.isEmpty()) {
            return;
        }
        boolean userMatches = this.tckUsername.equals(str);
        // A null configured password keeps the original tolerance: nothing to compare against.
        boolean passwordMatches = this.tckPassword == null || this.tckPassword.equals(str2);
        if (!userMatches || !passwordMatches) {
            throw new JMSSecurityException("Unauthorized");
        }
    }

    /**
     * Creates a context in auto-acknowledge mode.
     *
     * @return the new context
     */
    public JMSContext createContext() {
        // JMSContext.AUTO_ACKNOWLEDGE is the compile-time constant 1.
        return createContext(JMSContext.AUTO_ACKNOWLEDGE);
    }

    /**
     * Creates a context in auto-acknowledge mode after validating the credentials.
     *
     * @param str the user name
     * @param str2 the password
     * @return the new context
     */
    public JMSContext createContext(String str, String str2) {
        // JMSContext.AUTO_ACKNOWLEDGE is the compile-time constant 1.
        return createContext(str, str2, JMSContext.AUTO_ACKNOWLEDGE);
    }

    /**
     * Creates a context with the given session mode after validating the credentials.
     *
     * @param str the user name
     * @param str2 the password
     * @param i the session mode
     * @return the new context
     */
    public JMSContext createContext(String str, String str2, int i) {
        // The expected credentials come from the configuration, so initialize before validating.
        Utils.runtimeException(this::ensureInitialized);
        Utils.runtimeException(() -> {
            this.validateDummyUserNamePassword(str, str2);
        });
        JMSContext context = createContext(i);
        return context;
    }

    /**
     * Creates a context with the given session mode.
     *
     * @param i the session mode
     * @return the new context
     */
    public JMSContext createContext(int i) {
        Utils.runtimeException(this::ensureInitialized);
        final JMSContext context = new PulsarJMSContext(this, i);
        return context;
    }

    /**
     * Closes the factory and every resource it owns.
     *
     * <p>The factory is always marked closed. If it was initialized, all tracked connections and
     * cached producers are closed (failures are passed to {@code Utils.handleException} and do
     * not stop the shutdown), and then the admin and client handles are closed. The producer
     * cache is emptied so that a repeated call does not close the same producers again, and the
     * client is closed even if closing the admin handle throws.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            this.closed = true;
            if (!this.initialized) {
                return;
            }
            // Iterate over a snapshot: closing a connection may unregister it from the set.
            for (PulsarConnection connection : new ArrayList<>(this.connections)) {
                try {
                    connection.close();
                } catch (Exception e) {
                    Utils.handleException(e);
                }
            }
            for (Producer<byte[]> producer : this.producers.values()) {
                try {
                    producer.close();
                } catch (PulsarClientException e) {
                    Utils.handleException(e);
                }
            }
            this.producers.clear();
            try {
                if (this.pulsarAdmin != null) {
                    this.pulsarAdmin.close();
                }
            } finally {
                try {
                    if (this.pulsarClient != null) {
                        this.pulsarClient.close();
                    }
                } catch (PulsarClientException e) {
                    log.info("Error closing PulsarClient", e);
                }
            }
        }
    }

    /**
     * Converts any JMS destination into a {@code PulsarDestination}.
     *
     * @param destination the destination to convert
     * @return the same instance if it already is a {@code PulsarDestination}, otherwise a new
     *     {@code PulsarQueue} or {@code PulsarTopic} with the same name
     * @throws JMSException if the destination is neither a queue nor a topic, or if reading its
     *     name fails
     */
    public static PulsarDestination toPulsarDestination(Destination destination) throws JMSException {
        final PulsarDestination converted;
        if (destination instanceof PulsarDestination) {
            converted = (PulsarDestination) destination;
        } else if (destination instanceof Queue) {
            Queue queue = (Queue) destination;
            converted = new PulsarQueue(queue.getQueueName());
        } else if (destination instanceof Topic) {
            Topic topic = (Topic) destination;
            converted = new PulsarTopic(topic.getTopicName());
        } else {
            throw new IllegalStateException("Cannot convert " + destination + " to a PulsarDestination");
        }
        return converted;
    }

    /**
     * Resolves the fully qualified Pulsar topic name of a destination.
     *
     * @param destination the JMS destination
     * @return the internal topic name, qualified with the system namespace when it is a short name
     * @throws JMSException if the destination cannot be converted
     */
    public String getPulsarTopicName(Destination destination) throws JMSException {
        PulsarDestination pulsarDestination = toPulsarDestination(destination);
        String internalName = pulsarDestination.getInternalTopicName();
        return applySystemNamespace(internalName);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the cached producer for a destination, creating it on first use.
     *
     * <p>Transactional and non-transactional producers are cached separately: the cache key is
     * the topic name, with a "-tx" suffix when {@code z} is true. Transactional producers are
     * created with a send timeout of 0.
     *
     * @param destination the JMS destination to produce to
     * @param z whether the producer is going to be used inside a transaction
     * @return the producer for the destination
     * @throws JMSException if the producer cannot be created
     */
    public Producer<byte[]> getProducerForDestination(Destination destination, boolean z) throws JMSException {
        String pulsarTopicName = getPulsarTopicName(destination);
        String cacheKey = z ? pulsarTopicName + "-tx" : pulsarTopicName;
        try {
            return this.producers.computeIfAbsent(cacheKey, key -> {
                try {
                    @SuppressWarnings("unchecked")
                    Producer<byte[]> created = (Producer<byte[]>) Utils.invoke(() -> {
                        Map<String, Object> producerConfiguration = getProducerConfiguration();
                        ProducerBuilder<byte[]> builder = this.pulsarClient.newProducer().topic(applySystemNamespace(pulsarTopicName)).loadConf(producerConfiguration);
                        // loadConf only understands plain values; the batcher object is applied explicitly.
                        if (producerConfiguration.containsKey("batcherBuilder")) {
                            builder.batcherBuilder((BatcherBuilder) producerConfiguration.get("batcherBuilder"));
                        }
                        if (z) {
                            builder.sendTimeout(0, TimeUnit.MILLISECONDS);
                        }
                        return builder.create();
                    });
                    return created;
                } catch (JMSException e) {
                    // computeIfAbsent cannot propagate checked exceptions; unwrapped below.
                    throw new RuntimeException(e);
                }
            });
        } catch (RuntimeException e) {
            Throwable cause = e.getCause();
            if (cause instanceof JMSException) {
                throw (JMSException) cause;
            }
            // Not one of our wrappers (the cause may even be null): translate the exception itself.
            throw Utils.handleException(e);
        }
    }

    /** @return the value parsed from {@code jms.usePulsarAdmin} (default true) */
    synchronized boolean isUsePulsarAdmin() {
        return usePulsarAdmin;
    }

    /**
     * Makes sure the queue subscription exists on the topic backing the destination.
     *
     * <p>With the admin API enabled the subscription is created at the earliest message; without
     * it, a consumer is subscribed and closed right away. A conflict reported by the admin API
     * means the subscription already exists and counts as success. Any other Pulsar failure is
     * retried once per second until {@link #getWaitForServerStartupTimeout()} milliseconds have
     * elapsed since the call started.
     *
     * @param pulsarDestination the destination whose subscription must exist
     * @throws JMSException when the timeout is exceeded or the waiting thread is interrupted
     */
    public void ensureQueueSubscription(PulsarDestination pulsarDestination) throws JMSException {
        final long startedAt = System.currentTimeMillis();
        final String topicName = getPulsarTopicName(pulsarDestination);
        for (;;) {
            try {
                if (!isUsePulsarAdmin()) {
                    getPulsarClient()
                            .newConsumer()
                            .subscriptionType(getTopicSharedSubscriptionType())
                            .subscriptionName(getQueueSubscriptionName(pulsarDestination))
                            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                            .receiverQueueSize(getPrecreateQueueSubscriptionConsumerQueueSize())
                            .topic(topicName)
                            .subscribe()
                            .close();
                } else {
                    getPulsarAdmin().topics().createSubscription(topicName, getQueueSubscriptionName(pulsarDestination), MessageId.earliest);
                }
                return;
            } catch (PulsarAdminException.ConflictException alreadyExists) {
                log.debug("Subscription {} already exists for {}", getQueueSubscriptionName(pulsarDestination), topicName);
                return;
            } catch (PulsarAdminException | PulsarClientException failure) {
                long elapsed = System.currentTimeMillis() - startedAt;
                if (elapsed > getWaitForServerStartupTimeout()) {
                    throw Utils.handleException(failure);
                }
                log.info("Got {} error while setting up subscription for queue {}, maybe the namespace/broker is still starting", failure.toString(), topicName);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and report the last Pulsar failure.
                    Thread.currentThread().interrupt();
                    throw Utils.handleException(failure);
                }
            }
        }
    }

    /**
     * Creates a Pulsar consumer for a JMS destination and tracks it in this factory.
     *
     * <p>Queues always use the queue subscription name and start from the earliest message;
     * topics use the supplied subscription name and start from the latest message. When
     * server-side filtering is enabled, the selector and filtering metadata are attached as
     * consumer properties. For durable subscriptions, a selector already stored on the broker
     * subscription is read back and published through {@code atomicReference}.
     *
     * @param pulsarDestination the destination to consume from
     * @param str the subscription name (ignored for queues)
     * @param i not used by this method
     * @param subscriptionMode durable or non-durable; queues require durable
     * @param subscriptionType the Pulsar subscription type; queues cannot use Exclusive
     * @param str2 the message selector, may be {@code null}
     * @param z when true and server-side filtering is on, {@code str3} is sent as the
     *     "jms.filter.JMSConnectionID" consumer property
     * @param str3 the id of the connection creating the consumer
     * @param atomicReference receives the selector found on the broker subscription, if any
     * @return the subscribed consumer
     * @throws JMSException on invalid mode/type for a queue, or when Pulsar reports a failure
     */
    public Consumer<?> createConsumer(PulsarDestination pulsarDestination, String str, int i, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String str2, boolean z, String str3, AtomicReference<String> atomicReference) throws JMSException {
        String topicName = getPulsarTopicName(pulsarDestination);
        String subscriptionName = pulsarDestination.isQueue() ? getQueueSubscriptionName(pulsarDestination) : str;
        SubscriptionInitialPosition initialPosition = pulsarDestination.isTopic() ? SubscriptionInitialPosition.Latest : SubscriptionInitialPosition.Earliest;
        if (pulsarDestination.isQueue() && subscriptionMode != SubscriptionMode.Durable) {
            throw new IllegalStateException("only durable mode for queues");
        }
        if (pulsarDestination.isQueue() && subscriptionType == SubscriptionType.Exclusive) {
            throw new IllegalStateException("only Shared SubscriptionType for queues");
        }
        // Five placeholders for five arguments: the selector used to be silently dropped.
        log.debug("createConsumer {} {} {} {} {}", new Object[]{topicName, str, subscriptionMode, subscriptionType, str2});

        boolean serverSideFiltering = isUseServerSideFiltering();
        String destinationType = pulsarDestination.isQueue() ? "queue" : "topic";
        Map<String, String> subscriptionProperties = new HashMap<>();
        Map<String, String> consumerMetadata = new HashMap<>();
        consumerMetadata.put("jms.destination.type", destinationType);
        if (serverSideFiltering) {
            consumerMetadata.put("jms.filtering", "true");
            subscriptionProperties.put("jms.destination.type", destinationType);
            if (z) {
                consumerMetadata.put("jms.filter.JMSConnectionID", str3);
            }
            if (str2 != null) {
                consumerMetadata.put("jms.selector", str2);
            }
            consumerMetadata.put("jms.selector.reject.action", pulsarDestination.isTopic() ? "drop" : "reschedule");
        }
        if (isAcknowledgeRejectedMessages()) {
            consumerMetadata.put("jms.force.drop.rejected", "true");
        }

        try {
            if (serverSideFiltering && subscriptionMode == SubscriptionMode.Durable) {
                try {
                    SubscriptionStats stats = this.pulsarAdmin.topics().getStats(topicName).getSubscriptions().get(subscriptionName);
                    if (stats != null) {
                        Map<String, String> brokerProperties = stats.getSubscriptionProperties();
                        log.info("subscriptionPropertiesFromBroker {}", brokerProperties);
                        if (brokerProperties != null && "true".equals(brokerProperties.get("jms.filtering"))) {
                            String brokerSelector = brokerProperties.getOrDefault("jms.selector", "");
                            if (!brokerSelector.isEmpty()) {
                                log.info("Detected selector {} on Subscription {} on topic {}", new Object[]{brokerSelector, subscriptionName, topicName});
                                atomicReference.set(brokerSelector);
                            }
                        }
                    }
                } catch (PulsarAdminException.NotFoundException ignored) {
                    // The topic does not exist yet, so there is no stored selector to pick up.
                }
            }
            Consumer<?> consumer = this.pulsarClient
                    .newConsumer(getConsumerSchema())
                    .negativeAckRedeliveryDelay(1L, TimeUnit.SECONDS)
                    .loadConf(getConsumerConfiguration())
                    .properties(consumerMetadata)
                    .subscriptionInitialPosition(initialPosition)
                    .subscriptionMode(subscriptionMode)
                    .subscriptionProperties(subscriptionProperties)
                    .subscriptionType(subscriptionType)
                    .subscriptionName(subscriptionName)
                    .topic(topicName)
                    .subscribe();
            this.consumers.add(consumer);
            return consumer;
        } catch (PulsarClientException | PulsarAdminException e) {
            throw Utils.handleException(e);
        }
    }

    /**
     * Creates a reader used to browse a queue and tracks it in this factory.
     *
     * <p>The reader starts (inclusively) at the first message peeked on the queue subscription,
     * or at {@code MessageId.latest} when nothing can be peeked.
     *
     * @param pulsarQueue the queue to browse
     * @return the new reader
     * @throws JMSException if the admin API is disabled or Pulsar reports a failure
     */
    public Reader<?> createReaderForBrowser(PulsarQueue pulsarQueue) throws JMSException {
        final String topicName = getPulsarTopicName(pulsarQueue);
        try {
            List<Message<byte[]>> peeked = getPulsarAdmin().topics().peekMessages(topicName, getQueueSubscriptionName(pulsarQueue), 1);
            MessageId startFrom;
            if (peeked.isEmpty()) {
                startFrom = MessageId.latest;
            } else {
                startFrom = peeked.get(0).getMessageId();
            }
            if (log.isDebugEnabled()) {
                log.debug("createBrowser {} at {}", topicName, startFrom);
            }
            Reader<?> reader = this.pulsarClient
                    .newReader(getConsumerSchema())
                    .loadConf(getConsumerConfiguration())
                    .readerName("jms-queue-browser-" + UUID.randomUUID())
                    .startMessageId(startFrom)
                    .startMessageIdInclusive()
                    .topic(topicName)
                    .create();
            this.readers.add(reader);
            return reader;
        } catch (PulsarClientException | PulsarAdminException failure) {
            throw Utils.handleException(failure);
        }
    }

    /**
     * Stops tracking a consumer; the consumer itself is not closed here.
     *
     * @param consumer the consumer to forget
     */
    public void removeConsumer(Consumer<?> consumer) {
        consumers.remove(consumer);
    }

    /**
     * Stops tracking a reader; the reader itself is not closed here.
     *
     * @param reader the reader to forget
     */
    public void removeReader(Reader<?> reader) {
        readers.remove(reader);
    }

    /**
     * Deletes a subscription by name.
     *
     * <p>When a destination is given, the subscription is first deleted on that topic. If no
     * destination is given, or the subscription is not found there, every topic of the system
     * namespace (except transaction pending-ack system topics) is scanned and the subscription
     * is force-deleted wherever it is found.
     *
     * <p>The whole operation runs inside one try block so that admin failures raised while
     * scanning the namespace are translated to {@link JMSException} like the ones from the
     * direct delete.
     *
     * @param pulsarDestination the destination owning the subscription, or {@code null} to scan
     * @param str the subscription name
     * @return true if at least one subscription was deleted
     * @throws JMSException if Pulsar reports a failure other than "not found"
     */
    public boolean deleteSubscription(PulsarDestination pulsarDestination, String str) throws JMSException {
        String systemNamespace = getSystemNamespace();
        boolean deleted = false;
        try {
            if (pulsarDestination != null) {
                String pulsarTopicName = getPulsarTopicName(pulsarDestination);
                log.info("deleteSubscription topic {} name {}", pulsarTopicName, str);
                try {
                    this.pulsarAdmin.topics().deleteSubscription(pulsarTopicName, str, true);
                    deleted = true;
                } catch (PulsarAdminException.NotFoundException e) {
                    log.error("Cannot unsubscribe {} from {}: not found", str, pulsarTopicName);
                }
            }
            if (!deleted) {
                for (String topic : this.pulsarAdmin.topics().getList(systemNamespace)) {
                    if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) {
                        log.info("Ignoring system topic {}", topic);
                        continue;
                    }
                    log.info("Scanning topic {}", topic);
                    List<String> subscriptions;
                    try {
                        subscriptions = this.pulsarAdmin.topics().getSubscriptions(topic);
                        log.info("Subscriptions {}", subscriptions);
                    } catch (PulsarAdminException.NotFoundException e) {
                        // The topic disappeared between listing and inspection.
                        log.error("Skipping topic {}", topic);
                        subscriptions = Collections.emptyList();
                    }
                    for (String subscription : subscriptions) {
                        log.info("Found subscription {} ", subscription);
                        if (subscription.equals(str)) {
                            log.info("deleteSubscription topic {} name {}", topic, str);
                            this.pulsarAdmin.topics().deleteSubscription(topic, str, true);
                            deleted = true;
                        }
                    }
                }
            }
        } catch (Exception e) {
            throw Utils.handleException(e);
        }
        return deleted;
    }

    /**
     * Reserves a client id in the process-wide registry shared by all factories.
     *
     * @param str the client id to reserve
     * @throws InvalidClientIDException if the id is {@code null} (the registry cannot hold
     *     null elements) or is already registered locally
     */
    public void registerClientId(String str) throws InvalidClientIDException {
        log.info("registerClientId {}, existing {}", str, clientIdentifiers);
        if (str == null) {
            // ConcurrentSkipListSet.add(null) would fail with a NullPointerException.
            throw new InvalidClientIDException("Client id cannot be null");
        }
        if (!clientIdentifiers.add(str)) {
            throw new InvalidClientIDException("A connection with this client id '" + str + "'is already opened locally");
        }
    }

    /**
     * Forgets a connection and releases its client id, if it has one.
     *
     * @param pulsarConnection the connection being closed
     */
    public void unregisterConnection(PulsarConnection pulsarConnection) {
        if (pulsarConnection.clientId == null) {
            // No client id was ever registered for this connection.
            connections.remove(pulsarConnection);
            return;
        }
        clientIdentifiers.remove(pulsarConnection.clientId);
        log.info("unregisterClientId {} {}", pulsarConnection.clientId, clientIdentifiers);
        connections.remove(pulsarConnection);
    }

    /**
     * Creates a connection exposed through the {@link QueueConnection} interface.
     *
     * @return the new connection
     * @throws JMSException if the connection cannot be created
     */
    public QueueConnection createQueueConnection() throws JMSException {
        QueueConnection connection = m6createConnection();
        return connection;
    }

    /**
     * Creates a credential-checked connection exposed through {@link QueueConnection}.
     *
     * @param str the user name
     * @param str2 the password
     * @return the new connection
     * @throws JMSException if the credentials are rejected or the connection cannot be created
     */
    public QueueConnection createQueueConnection(String str, String str2) throws JMSException {
        QueueConnection connection = m5createConnection(str, str2);
        return connection;
    }

    /**
     * Creates a connection exposed through the {@link TopicConnection} interface.
     *
     * @return the new connection
     * @throws JMSException if the connection cannot be created
     */
    public TopicConnection createTopicConnection() throws JMSException {
        TopicConnection connection = m6createConnection();
        return connection;
    }

    /**
     * Creates a credential-checked connection exposed through {@link TopicConnection}.
     *
     * @param str the user name
     * @param str2 the password
     * @return the new connection
     * @throws JMSException if the credentials are rejected or the connection cannot be created
     */
    public TopicConnection createTopicConnection(String str, String str2) throws JMSException {
        TopicConnection connection = m5createConnection(str, str2);
        return connection;
    }

    /** @return the value parsed from {@code jms.forceDeleteTemporaryDestinations} (default false) */
    public synchronized boolean isForceDeleteTemporaryDestinations() {
        return forceDeleteTemporaryDestinations;
    }

    /**
     * Resolves the subscription name to use for a queue.
     *
     * @param pulsarDestination the queue destination
     * @return the name carried by the destination itself when it has one, otherwise the
     *     factory-wide queue subscription name
     */
    public String getQueueSubscriptionName(PulsarDestination pulsarDestination) {
        final String fromDestination = pulsarDestination.extractSubscriptionName();
        if (fromDestination == null) {
            // Fall back to the configured default, read under the instance monitor.
            synchronized (this) {
                return this.queueSubscriptionName;
            }
        }
        return fromDestination;
    }

    /** @return the retry budget, in milliseconds, parsed from {@code jms.waitForServerStartupTimeout} */
    public synchronized long getWaitForServerStartupTimeout() {
        return waitForServerStartupTimeout;
    }

    /**
     * Chooses the subscription type for a simple (non-shared) consumer.
     *
     * @param destination the destination being consumed
     * @return Exclusive when exclusive subscriptions are enabled for simple consumers; otherwise
     *     Shared for queues and the configured shared type for any other destination
     */
    public synchronized SubscriptionType getExclusiveSubscriptionTypeForSimpleConsumers(Destination destination) {
        if (this.useExclusiveSubscriptionsForSimpleConsumers) {
            return SubscriptionType.Exclusive;
        }
        if (destination instanceof Queue) {
            return SubscriptionType.Shared;
        }
        return getTopicSharedSubscriptionType();
    }

    /** @return the subscription type parsed from {@code jms.topicSharedSubscriptionType} (default Shared) */
    public synchronized SubscriptionType getTopicSharedSubscriptionType() {
        return topicSharedSubscriptionType;
    }

    /**
     * Qualifies a short topic name with the system namespace.
     *
     * @param str a topic name, may be {@code null}
     * @return {@code null} for {@code null}; the name unchanged when it already starts with
     *     "persistent://" or "non-persistent://"; otherwise
     *     "persistent://" + system namespace + "/" + name
     */
    public String applySystemNamespace(String str) {
        if (str == null) {
            return null;
        }
        boolean alreadyQualified = str.startsWith("persistent://") || str.startsWith("non-persistent://");
        if (alreadyQualified) {
            return str;
        }
        return "persistent://" + getSystemNamespace() + "/" + str;
    }

    /**
     * Returns the value parsed from {@code jms.acknowledgeRejectedMessages} (default false).
     *
     * <p>Synchronized like the other accessors of this class: the field is written under the
     * instance monitor during lazy initialization, so reading it without the lock gave no
     * visibility guarantee to other threads.
     *
     * @return whether rejected messages are acknowledged
     */
    public synchronized boolean isAcknowledgeRejectedMessages() {
        return this.acknowledgeRejectedMessages;
    }

    /** @return true once {@link #close()} has been called */
    public synchronized boolean isClosed() {
        return closed;
    }

    /**
     * @return the receiver queue size used by the throw-away consumer that pre-creates queue
     *     subscriptions when the admin API is disabled
     */
    private synchronized int getPrecreateQueueSubscriptionConsumerQueueSize() {
        return precreateQueueSubscriptionConsumerQueueSize;
    }
}
