/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.PulsarMessageConsumer;
import com.datastax.oss.pulsar.jms.PulsarSession;
import com.datastax.oss.pulsar.jms.Utils;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PulsarConnectionConsumer
implements ConnectionConsumer {
    private static final Logger log = LoggerFactory.getLogger(PulsarConnectionConsumer.class);
    private static final long TIMEOUT_RECEIVE = 1000L;
    private final PulsarMessageConsumer consumer;
    private final PulsarSession dispatcherSession;
    private final ServerSessionPool serverSessionPool;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final Thread spool;
    private final int maxMessages;
    private final boolean maxMessagesLimitParallelism;
    private final int closeTimeout;
    private final Semaphore permits;

    PulsarConnectionConsumer(PulsarSession dispatcherSession, PulsarMessageConsumer consumer, ServerSessionPool serverSessionPool, int maxMessages) {
        this.dispatcherSession = dispatcherSession;
        this.consumer = consumer;
        this.serverSessionPool = serverSessionPool;
        this.maxMessages = maxMessages;
        this.maxMessagesLimitParallelism = dispatcherSession.getConnection().getFactory().isMaxMessagesLimitsParallelism();
        this.closeTimeout = dispatcherSession.getConnection().getFactory().getConnectionConsumerStopTimeout();
        this.permits = this.maxMessagesLimitParallelism ? new Semaphore(maxMessages) : null;
        this.spool = new Thread(new Spool());
        this.spool.setDaemon(true);
        this.spool.setName("jms-connection-consumer-" + serverSessionPool + "-" + consumer.getDestination().getName());
    }

    public void start() {
        this.spool.start();
    }

    @Override
    public ServerSessionPool getServerSessionPool() throws JMSException {
        if (this.closed.get()) {
            throw new IllegalStateException("The Connection Consumer is closed");
        }
        return this.serverSessionPool;
    }

    @Override
    public void close() throws JMSException {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            if (this.closeTimeout > 0) {
                this.spool.join(this.closeTimeout);
                if (this.spool.isAlive()) {
                    log.warn("Couldn't stop the ConnectionConsumer on {} within the configured jms.connectionConsumerStopTimeout={}", (Object)this.consumer.getDestination(), (Object)this.closeTimeout);
                    this.spool.interrupt();
                }
            } else {
                this.spool.join();
            }
        }
        catch (InterruptedException err) {
            Utils.handleException(err);
        }
        this.consumer.close();
        this.dispatcherSession.close();
    }

    boolean isSpoolThreadAlive() {
        return this.spool.isAlive();
    }

    private class Spool
    implements Runnable {
        private Spool() {
        }

        @Override
        public void run() {
            while (!PulsarConnectionConsumer.this.closed.get()) {
                try {
                    ServerSession serverSession;
                    if (PulsarConnectionConsumer.this.permits != null) {
                        try {
                            PulsarConnectionConsumer.this.permits.acquire();
                        }
                        catch (InterruptedException exit) {
                            return;
                        }
                    }
                    try {
                        serverSession = PulsarConnectionConsumer.this.serverSessionPool.getServerSession();
                    }
                    catch (JMSException internalContainerError) {
                        log.error("Container error", internalContainerError);
                        break;
                    }
                    int maxMessagesToConsume = PulsarConnectionConsumer.this.maxMessagesLimitParallelism ? 1 : PulsarConnectionConsumer.this.maxMessages;
                    List<Message> messages = PulsarConnectionConsumer.this.consumer.batchReceive(maxMessagesToConsume, 1000L);
                    PulsarSession wrappedByServerSideSession = (PulsarSession)serverSession.getSession();
                    Runnable postExecutionTask = null;
                    if (PulsarConnectionConsumer.this.permits != null) {
                        postExecutionTask = () -> PulsarConnectionConsumer.this.permits.release();
                    }
                    wrappedByServerSideSession.setupConnectionConsumerTask(messages, postExecutionTask);
                    serverSession.start();
                }
                catch (JMSException error) {
                    log.error("internal error", error);
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException stopThread) {
                        break;
                    }
                }
            }
        }
    }
}

