package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.messages.PulsarBytesMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarMapMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarObjectMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarSimpleMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarStreamMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarSession.class */
public class PulsarSession implements Session, QueueSession, TopicSession {
    private static final Logger log = LoggerFactory.getLogger(PulsarSession.class);

    /** Connection that created this session; used for factory access and session unregistration. */
    private final PulsarConnection connection;
    /** Session mode given to the constructor (validated to be one of 0..4). */
    private final int sessionMode;
    /** {@code true} exactly when {@link #sessionMode} is 0. */
    private final boolean transacted;
    /** Current transaction; created lazily by {@link #getTransaction()} and reset to null on commit/rollback. */
    Transaction transaction;
    private MessageListener messageListener;
    /** Set once by {@link #close()}; also read by the listener thread to stop its loop. */
    private volatile boolean closed;
    private volatile ListenerThread listenerThread;
    /** Toggled by {@link #emulateLegacySession(boolean, boolean)}. */
    private boolean allowQueueOperations = true;
    /** Toggled by {@link #emulateLegacySession(boolean, boolean)}. */
    private boolean allowTopicOperations = true;
    private final Map<PulsarDestination, Producer<byte[]>> producers = new HashMap<>();
    /** Read lock is taken by critical operations, write lock by {@link #close()}. */
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    /** Messages registered as delivered but not yet acknowledged in this session. */
    private final List<PulsarMessage> unackedMessages = new ArrayList<>();
    /** Subscription name (with client id applied) to the destination it was registered for. */
    private final Map<String, PulsarDestination> destinationBySubscription = new HashMap<>();
    private final List<PulsarMessageConsumer> consumers = new CopyOnWriteArrayList<>();
    private final List<PulsarQueueBrowser> browsers = new CopyOnWriteArrayList<>();
    private boolean jms20 = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarSession$BlockCLoseOperation.class */
    /**
     * A unit of work that yields a value and may fail with a {@link JMSException}.
     *
     * @param <T> type of the value produced by {@link #execute()}
     */
    @FunctionalInterface
    public interface BlockCLoseOperation<T> {
        /**
         * Performs the operation.
         *
         * @return the operation result
         * @throws JMSException if the operation fails
         */
        T execute() throws JMSException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/oss/pulsar/jms/PulsarSession$ListenerThread.class */
    /** Daemon thread that keeps invoking {@link PulsarSession#run()} until the session is closed. */
    public class ListenerThread extends Thread {
        /** Creates the daemon thread named {@code jms-session-thread}; it is not started here. */
        private ListenerThread() {
            super("jms-session-thread");
            setDaemon(true);
        }

        /** Loops over {@link PulsarSession#run()} and exits once the session's closed flag is set. */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            for (;;) {
                if (PulsarSession.this.closed) {
                    return;
                }
                PulsarSession.this.run();
            }
        }
    }

    public PulsarSession(int i, PulsarConnection pulsarConnection) throws JMSException {
        this.connection = pulsarConnection;
        this.sessionMode = i;
        this.transacted = i == 0;
        validateSessionMode(i);
        if (i == 0 && !pulsarConnection.getFactory().isEnableTransaction()) {
            throw new JMSException("Please enable transactions on PulsarConnectionFactory with enableTransaction=true");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the current transaction, starting one first when the session mode is 0 and none exists.
     *
     * @return the current transaction, or {@code null} when the session mode is not 0 and no
     *     transaction was set
     * @throws JMSException if a new transaction cannot be started
     */
    public Transaction getTransaction() throws JMSException {
        final boolean mustStart = this.sessionMode == 0 && this.transaction == null;
        if (mustStart) {
            this.transaction = startTransaction(this.connection);
        }
        return this.transaction;
    }

    /**
     * Builds a new transaction through the connection factory's Pulsar client, retrying while the
     * transaction coordinator is reported as not found.
     *
     * <p>At most 10 attempts are made, sleeping one second after each coordinator-not-found failure.
     *
     * @param pulsarConnection connection whose factory provides the Pulsar client
     * @return the newly built transaction
     * @throws JMSException if building fails for another reason, or no transaction was obtained
     *     after all attempts
     */
    private Transaction startTransaction(PulsarConnection pulsarConnection) throws JMSException {
        Transaction created = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            try {
                try {
                    created = (Transaction) pulsarConnection.getFactory().getPulsarClient().newTransaction().build().get();
                    break;
                } catch (ExecutionException executionFailure) {
                    final Throwable cause = executionFailure.getCause();
                    if (!(cause instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException)) {
                        throw Utils.handleException(cause);
                    }
                    log.info("Transaction service not available {}", cause.getMessage());
                    Thread.sleep(1000L);
                }
            } catch (Exception failure) {
                // Also receives exceptions thrown from the inner catch block above
                // (including an interrupted sleep) and converts them via Utils.handleException.
                throw Utils.handleException(failure);
            }
        }
        if (created != null) {
            return created;
        }
        throw new JMSException("Cannot create a Transaction in time");
    }

    /**
     * Verifies that the session mode is one of the supported values 0 through 4.
     *
     * @param i session mode to check
     * @throws JMSException if the value is outside 0..4
     */
    private static void validateSessionMode(int i) throws JMSException {
        // 4 is the upper bound (PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE).
        final boolean supported = i >= 0 && i <= 4;
        if (!supported) {
            throw new JMSException("Invalid sessionMode " + i);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the connection factory of the owning connection.
     *
     * @return the factory obtained from the connection
     */
    public PulsarConnectionFactory getFactory() {
        final PulsarConnection owner = this.connection;
        return owner.getFactory();
    }

    /* renamed from: createBytesMessage, reason: merged with bridge method [inline-methods] */
    /**
     * Creates an empty bytes message.
     *
     * @return a new {@link PulsarBytesMessage}
     * @throws JMSException if the session is closed
     */
    public PulsarBytesMessage m25createBytesMessage() throws JMSException {
        checkNotClosed();
        final PulsarBytesMessage message = new PulsarBytesMessage();
        return message;
    }

    /**
     * Creates an empty map message.
     *
     * @return a new {@link PulsarMapMessage}
     * @throws JMSException if the session is closed
     */
    public MapMessage createMapMessage() throws JMSException {
        checkNotClosed();
        final MapMessage message = new PulsarMapMessage();
        return message;
    }

    /**
     * Creates a map message initialised from the given map.
     *
     * <p>Like the other message factory methods of this session, this now rejects calls made
     * after the session has been closed.
     *
     * @param map initial content handed to the {@link PulsarMapMessage} constructor
     * @return a new {@link PulsarMapMessage}
     * @throws JMSException if the session is closed or the message cannot be created
     */
    public MapMessage createMapMessage(Map<String, Object> map) throws JMSException {
        checkNotClosed();
        return new PulsarMapMessage(map);
    }

    /**
     * Creates a message without a body.
     *
     * @return a new {@link PulsarSimpleMessage}
     * @throws JMSException if the session is closed
     */
    public Message createMessage() throws JMSException {
        checkNotClosed();
        final Message message = new PulsarSimpleMessage();
        return message;
    }

    /**
     * Creates an object message with no payload set.
     *
     * @return a new {@link PulsarObjectMessage}
     * @throws JMSException if the session is closed
     */
    public ObjectMessage createObjectMessage() throws JMSException {
        checkNotClosed();
        final ObjectMessage message = new PulsarObjectMessage();
        return message;
    }

    /**
     * Creates an object message and sets its payload.
     *
     * @param serializable payload passed to {@code setObject}
     * @return the populated {@link PulsarObjectMessage}
     * @throws JMSException if the session is closed or the payload cannot be set
     */
    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
        checkNotClosed();
        final PulsarObjectMessage message = new PulsarObjectMessage();
        message.setObject(serializable);
        return message;
    }

    /**
     * Creates an empty stream message.
     *
     * @return a new {@link PulsarStreamMessage}
     * @throws JMSException if the session is closed
     */
    public StreamMessage createStreamMessage() throws JMSException {
        checkNotClosed();
        final StreamMessage message = new PulsarStreamMessage();
        return message;
    }

    /**
     * Creates a text message constructed with a {@code null} text.
     *
     * @return a new {@link PulsarTextMessage}
     * @throws JMSException if the session is closed
     */
    public TextMessage createTextMessage() throws JMSException {
        checkNotClosed();
        // A String-typed null selects the String constructor of PulsarTextMessage.
        final String noText = null;
        return new PulsarTextMessage(noText);
    }

    /**
     * Creates a text message with the given text.
     *
     * @param str text handed to the {@link PulsarTextMessage} constructor
     * @return a new {@link PulsarTextMessage}
     * @throws JMSException if the session is closed
     */
    public TextMessage createTextMessage(String str) throws JMSException {
        checkNotClosed();
        final TextMessage message = new PulsarTextMessage(str);
        return message;
    }

    /**
     * Tells whether this session was created with session mode 0 (transacted).
     *
     * @return {@code true} when the session mode is 0
     * @throws JMSException if the session is closed
     */
    public boolean getTransacted() throws JMSException {
        checkNotClosed();
        final boolean transactedMode = this.sessionMode == 0;
        return transactedMode;
    }

    /**
     * Returns the session mode this session was created with.
     *
     * @return the stored session mode
     * @throws JMSException if the session is closed
     */
    public int getAcknowledgeMode() throws JMSException {
        checkNotClosed();
        final int mode = this.sessionMode;
        return mode;
    }

    /**
     * Commits the current transaction, acknowledging every registered unacknowledged message
     * inside it.
     *
     * <p>Does nothing beyond the precondition checks when no transaction has been started. On
     * success the unacknowledged list is cleared and the transaction reference is reset.
     *
     * @throws JMSException if the session is closed, one of the {@code Utils} guard checks
     *     rejects the call, the session is not transacted, or waiting for the acknowledgements
     *     and commit fails
     */
    public void commit() throws JMSException {
        checkNotClosed();
        Utils.checkNotOnMessageListener(this);
        Utils.checkNotOnMessageProducer(this, null);
        if (!this.transacted) {
            throw new IllegalStateException("session is not transacted");
        }
        final ReentrantReadWriteLock.ReadLock guard = this.closeLock.readLock();
        guard.lock();
        try {
            if (this.transaction == null) {
                return;
            }
            final List<CompletableFuture<?>> pending = new ArrayList<>();
            for (PulsarMessage unacked : this.unackedMessages) {
                pending.add(unacked.acknowledgeInternalInTransaction(this.transaction));
            }
            pending.add(this.transaction.commit());
            // Wait for all acknowledgements and the commit itself before resetting state.
            Utils.get(CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])));
            this.unackedMessages.clear();
            this.transaction = null;
        } finally {
            guard.unlock();
        }
    }

    /**
     * Rolls back the current transaction of a transacted session.
     *
     * @throws JMSException if the session is closed, one of the {@code Utils} guard checks
     *     rejects the call, the session is not transacted, or the rollback itself fails
     */
    public void rollback() throws JMSException {
        checkNotClosed();
        Utils.checkNotOnMessageListener(this);
        Utils.checkNotOnMessageProducer(this, null);
        final ReentrantReadWriteLock.ReadLock guard = this.closeLock.readLock();
        guard.lock();
        try {
            if (this.transacted) {
                rollbackInternal();
                return;
            }
            throw new IllegalStateException("session is not transacted");
        } finally {
            guard.unlock();
        }
    }

    /**
     * Asks every consumer to redeliver its unacknowledged messages, clears the session's
     * unacknowledged list, and aborts the current transaction if one exists.
     *
     * @throws JMSException if redelivery or the abort fails; in that case the transaction
     *     reference is left untouched
     */
    private void rollbackInternal() throws JMSException {
        for (PulsarMessageConsumer consumer : this.consumers) {
            consumer.redeliverUnacknowledgedMessages();
        }
        this.unackedMessages.clear();
        final Transaction current = this.transaction;
        if (current != null) {
            Utils.get(current.abort());
        }
        this.transaction = null;
    }

    /**
     * Closes the session: rolls back a pending transaction, closes all consumers and browsers,
     * and waits for the listener thread to finish.
     *
     * <p>Calling this on an already closed session only unregisters it from the connection
     * again. If the calling thread is interrupted while waiting for the listener thread, the
     * wait is abandoned and the thread's interrupt status is restored so callers can observe it.
     *
     * @throws JMSException if the {@code Utils} session-callback guard rejects the call, or
     *     rolling back / closing a consumer or browser fails
     */
    public void close() throws JMSException {
        Utils.checkNotOnSessionCallback(this);
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.transacted && this.transaction != null) {
                rollbackInternal();
            }
            this.unackedMessages.clear();
            for (PulsarMessageConsumer consumer : this.consumers) {
                consumer.closeInternal();
            }
            for (PulsarQueueBrowser browser : this.browsers) {
                browser.close();
            }
            this.consumers.clear();
            this.browsers.clear();
            final ListenerThread thread = this.listenerThread;
            if (thread != null) {
                try {
                    thread.join();
                } catch (InterruptedException interrupted) {
                    // Do not lose the interruption: restore the flag for the caller.
                    Thread.currentThread().interrupt();
                } finally {
                    this.listenerThread = null;
                }
            }
        } finally {
            this.closeLock.writeLock().unlock();
            this.connection.unregisterSession(this);
        }
    }

    /**
     * Negatively acknowledges every registered unacknowledged message and clears the list.
     *
     * @throws JMSException if the session is closed or transacted
     */
    public void recover() throws JMSException {
        checkNotClosed();
        if (this.transacted) {
            throw new IllegalStateException("cannot call this method inside a transacted session");
        }
        log.info("recover, unacked messages {}", this.unackedMessages);
        final Iterator<PulsarMessage> pending = this.unackedMessages.iterator();
        while (pending.hasNext()) {
            final PulsarMessage message = pending.next();
            log.info("recovering message {}", message);
            message.negativeAck();
        }
        this.unackedMessages.clear();
    }

    /**
     * Returns the listener stored by {@link #setMessageListener(MessageListener)}.
     *
     * @return the stored listener, or {@code null} if none was set
     * @throws JMSException never thrown by this implementation
     */
    public MessageListener getMessageListener() throws JMSException {
        final MessageListener current = this.messageListener;
        return current;
    }

    /**
     * Stores the session-level message listener.
     *
     * @param messageListener listener to store; must not be {@code null}
     * @throws NullPointerException if {@code messageListener} is {@code null}
     * @throws JMSException never thrown by this implementation
     */
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        this.messageListener = Objects.requireNonNull(messageListener);
    }

    /**
     * Performs one pass of listener dispatching over the session's consumers.
     *
     * <p>When there are no consumers or the connection is not started, sleeps for up to 100 ms
     * and returns. Otherwise each consumer's {@code runListener(100)} is invoked through the
     * connection's paused-lock helper; errors are logged and the pass stops early as soon as the
     * connection is no longer started.
     */
    public void run() {
        final boolean nothingToDo = this.consumers.isEmpty() || !this.connection.isStarted();
        if (nothingToDo) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the caller loops and re-checks the closed flag.
            }
            return;
        }
        final Iterator<PulsarMessageConsumer> remaining = this.consumers.iterator();
        while (remaining.hasNext()) {
            final PulsarMessageConsumer consumer = remaining.next();
            try {
                this.connection.executeInConnectionPausedLock(() -> {
                    consumer.runListener(100);
                    return null;
                }, 0);
            } catch (Throwable error) {
                log.error("Error in Session Thread {}", this, error);
            }
            if (!this.connection.isStarted()) {
                return;
            }
        }
    }

    /* renamed from: createProducer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a producer for the given destination and disallows further client-id changes on
     * the connection.
     *
     * @param destination target destination handed to the producer
     * @return a new {@link PulsarMessageProducer}
     * @throws JMSException if the producer cannot be created
     */
    public PulsarMessageProducer m24createProducer(Destination destination) throws JMSException {
        final PulsarConnection owner = this.connection;
        owner.setAllowSetClientId(false);
        return new PulsarMessageProducer(this, destination);
    }

    /* renamed from: createConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a consumer on the destination without a selector.
     *
     * @param destination destination to consume from
     * @return the subscribed consumer
     * @throws JMSException if the destination is {@code null} or subscribing fails
     */
    public PulsarMessageConsumer m23createConsumer(Destination destination) throws JMSException {
        final String noSelector = null;
        return m22createConsumer(destination, noSelector);
    }

    /* renamed from: createConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a consumer on the destination, passing {@code false} for the boolean option of the
     * three-argument variant.
     *
     * @param destination destination to consume from; must not be {@code null}
     * @param str value forwarded unchanged to the three-argument variant
     * @return the subscribed consumer
     * @throws JMSException if the destination is {@code null} or subscribing fails
     */
    public PulsarMessageConsumer m22createConsumer(Destination destination, String str) throws JMSException {
        if (destination != null) {
            return m21createConsumer(destination, str, false);
        }
        throw new InvalidDestinationException("null destination");
    }

    /* renamed from: createConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates and subscribes a non-durable consumer using a random UUID as subscription name.
     *
     * @param destination destination to consume from; must not be {@code null}
     * @param str value forwarded to the {@link PulsarMessageConsumer} constructor
     * @param z flag forwarded as the last {@link PulsarMessageConsumer} constructor argument
     * @return the subscribed consumer
     * @throws JMSException if the destination is {@code null} or subscribing fails
     */
    public PulsarMessageConsumer m21createConsumer(Destination destination, String str, boolean z) throws JMSException {
        if (destination == null) {
            throw new InvalidDestinationException("null destination");
        }
        final String subscriptionName = UUID.randomUUID().toString();
        final PulsarDestination target = PulsarConnectionFactory.toPulsarDestination(destination);
        final PulsarMessageConsumer consumer =
                new PulsarMessageConsumer(
                        subscriptionName,
                        target,
                        this,
                        SubscriptionMode.NonDurable,
                        getFactory().getExclusiveSubscriptionTypeForSimpleConsumers(destination),
                        str,
                        false,
                        z);
        return consumer.subscribe();
    }

    /* renamed from: createSharedConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a shared consumer, passing {@code null} as the third argument of the
     * three-argument variant.
     *
     * @param topic topic to consume from
     * @param str subscription name
     * @return the subscribed consumer
     * @throws JMSException if the topic is {@code null}, topic operations are disabled, or
     *     subscribing fails
     */
    public PulsarMessageConsumer m20createSharedConsumer(Topic topic, String str) throws JMSException {
        final String noSelector = null;
        return m19createSharedConsumer(topic, str, noSelector);
    }

    /* renamed from: createSharedConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates and subscribes a non-durable consumer using the factory's shared subscription type
     * for topics.
     *
     * <p>The subscription name is passed through the connection's {@code prependClientId} and
     * registered on this session as a shared subscription.
     *
     * @param topic topic to consume from; must not be {@code null}
     * @param str subscription name before client-id handling
     * @param str2 value forwarded to the {@link PulsarMessageConsumer} constructor
     * @return the subscribed consumer
     * @throws JMSException if the topic is {@code null}, topic operations are disabled, or
     *     subscribing fails
     */
    public PulsarMessageConsumer m19createSharedConsumer(Topic topic, String str, String str2) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        checkTopicOperationEnabled();
        final String subscriptionName = this.connection.prependClientId(str, true);
        final PulsarDestination target = PulsarConnectionFactory.toPulsarDestination(topic);
        registerSubscriptionName(target, subscriptionName, true);
        final PulsarMessageConsumer consumer =
                new PulsarMessageConsumer(
                        subscriptionName,
                        target,
                        this,
                        SubscriptionMode.NonDurable,
                        getFactory().getTopicSharedSubscriptionType(),
                        str2,
                        true,
                        false);
        return consumer.subscribe();
    }

    /* renamed from: createQueue, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a queue destination object for the given name.
     *
     * @param str queue name handed to the {@link PulsarQueue} constructor
     * @return a new {@link PulsarQueue}
     * @throws JMSException if the session is closed or queue operations are disabled
     */
    public PulsarQueue m18createQueue(String str) throws JMSException {
        checkNotClosed();
        checkQueueOperationEnabled();
        final PulsarQueue queue = new PulsarQueue(str);
        return queue;
    }

    /* renamed from: createTopic, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a topic destination object for the given name.
     *
     * @param str topic name handed to the {@link PulsarTopic} constructor
     * @return a new {@link PulsarTopic}
     * @throws JMSException if the session is closed or topic operations are disabled
     */
    public PulsarTopic m17createTopic(String str) throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        final PulsarTopic topic = new PulsarTopic(str);
        return topic;
    }

    /**
     * Creates a durable subscriber, passing {@code null} and {@code false} for the two extra
     * arguments of the four-argument variant.
     *
     * @param topic topic to subscribe to
     * @param str subscription name
     * @return the subscribed consumer
     * @throws JMSException if the subscriber cannot be created
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String str) throws JMSException {
        final String noSelector = null;
        return m16createDurableSubscriber(topic, str, noSelector, false);
    }

    /* renamed from: createDurableSubscriber, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a durable subscriber by delegating to the private five-argument variant with its
     * last flag set to {@code false}.
     *
     * @param topic topic to subscribe to
     * @param str subscription name
     * @param str2 value forwarded unchanged
     * @param z flag forwarded unchanged
     * @return the subscribed consumer
     * @throws JMSException if the subscriber cannot be created
     */
    public PulsarMessageConsumer m16createDurableSubscriber(Topic topic, String str, String str2, boolean z) throws JMSException {
        final boolean clientIdFlag = false;
        return createDurableSubscriber(topic, str, str2, z, clientIdFlag);
    }

    /**
     * Creates and subscribes a durable, exclusive consumer on a topic.
     *
     * <p>The subscription name is passed through the connection's {@code prependClientId} with
     * flag {@code z2} and registered on this session as a non-shared subscription.
     *
     * @param topic topic to subscribe to; must not be {@code null}
     * @param str subscription name before client-id handling
     * @param str2 value forwarded to the {@link PulsarMessageConsumer} constructor
     * @param z flag forwarded as the last {@link PulsarMessageConsumer} constructor argument
     * @param z2 flag forwarded to {@code prependClientId}
     * @return the subscribed consumer
     * @throws JMSException if topic operations are disabled, the topic is {@code null}, the
     *     subscription name is already registered for the same destination, or subscribing fails
     */
    private PulsarMessageConsumer createDurableSubscriber(Topic topic, String str, String str2, boolean z, boolean z2) throws JMSException {
        checkTopicOperationEnabled();
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        final PulsarTopic target = (PulsarTopic) PulsarConnectionFactory.toPulsarDestination(topic);
        final String subscriptionName = this.connection.prependClientId(str, z2);
        registerSubscriptionName(target, subscriptionName, false);
        final PulsarMessageConsumer consumer =
                new PulsarMessageConsumer(
                        subscriptionName,
                        target,
                        this,
                        SubscriptionMode.Durable,
                        SubscriptionType.Exclusive,
                        str2,
                        true,
                        z);
        return consumer.subscribe();
    }

    /**
     * Records the destination for a subscription name, replacing any previous entry.
     *
     * @param pulsarDestination destination to associate with the name
     * @param str subscription name used as key
     * @param z when {@code true}, registering the same name for the same destination again is
     *     allowed
     * @throws JMSException if {@code z} is {@code false} and the name was already registered for
     *     an equal destination (the map entry has already been overwritten at that point)
     */
    private void registerSubscriptionName(PulsarDestination pulsarDestination, String str, boolean z) throws JMSException {
        final PulsarDestination previous = this.destinationBySubscription.put(str, pulsarDestination);
        if (previous == null || !previous.equals(pulsarDestination) || z) {
            return;
        }
        throw new IllegalStateException("a subscription with name " + str + " is already registered on this session");
    }

    /**
     * Removes the subscription name from the registry when it is currently mapped to a
     * destination equal to the given topic; otherwise leaves the registry unchanged.
     *
     * @param str subscription name
     * @param topic topic the name is expected to be registered for
     */
    private void unregisterSubscriptionName(String str, Topic topic) {
        final PulsarDestination registered = this.destinationBySubscription.get(str);
        if (registered != null && registered.equals(topic)) {
            this.destinationBySubscription.remove(str);
        }
    }

    /* renamed from: createDurableConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a durable consumer, passing {@code null} and {@code false} for the two extra
     * arguments of the four-argument variant.
     *
     * @param topic topic to subscribe to
     * @param str subscription name
     * @return the subscribed consumer
     * @throws JMSException if the consumer cannot be created
     */
    public PulsarMessageConsumer m15createDurableConsumer(Topic topic, String str) throws JMSException {
        final String noSelector = null;
        return m14createDurableConsumer(topic, str, noSelector, false);
    }

    /* renamed from: createDurableConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a durable consumer by delegating to the private five-argument
     * {@code createDurableSubscriber} with its last flag set to {@code false}.
     *
     * @param topic topic to subscribe to
     * @param str subscription name
     * @param str2 value forwarded unchanged
     * @param z flag forwarded unchanged
     * @return the subscribed consumer
     * @throws JMSException if the consumer cannot be created
     */
    public PulsarMessageConsumer m14createDurableConsumer(Topic topic, String str, String str2, boolean z) throws JMSException {
        final boolean clientIdFlag = false;
        return createDurableSubscriber(topic, str, str2, z, clientIdFlag);
    }

    /**
     * Creates a shared durable consumer, passing {@code null} as the third argument of the
     * three-argument variant.
     *
     * @param topic topic to subscribe to
     * @param str subscription name
     * @return the subscribed consumer
     * @throws JMSException if the consumer cannot be created
     */
    public MessageConsumer createSharedDurableConsumer(Topic topic, String str) throws JMSException {
        final String noSelector = null;
        return m13createSharedDurableConsumer(topic, str, noSelector);
    }

    /* renamed from: createSharedDurableConsumer, reason: merged with bridge method [inline-methods] */
    /**
     * Creates and subscribes a durable consumer using the factory's shared subscription type for
     * topics.
     *
     * <p>The subscription name is passed through the connection's {@code prependClientId} and
     * registered on this session as a shared subscription.
     *
     * @param topic topic to subscribe to; must not be {@code null}
     * @param str subscription name before client-id handling
     * @param str2 value forwarded to the {@link PulsarMessageConsumer} constructor
     * @return the subscribed consumer
     * @throws JMSException if the topic is {@code null}, topic operations are disabled, or
     *     subscribing fails
     */
    public PulsarMessageConsumer m13createSharedDurableConsumer(Topic topic, String str, String str2) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        checkTopicOperationEnabled();
        final PulsarTopic target = (PulsarTopic) PulsarConnectionFactory.toPulsarDestination(topic);
        final String subscriptionName = this.connection.prependClientId(str, true);
        registerSubscriptionName(target, subscriptionName, true);
        final PulsarMessageConsumer consumer =
                new PulsarMessageConsumer(
                        subscriptionName,
                        target,
                        this,
                        SubscriptionMode.Durable,
                        getFactory().getTopicSharedSubscriptionType(),
                        str2,
                        true,
                        false);
        return consumer.subscribe();
    }

    /**
     * Creates a queue browser, passing {@code null} as the second argument of the two-argument
     * variant.
     *
     * @param queue queue to browse
     * @return the new browser
     * @throws JMSException if the queue is {@code null}, queue operations are disabled, or the
     *     browser cannot be created
     */
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        final String noSelector = null;
        return createBrowser(queue, noSelector);
    }

    /**
     * Creates a queue browser and tracks it so that it is closed together with the session.
     *
     * @param queue queue to browse; must not be {@code null}
     * @param str value forwarded to the {@link PulsarQueueBrowser} constructor
     * @return the new browser
     * @throws JMSException if the queue is {@code null}, queue operations are disabled, or the
     *     browser cannot be created
     */
    public QueueBrowser createBrowser(Queue queue, String str) throws JMSException {
        if (queue == null) {
            throw new InvalidDestinationException("invalid null queue");
        }
        checkQueueOperationEnabled();
        final PulsarQueueBrowser browser = new PulsarQueueBrowser(this, queue, str);
        this.browsers.add(browser);
        return browser;
    }

    /**
     * Asks the connection to create a temporary queue for this session.
     *
     * @return the temporary queue returned by the connection
     * @throws JMSException if the session is closed, queue operations are disabled, or creation
     *     fails
     */
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        checkNotClosed();
        checkQueueOperationEnabled();
        final PulsarConnection owner = this.connection;
        return owner.createTemporaryQueue(this);
    }

    /**
     * Asks the connection to create a temporary topic for this session.
     *
     * @return the temporary topic returned by the connection
     * @throws JMSException if the session is closed, topic operations are disabled, or creation
     *     fails
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        final PulsarConnection owner = this.connection;
        return owner.createTemporaryTopic(this);
    }

    /**
     * Deletes the subscription with the given name through the connection factory.
     *
     * <p>The destination is looked up (and removed) in this session's subscription registry.
     * When it is unknown an error is logged and the deletion is still attempted with a
     * {@code null} destination.
     * NOTE(review): how the factory handles a null destination is not visible here — confirm.
     *
     * @param str subscription name before client-id handling
     * @throws JMSException if the session is closed, topic operations are disabled, or the
     *     factory reports that nothing was deleted
     */
    public void unsubscribe(String str) throws JMSException {
        checkNotClosed();
        checkTopicOperationEnabled();
        final String subscriptionName = this.connection.prependClientId(str, true);
        final PulsarDestination destination = this.destinationBySubscription.remove(subscriptionName);
        if (destination == null) {
            log.error("Cannot unsubscribe {}, please open and close the subscription within this session before unsubscribing, because in Pulsar you need to known the Destination for the subscription. Known subscription names {}", subscriptionName, this.destinationBySubscription);
        }
        final boolean deleted = getFactory().deleteSubscription(destination, subscriptionName);
        if (deleted) {
            return;
        }
        throw new InvalidDestinationException("Subscription " + subscriptionName + " not found");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Acknowledges every registered unacknowledged message and then clears the list.
     *
     * <p>Iterates over a copy of the list, so the list may be modified while messages are being
     * acknowledged.
     *
     * @throws JMSException if the session is closed or an acknowledgement fails
     */
    public void acknowledgeAllMessages() throws JMSException {
        checkNotClosed();
        final List<PulsarMessage> snapshot = new ArrayList<>(this.unackedMessages);
        for (PulsarMessage message : snapshot) {
            message.acknowledgeInternal();
        }
        this.unackedMessages.clear();
    }

    /**
     * Adds a message to the session's list of unacknowledged messages.
     *
     * @param pulsarMessage message to track
     */
    public void registerUnacknowledgedMessage(PulsarMessage pulsarMessage) {
        final List<PulsarMessage> pending = this.unackedMessages;
        pending.add(pulsarMessage);
    }

    /**
     * Removes a message from the session's list of unacknowledged messages, if present.
     *
     * @param pulsarMessage message to stop tracking
     */
    public void unregisterUnacknowledgedMessage(PulsarMessage pulsarMessage) {
        final List<PulsarMessage> pending = this.unackedMessages;
        pending.remove(pulsarMessage);
    }

    /**
     * Detaches a consumer from this session.
     *
     * <p>When the consumer has an internal Pulsar consumer, it is removed from the session and
     * from the factory, and every unacknowledged message received from it is dropped from the
     * tracking list. Independently of that, its subscription name is unregistered when the
     * consumer's {@code unregisterSubscriptionOnClose} flag is set.
     *
     * @param pulsarMessageConsumer consumer being removed
     */
    public void removeConsumer(PulsarMessageConsumer pulsarMessageConsumer) {
        final Consumer<byte[]> pulsarConsumer = pulsarMessageConsumer.getInternalConsumer();
        if (pulsarConsumer != null) {
            this.consumers.remove(pulsarMessageConsumer);
            getFactory().removeConsumer(pulsarConsumer);
            for (Iterator<PulsarMessage> pending = this.unackedMessages.iterator(); pending.hasNext(); ) {
                final PulsarMessage message = pending.next();
                if (message == null || !message.isReceivedFromConsumer(pulsarMessageConsumer)) {
                    continue;
                }
                pending.remove();
            }
        }
        if (!pulsarMessageConsumer.unregisterSubscriptionOnClose) {
            return;
        }
        unregisterSubscriptionName(pulsarMessageConsumer.subscriptionName, (Topic) pulsarMessageConsumer.getDestination());
    }

    /**
     * Logs the given error at error level.
     *
     * @param th error to log
     */
    public void onError(Throwable th) {
        final Throwable error = th;
        log.error("Internal error ", error);
    }

    /**
     * Adds a consumer to this session and disallows further client-id changes on the connection.
     *
     * @param pulsarMessageConsumer consumer to track
     */
    public void registerConsumer(PulsarMessageConsumer pulsarMessageConsumer) {
        final List<PulsarMessageConsumer> tracked = this.consumers;
        tracked.add(pulsarMessageConsumer);
        final PulsarConnection owner = this.connection;
        owner.setAllowSetClientId(false);
    }

    /**
     * Returns the flag stored by {@link #setJms20(boolean)}.
     *
     * @return the current value of the flag ({@code false} unless set)
     */
    public boolean isJms20() {
        final boolean flag = this.jms20;
        return flag;
    }

    /**
     * Stores the flag returned by {@link #isJms20()}.
     *
     * @param z new value of the flag
     */
    public void setJms20(boolean z) {
        final boolean flag = z;
        this.jms20 = flag;
    }

    /**
     * Returns the connection this session was created with.
     *
     * @return the owning connection
     */
    public PulsarConnection getConnection() {
        final PulsarConnection owner = this.connection;
        return owner;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops tracking the given browser, if it is tracked.
     *
     * @param pulsarQueueBrowser browser to remove
     */
    public void removeBrowser(PulsarQueueBrowser pulsarQueueBrowser) {
        final List<PulsarQueueBrowser> tracked = this.browsers;
        tracked.remove(pulsarQueueBrowser);
    }

    /**
     * Tells whether a transaction object is currently held by this session.
     *
     * @return {@code true} when the transaction reference is not {@code null}
     */
    public boolean isTransactionStarted() {
        final Transaction current = this.transaction;
        return current != null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the operation through the connection's {@code executeInConnectionPausedLock} helper.
     *
     * @param blockCLoseOperation operation to run; must not be {@code null}
     * @param i value forwarded unchanged as the helper's second argument
     * @param <T> result type of the operation
     * @return whatever the connection helper returns
     * @throws NullPointerException if {@code blockCLoseOperation} is {@code null}
     * @throws JMSException if the session is closed or the helper/operation fails
     */
    public <T> T executeOperationIfConnectionStarted(BlockCLoseOperation<T> blockCLoseOperation, int i) throws JMSException {
        checkNotClosed();
        Objects.requireNonNull(blockCLoseOperation);
        return (T) this.connection.executeInConnectionPausedLock(blockCLoseOperation::execute, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the operation while holding the read side of the close lock, so that it cannot
     * overlap with {@link #close()}, which takes the write side.
     *
     * <p>The closed state is checked both before and after acquiring the lock.
     *
     * @param blockCLoseOperation operation to run
     * @param <T> result type of the operation
     * @return the operation result
     * @throws JMSException if the session is closed or the operation fails
     */
    public <T> T executeCriticalOperation(BlockCLoseOperation<T> blockCLoseOperation) throws JMSException {
        checkNotClosed();
        final ReentrantReadWriteLock.ReadLock guard = this.closeLock.readLock();
        guard.lock();
        try {
            // Re-check: the session may have been closed while waiting for the lock.
            checkNotClosed();
            final T result = blockCLoseOperation.execute();
            return result;
        } finally {
            guard.unlock();
        }
    }

    /**
     * Creates a receiver for the queue by delegating to the single-argument consumer factory.
     *
     * @param queue queue to receive from
     * @return the subscribed consumer
     * @throws JMSException if the consumer cannot be created
     */
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        final Destination destination = queue;
        return m23createConsumer(destination);
    }

    /**
     * Creates a receiver for the queue by delegating to the two-argument consumer factory.
     *
     * @param queue queue to receive from
     * @param str value forwarded unchanged
     * @return the subscribed consumer
     * @throws JMSException if the consumer cannot be created
     */
    public QueueReceiver createReceiver(Queue queue, String str) throws JMSException {
        final Destination destination = queue;
        return m22createConsumer(destination, str);
    }

    /**
     * Creates a sender for the queue by delegating to the producer factory.
     *
     * @param queue queue to send to
     * @return the new producer
     * @throws JMSException if the producer cannot be created
     */
    public QueueSender createSender(Queue queue) throws JMSException {
        final Destination destination = queue;
        return m24createProducer(destination);
    }

    /**
     * Creates a subscriber for the topic by delegating to the single-argument consumer factory.
     *
     * @param topic topic to subscribe to
     * @return the subscribed consumer
     * @throws JMSException if the consumer cannot be created
     */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        final Destination destination = topic;
        return m23createConsumer(destination);
    }

    /**
     * Creates a subscriber for the topic by delegating to the three-argument consumer factory.
     *
     * @param topic topic to subscribe to
     * @param str value forwarded unchanged
     * @param z flag forwarded unchanged
     * @return the subscribed consumer
     * @throws JMSException if the consumer cannot be created
     */
    public TopicSubscriber createSubscriber(Topic topic, String str, boolean z) throws JMSException {
        final Destination destination = topic;
        return m21createConsumer(destination, str, z);
    }

    /**
     * Creates a publisher for the topic by delegating to the producer factory.
     *
     * @param topic topic to publish to
     * @return the new producer
     * @throws JMSException if the producer cannot be created
     */
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        final Destination destination = topic;
        return m24createProducer(destination);
    }

    /**
     * Fails when this session is closed; otherwise delegates to the connection's own
     * {@code checkNotClosed}.
     *
     * @throws JMSException if the session is closed or the connection check fails
     */
    public void checkNotClosed() throws JMSException {
        if (!this.closed) {
            this.connection.checkNotClosed();
            return;
        }
        throw new IllegalStateException("Session is closed");
    }

    /**
     * Reads the closed flag while holding the read side of the close lock.
     *
     * @return {@code true} once {@link #close()} has marked the session closed
     */
    public boolean isClosed() {
        final ReentrantReadWriteLock.ReadLock guard = this.closeLock.readLock();
        guard.lock();
        try {
            final boolean state = this.closed;
            return state;
        } finally {
            guard.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Creates and starts the session's listener thread when none is currently set. */
    public void ensureListenerThread() {
        if (this.listenerThread != null) {
            return;
        }
        final ListenerThread worker = new ListenerThread();
        this.listenerThread = worker;
        worker.start();
    }

    /**
     * Fails when queue operations have been disabled on this session.
     *
     * @throws JMSException if queue operations are not allowed
     */
    void checkQueueOperationEnabled() throws JMSException {
        if (this.allowQueueOperations) {
            return;
        }
        throw new IllegalStateException("This is not a QueueSession");
    }

    /**
     * Fails when topic operations have been disabled on this session.
     *
     * @throws JMSException if topic operations are not allowed
     */
    void checkTopicOperationEnabled() throws JMSException {
        if (this.allowTopicOperations) {
            return;
        }
        throw new IllegalStateException("This is not a TopicSession");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Restricts which families of operations this session accepts.
     *
     * @param z whether queue operations remain allowed
     * @param z2 whether topic operations remain allowed
     * @return this session, for chaining
     */
    public PulsarSession emulateLegacySession(boolean z, boolean z2) {
        final boolean queuesAllowed = z;
        final boolean topicsAllowed = z2;
        this.allowQueueOperations = queuesAllowed;
        this.allowTopicOperations = topicsAllowed;
        return this;
    }
}
