package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.Utils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarConnection.class */
/**
 * JMS {@link Connection} implementation created by a {@link PulsarConnectionFactory}.
 *
 * <p>The same object also serves the legacy {@link QueueConnection} and {@link TopicConnection}
 * APIs. It keeps track of the sessions and temporary destinations created through it so that
 * they can be released by {@link #close()}, and it implements the JMS start/stop semantics by
 * means of a "paused" flag guarded by a read/write lock (see
 * {@link #executeInConnectionPausedLock(Utils.SupplierWithException, int)}).
 *
 * <p>A new connection is paused until {@link #start()} is called.
 */
public class PulsarConnection implements Connection, QueueConnection, TopicConnection {
    private static final Logger log = LoggerFactory.getLogger(PulsarConnection.class);

    /** Factory that created this connection; used for client id registration and admin access. */
    private final PulsarConnectionFactory factory;

    private volatile ExceptionListener exceptionListener;

    /** Unique id of this connection: a random UUID, prefixed by the default client id if any. */
    private final String connectionId;

    /** Client id, or {@code null} when none has been configured or set yet. */
    String clientId;

    /** Sessions created by this connection that have not been unregistered yet. */
    private final List<PulsarSession> sessions = new CopyOnWriteArrayList<>();

    /** Temporary destinations created by this connection that have not been removed yet. */
    private final List<PulsarTemporaryDestination> temporaryDestinations = new CopyOnWriteArrayList<>();

    private volatile boolean closed = false;

    /** Becomes {@code false} as soon as a session is created or the client id is set. */
    private volatile boolean allowSetClientId = true;

    /** Guards {@link #paused}: writers flip the flag, readers run work while it is stable. */
    private final ReentrantReadWriteLock connectionPausedLock = new ReentrantReadWriteLock();

    /** Signalled (under the write lock) whenever {@link #paused} changes. */
    private final Condition pausedCondition = this.connectionPausedLock.writeLock().newCondition();

    /** {@code true} while the connection is stopped; connections are created stopped. */
    private volatile boolean paused = true;

    /** Creates the {@link MessageConsumer} that feeds a {@link ConnectionConsumer}. */
    @FunctionalInterface
    public interface ConsumerBuilder {
        /**
         * Builds the consumer on the given session.
         *
         * @param session the session dedicated to the connection consumer
         * @return the consumer to dispatch messages from
         * @throws JMSException if the consumer cannot be created
         */
        MessageConsumer build(Session session) throws JMSException;
    }

    /**
     * Creates a connection bound to the given factory.
     *
     * <p>If the factory declares a default client id, that id is registered on the factory and
     * used as prefix of the connection id.
     *
     * @param pulsarConnectionFactory the owning factory
     * @throws JMSException if the default client id cannot be registered
     */
    public PulsarConnection(PulsarConnectionFactory pulsarConnectionFactory) throws JMSException {
        this.factory = pulsarConnectionFactory;
        this.clientId = pulsarConnectionFactory.getDefaultClientId();
        String randomPart = UUID.randomUUID().toString();
        if (this.clientId == null) {
            this.connectionId = randomPart;
        } else {
            pulsarConnectionFactory.registerClientId(this.clientId);
            this.connectionId = this.clientId + "_" + randomPart;
        }
    }

    /** @return the factory that created this connection */
    public PulsarConnectionFactory getFactory() {
        return this.factory;
    }

    /**
     * Creates a session.
     *
     * @param z whether the session is transacted; when {@code true} the mode {@code i} is ignored
     * @param i the session (acknowledge) mode
     * @return the new session
     * @throws JMSException if the connection is closed or the session cannot be created
     */
    @Override
    public PulsarSession createSession(boolean z, int i) throws JMSException {
        return createSession(z, i, null);
    }

    /**
     * Creates a session with an optional consumer configuration override.
     *
     * <p>Creating a session forbids any later call to {@link #setClientID(String)}.
     *
     * @param z whether the session is transacted; when {@code true} the mode {@code i} is ignored
     * @param i the session (acknowledge) mode
     * @param consumerConfiguration configuration handed to the session, may be {@code null}
     * @return the new session, already registered on this connection
     * @throws JMSException if the connection is closed or the session cannot be created
     */
    public PulsarSession createSession(boolean z, int i, ConsumerConfiguration consumerConfiguration) throws JMSException {
        checkNotClosed();
        this.allowSetClientId = false;
        int sessionMode = z ? Session.SESSION_TRANSACTED : i;
        PulsarSession session = new PulsarSession(sessionMode, this, consumerConfiguration);
        this.sessions.add(session);
        return session;
    }

    /**
     * Fails if this connection has been closed.
     *
     * @throws JMSException an {@link IllegalStateException} when the connection is closed
     */
    public void checkNotClosed() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("This connection is closed");
        }
    }

    /**
     * Creates a session with the given mode; the session is transacted when the mode is
     * {@link Session#SESSION_TRANSACTED}.
     *
     * @param i the session mode
     * @return the new session
     * @throws JMSException if the connection is closed or the session cannot be created
     */
    @Override
    public PulsarSession createSession(int i) throws JMSException {
        return createSession(i == Session.SESSION_TRANSACTED, i);
    }

    /**
     * Creates a non transacted session in {@link Session#AUTO_ACKNOWLEDGE} mode.
     *
     * @return the new session
     * @throws JMSException if the connection is closed or the session cannot be created
     */
    @Override
    public PulsarSession createSession() throws JMSException {
        // The delegate already forbids further setClientID calls once the closed check passed.
        return createSession(Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * @return the client id, or {@code null} if none is set
     * @throws JMSException if the connection is closed
     */
    @Override
    public String getClientID() throws JMSException {
        checkNotClosed();
        return this.clientId;
    }

    /**
     * Sets the client id of this connection and registers it on the factory.
     *
     * @param str the client id, must be neither {@code null} nor empty
     * @throws JMSException an {@link IllegalStateException} if the connection is closed, if the
     *     connection has already been used, or if the factory declares a default client id; an
     *     {@link InvalidClientIDException} if the id is empty or a client id is already set;
     *     whatever the factory raises while registering the id
     */
    @Override
    public void setClientID(String str) throws JMSException {
        log.info("setClientID {} on {}, factory {}", str, this, this.factory);
        checkNotClosed();
        if (!this.allowSetClientId) {
            throw new IllegalStateException("Cannot set clientId after performing any other operation");
        }
        String administrativeClientId = this.factory.getDefaultClientId();
        if (administrativeClientId != null) {
            throw new IllegalStateException("ClientId has be administratively configured as " + administrativeClientId + ", you cannot change it");
        }
        if (str == null || str.isEmpty()) {
            throw new InvalidClientIDException("Invalid empty clientId");
        }
        if (this.clientId != null) {
            throw new InvalidClientIDException("cannot set again the clientId");
        }
        this.allowSetClientId = false;
        this.factory.registerClientId(str);
        this.clientId = str;
    }

    /**
     * @return the shared connection metadata instance
     * @throws JMSException if the connection is closed
     */
    @Override
    public ConnectionMetaData getMetaData() throws JMSException {
        checkNotClosed();
        return PulsarConnectionMetadata.INSTANCE;
    }

    /**
     * @return the registered exception listener, or {@code null}
     * @throws JMSException if the connection is closed
     */
    @Override
    public ExceptionListener getExceptionListener() throws JMSException {
        checkNotClosed();
        return this.exceptionListener;
    }

    /**
     * @param exceptionListener the listener to register, may be {@code null}
     * @throws JMSException if the connection is closed
     */
    @Override
    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
        checkNotClosed();
        this.exceptionListener = exceptionListener;
    }

    /**
     * Starts (resumes) the connection and wakes up every thread waiting in
     * {@link #executeInConnectionPausedLock(Utils.SupplierWithException, int)}.
     *
     * @throws JMSException if invoked from a session message listener or on a closed connection
     */
    @Override
    public void start() throws JMSException {
        checkNotInSessionMessageListener();
        checkNotClosed();
        this.connectionPausedLock.writeLock().lock();
        try {
            this.paused = false;
            this.pausedCondition.signalAll();
        } finally {
            this.connectionPausedLock.writeLock().unlock();
        }
    }

    /**
     * Stops (pauses) the connection. Acquiring the write lock waits for the work currently
     * running under the read lock to complete.
     *
     * @throws JMSException if invoked from a session message listener or on a closed connection
     */
    @Override
    public void stop() throws JMSException {
        checkNotInSessionMessageListener();
        checkNotClosed();
        this.connectionPausedLock.writeLock().lock();
        try {
            this.paused = true;
            this.pausedCondition.signalAll();
        } finally {
            this.connectionPausedLock.writeLock().unlock();
        }
    }

    /** Applies {@code Utils.checkNotOnMessageListener} to every registered session. */
    private void checkNotInSessionMessageListener() throws JMSException {
        for (PulsarSession session : this.sessions) {
            Utils.checkNotOnMessageListener(session);
        }
    }

    /**
     * Closes the connection: closes every session, deletes the temporary destinations and
     * unregisters the connection from the factory. Closing an already closed connection is a
     * no-op.
     *
     * <p>A failure while closing one session does not prevent the remaining cleanup; the first
     * failure is rethrown at the end with the following ones attached as suppressed exceptions.
     *
     * @throws JMSException if invoked from a session message listener, or if a session failed to
     *     close
     */
    @Override
    public void close() throws JMSException {
        checkNotInSessionMessageListener();
        if (this.closed) {
            return;
        }
        this.closed = true;

        JMSException firstFailure = null;
        // Iterating through the javax.jms.Session view; the list is copy-on-write, so sessions
        // unregistering themselves while closing do not disturb the iteration.
        for (Session session : this.sessions) {
            try {
                session.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.sessions.clear();

        deleteTemporaryDestinations();
        this.factory.unregisterConnection(this);

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Deletes every temporary destination (best effort, failures are only logged). */
    private void deleteTemporaryDestinations() {
        // Work on a copy: the list may be modified through removeTemporaryDestination meanwhile.
        List<PulsarTemporaryDestination> snapshot = new ArrayList<>(this.temporaryDestinations);
        for (PulsarTemporaryDestination destination : snapshot) {
            try {
                destination.delete();
            } catch (JMSException e) {
                log.error("Cannot delete temporary destination {}", destination.topicName, e);
            }
        }
        this.temporaryDestinations.clear();
    }

    /**
     * Creates a connection consumer on a destination.
     *
     * @param destination the destination to consume from
     * @param str the message selector
     * @param serverSessionPool the pool handed to the connection consumer
     * @param i the max messages value handed to the connection consumer
     * @return the started connection consumer
     * @throws JMSException if the connection is closed or the consumer cannot be created
     */
    @Override
    public ConnectionConsumer createConnectionConsumer(Destination destination, String str, ServerSessionPool serverSessionPool, int i) throws JMSException {
        checkNotClosed();
        return buildConnectionConsumer(serverSessionPool, i, session -> session.createConsumer(destination, str));
    }

    /**
     * Creates a connection consumer on a shared non durable subscription.
     *
     * @param topic the topic to consume from
     * @param str the subscription name
     * @param str2 the message selector
     * @param serverSessionPool the pool handed to the connection consumer
     * @param i the max messages value handed to the connection consumer
     * @return the started connection consumer
     * @throws JMSException if the connection is closed or the consumer cannot be created
     */
    @Override
    public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String str, String str2, ServerSessionPool serverSessionPool, int i) throws JMSException {
        checkNotClosed();
        return buildConnectionConsumer(serverSessionPool, i, session -> session.createSharedConsumer(topic, str, str2));
    }

    /**
     * Creates a connection consumer on a durable subscription (with {@code noLocal} disabled).
     *
     * @param topic the topic to consume from
     * @param str the subscription name
     * @param str2 the message selector
     * @param serverSessionPool the pool handed to the connection consumer
     * @param i the max messages value handed to the connection consumer
     * @return the started connection consumer
     * @throws JMSException if the connection is closed or the consumer cannot be created
     */
    @Override
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String str, String str2, ServerSessionPool serverSessionPool, int i) throws JMSException {
        checkNotClosed();
        return buildConnectionConsumer(serverSessionPool, i, session -> session.createDurableConsumer(topic, str, str2, false));
    }

    /**
     * Creates a connection consumer on a shared durable subscription.
     *
     * @param topic the topic to consume from
     * @param str the subscription name
     * @param str2 the message selector
     * @param serverSessionPool the pool handed to the connection consumer
     * @param i the max messages value handed to the connection consumer
     * @return the started connection consumer
     * @throws JMSException if the connection is closed or the consumer cannot be created
     */
    @Override
    public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String str, String str2, ServerSessionPool serverSessionPool, int i) throws JMSException {
        checkNotClosed();
        return buildConnectionConsumer(serverSessionPool, i, session -> session.createSharedDurableConsumer(topic, str, str2));
    }

    /**
     * Forgets a session previously created by this connection.
     *
     * @param pulsarSession the session to remove
     */
    public void unregisterSession(PulsarSession pulsarSession) {
        this.sessions.remove(pulsarSession);
    }

    /**
     * Prefixes a name with the client id.
     *
     * @param str the name to qualify
     * @param z whether a missing client id is acceptable
     * @return {@code clientId + "_" + str} when a client id is set, otherwise {@code str}
     * @throws JMSException an {@link IllegalStateException} if no client id is set and
     *     {@code z} is {@code false}
     */
    public String prependClientId(String str, boolean z) throws JMSException {
        if (this.clientId != null) {
            return this.clientId + "_" + str;
        }
        if (!z) {
            throw new IllegalStateException("ClientID must be set");
        }
        return str;
    }

    /**
     * @param z whether {@link #setClientID(String)} is still permitted
     */
    public void setAllowSetClientId(boolean z) {
        this.allowSetClientId = z;
    }

    /**
     * Runs an action while the connection is guaranteed to stay started.
     *
     * <p>The action runs under the read lock, so {@link #start()} and {@link #stop()} (which need
     * the write lock) wait for it. If the connection is paused the caller first waits for it to
     * be started.
     *
     * @param supplierWithException the action to run
     * @param i maximum wait in milliseconds while the connection is paused; a value that is not
     *     positive waits without limit. The timeout is re-armed on every wake-up that still
     *     finds the connection paused.
     * @param <T> the result type
     * @return the result of the action, or {@code null} if the wait timed out
     * @throws JMSException the failure of the action or of the wait, converted by
     *     {@code Utils.handleException}
     */
    public <T> T executeInConnectionPausedLock(Utils.SupplierWithException<T> supplierWithException, int i) throws JMSException {
        boolean resumedInTime = true;
        this.connectionPausedLock.readLock().lock();
        // Tracks read lock ownership so that the final release happens exactly once and only
        // when the lock is really held (an unbalanced unlock raises IllegalMonitorStateException).
        boolean holdsReadLock = true;
        try {
            if (this.paused) {
                // A read lock cannot be upgraded: release it before taking the write lock,
                // which is the lock the condition belongs to.
                this.connectionPausedLock.readLock().unlock();
                holdsReadLock = false;
                this.connectionPausedLock.writeLock().lock();
                try {
                    while (this.paused) {
                        if (i > 0) {
                            resumedInTime = this.pausedCondition.await(i, TimeUnit.MILLISECONDS);
                            if (!resumedInTime) {
                                break;
                            }
                        } else {
                            this.pausedCondition.await();
                        }
                    }
                    // Downgrade: take the read lock before giving up the write lock.
                    this.connectionPausedLock.readLock().lock();
                    holdsReadLock = true;
                } finally {
                    this.connectionPausedLock.writeLock().unlock();
                }
            }
            if (!resumedInTime) {
                return null;
            }
            return supplierWithException.run();
        } catch (Throwable error) {
            throw Utils.handleException(error);
        } finally {
            if (holdsReadLock) {
                this.connectionPausedLock.readLock().unlock();
            }
        }
    }

    /** @return {@code true} if the connection is currently started (not paused) */
    public boolean isStarted() {
        this.connectionPausedLock.readLock().lock();
        try {
            return !this.paused;
        } finally {
            this.connectionPausedLock.readLock().unlock();
        }
    }

    /**
     * Creates a temporary queue named {@code jms-temp-queue-<uuid>} in the system namespace of
     * the factory and tracks it for deletion on {@link #close()}.
     *
     * @param pulsarSession the session the queue is bound to
     * @return the new temporary queue
     * @throws JMSException if the connection is closed or the topic cannot be created
     */
    public TemporaryQueue createTemporaryQueue(PulsarSession pulsarSession) throws JMSException {
        checkNotClosed();
        String topicName = "persistent://" + this.factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID();
        createPulsarTemporaryTopic(topicName);
        PulsarTemporaryQueue temporaryQueue = new PulsarTemporaryQueue(topicName, pulsarSession);
        this.temporaryDestinations.add(temporaryQueue);
        return temporaryQueue;
    }

    /**
     * Creates a temporary topic named {@code jms-temp-topic-<uuid>} in the system namespace of
     * the factory and tracks it for deletion on {@link #close()}.
     *
     * @param pulsarSession the session the topic is bound to
     * @return the new temporary topic
     * @throws JMSException if the connection is closed or the topic cannot be created
     */
    public TemporaryTopic createTemporaryTopic(PulsarSession pulsarSession) throws JMSException {
        checkNotClosed();
        String topicName = "persistent://" + this.factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID();
        createPulsarTemporaryTopic(topicName);
        PulsarTemporaryTopic temporaryTopic = new PulsarTemporaryTopic(topicName, pulsarSession);
        this.temporaryDestinations.add(temporaryTopic);
        return temporaryTopic;
    }

    /**
     * Creates a session exposed through the legacy queue API.
     *
     * @param z whether the session is transacted
     * @param i the session (acknowledge) mode
     * @return the queue session view of a new session
     * @throws JMSException if the connection is closed or the session cannot be created
     */
    @Override
    public QueueSession createQueueSession(boolean z, int i) throws JMSException {
        return createSession(z, i).emulateLegacySession(true, false);
    }

    /**
     * Creates a connection consumer on a queue.
     *
     * @param queue the queue to consume from
     * @param str the message selector
     * @param serverSessionPool the pool handed to the connection consumer
     * @param i the max messages value handed to the connection consumer
     * @return the started connection consumer
     * @throws JMSException if the connection is closed or the consumer cannot be created
     */
    @Override
    public ConnectionConsumer createConnectionConsumer(Queue queue, String str, ServerSessionPool serverSessionPool, int i) throws JMSException {
        checkNotClosed();
        return buildConnectionConsumer(serverSessionPool, i, session -> session.createConsumer(queue, str));
    }

    /**
     * Creates a session exposed through the legacy topic API.
     *
     * @param z whether the session is transacted
     * @param i the session (acknowledge) mode
     * @return the topic session view of a new session
     * @throws JMSException if the connection is closed or the session cannot be created
     */
    @Override
    public TopicSession createTopicSession(boolean z, int i) throws JMSException {
        return createSession(z, i).emulateLegacySession(false, true);
    }

    /**
     * Creates a connection consumer on a topic.
     *
     * @param topic the topic to consume from
     * @param str the message selector
     * @param serverSessionPool the pool handed to the connection consumer
     * @param i the max messages value handed to the connection consumer
     * @return the started connection consumer
     * @throws JMSException if the connection is closed or the consumer cannot be created
     */
    @Override
    public ConnectionConsumer createConnectionConsumer(Topic topic, String str, ServerSessionPool serverSessionPool, int i) throws JMSException {
        // Same explicit guard as the other createConnectionConsumer overloads.
        checkNotClosed();
        return buildConnectionConsumer(serverSessionPool, i, session -> session.createConsumer(topic, str));
    }

    /**
     * Stops tracking a temporary destination.
     *
     * @param pulsarTemporaryDestination the destination to forget
     */
    public void removeTemporaryDestination(PulsarTemporaryDestination pulsarTemporaryDestination) {
        this.temporaryDestinations.remove(pulsarTemporaryDestination);
    }

    /** @return the unique id of this connection */
    public String getConnectionId() {
        return this.connectionId;
    }

    /**
     * Creates and starts a connection consumer running on its own dedicated session.
     *
     * <p>If the consumer cannot be built or started, the dedicated session is closed so that it
     * does not linger until the connection is closed.
     *
     * @param serverSessionPool the pool handed to the connection consumer
     * @param i the max messages value handed to the connection consumer
     * @param consumerBuilder creates the underlying consumer on the dedicated session
     * @return the started connection consumer
     * @throws JMSException if the session or the consumer cannot be created or started
     */
    private ConnectionConsumer buildConnectionConsumer(ServerSessionPool serverSessionPool, int i, ConsumerBuilder consumerBuilder) throws JMSException {
        // NOTE(review): 4 is not one of the javax.jms.Session acknowledge modes; presumably a
        // Pulsar specific per-message acknowledge mode - confirm against the project constants.
        PulsarSession dispatcherSession = createSession(false, 4);
        try {
            PulsarMessageConsumer consumer = (PulsarMessageConsumer) consumerBuilder.build(dispatcherSession);
            PulsarConnectionConsumer connectionConsumer = new PulsarConnectionConsumer(dispatcherSession, consumer, serverSessionPool, i);
            connectionConsumer.start();
            return connectionConsumer;
        } catch (JMSException | RuntimeException failure) {
            try {
                dispatcherSession.close();
            } catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Creates the non partitioned Pulsar topic that backs a temporary destination.
     *
     * <p>A JMS {@link IllegalStateException} raised while doing so is only logged when the
     * factory allows temporary topics without admin access.
     *
     * @param str the fully qualified topic name
     * @throws JMSException if the topic cannot be created
     */
    private void createPulsarTemporaryTopic(String str) throws JMSException {
        try {
            this.factory.getPulsarAdmin().topics().createNonPartitionedTopic(str);
        } catch (IllegalStateException e) {
            if (!this.factory.isAllowTemporaryTopicWithoutAdmin()) {
                throw Utils.handleException(e);
            }
            log.warn("Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true", str, e);
        } catch (Exception e) {
            throw Utils.handleException(e);
        }
    }

    /** Asks every registered session to refresh its server side selectors. */
    public void refreshServerSideSelectors() {
        for (PulsarSession session : this.sessions) {
            session.refreshServerSideSelectors();
        }
    }
}
