package org.apache.camel.component.sjms.batch;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/sjms/batch/SjmsBatchConsumer.class */
public class SjmsBatchConsumer extends DefaultConsumer {
    private static final boolean TRANSACTED = true;
    private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
    private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
    private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong();
    private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong();
    private final SjmsBatchEndpoint sjmsBatchEndpoint;
    private final AggregationStrategy aggregationStrategy;
    private final int completionSize;
    private final int completionTimeout;
    private final int consumerCount;
    private final int pollDuration;
    private final ConnectionFactory connectionFactory;
    private final String destinationName;
    private final Processor processor;
    private ExecutorService jmsConsumerExecutors;
    private final AtomicBoolean running;
    private final AtomicReference<CountDownLatch> consumersShutdownLatchRef;
    private Connection connection;

    /* loaded from: input_file:org/apache/camel/component/sjms/batch/SjmsBatchConsumer$BatchConsumptionLoop.class */
    private class BatchConsumptionLoop implements Runnable {
        private BatchConsumptionLoop() {
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    Session createSession = SjmsBatchConsumer.this.connection.createSession(true, 2);
                    try {
                        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(SjmsBatchConsumer.this.destinationName));
                        try {
                            consumeBatchesOnLoop(createSession, createConsumer);
                            try {
                                createConsumer.close();
                            } catch (JMSException e) {
                                SjmsBatchConsumer.this.log.error("Exception caught closing consumer: {}", e.getMessage());
                            }
                            try {
                                createSession.close();
                            } catch (JMSException e2) {
                                SjmsBatchConsumer.this.log.error("Exception caught closing session: {}", e2.getMessage());
                            }
                            ((CountDownLatch) SjmsBatchConsumer.this.consumersShutdownLatchRef.get()).countDown();
                        } catch (Throwable th) {
                            try {
                                createConsumer.close();
                            } catch (JMSException e3) {
                                SjmsBatchConsumer.this.log.error("Exception caught closing consumer: {}", e3.getMessage());
                            }
                            throw th;
                        }
                    } catch (Throwable th2) {
                        try {
                            createSession.close();
                        } catch (JMSException e4) {
                            SjmsBatchConsumer.this.log.error("Exception caught closing session: {}", e4.getMessage());
                        }
                        throw th2;
                    }
                } catch (JMSException e5) {
                    SjmsBatchConsumer.LOG.error("Exception caught consuming from {}: {}", SjmsBatchConsumer.this.destinationName, SjmsBatchConsumer.this.getStackTrace(e5));
                    ((CountDownLatch) SjmsBatchConsumer.this.consumersShutdownLatchRef.get()).countDown();
                }
            } catch (Throwable th3) {
                ((CountDownLatch) SjmsBatchConsumer.this.consumersShutdownLatchRef.get()).countDown();
                throw th3;
            }
        }

        private void consumeBatchesOnLoop(Session session, MessageConsumer messageConsumer) throws JMSException {
            boolean z = SjmsBatchConsumer.this.completionTimeout > 0;
            while (SjmsBatchConsumer.this.running.get()) {
                int i = 0;
                long j = 0;
                long j2 = 0;
                Exchange exchange = null;
                while (true) {
                    if (SjmsBatchConsumer.this.completionSize <= 0 || i < SjmsBatchConsumer.this.completionSize) {
                        Message receive = messageConsumer.receive((!z || j <= 0) ? SjmsBatchConsumer.this.pollDuration : getReceiveWaitTime(j));
                        if (!SjmsBatchConsumer.this.running.get()) {
                            SjmsBatchConsumer.LOG.info("Shutdown signal received - rolling batch back");
                            session.rollback();
                            return;
                        }
                        if (receive == null) {
                            SjmsBatchConsumer.LOG.trace("No message received");
                        } else {
                            if (z && i == 0) {
                                j2 = new Date().getTime();
                            }
                            i += SjmsBatchConsumer.TRANSACTED;
                            SjmsBatchConsumer.LOG.debug("Message received: {}", Integer.valueOf(i));
                            if (!(receive instanceof ObjectMessage) && !(receive instanceof TextMessage)) {
                                throw new IllegalArgumentException("Unexpected message type: " + receive.getClass().toString());
                            }
                            exchange = SjmsBatchConsumer.this.aggregationStrategy.aggregate(exchange, SjmsBatchConsumer.this.m11getEndpoint().createExchange(receive, session));
                            exchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.valueOf(i));
                        }
                        if (z && j2 > 0) {
                            j = new Date().getTime() - j2;
                            if (j > SjmsBatchConsumer.this.completionTimeout) {
                                break;
                            }
                        }
                    }
                }
                process(exchange, session);
            }
        }

        private long getReceiveWaitTime(long j) {
            long timeRemaining = getTimeRemaining(j);
            if (timeRemaining <= 0) {
                timeRemaining = 1;
            }
            long j2 = timeRemaining > ((long) SjmsBatchConsumer.this.pollDuration) ? SjmsBatchConsumer.this.pollDuration : timeRemaining;
            SjmsBatchConsumer.LOG.debug("waiting for {}", Long.valueOf(j2));
            return j2;
        }

        private long getTimeRemaining(long j) {
            long j2 = SjmsBatchConsumer.this.completionTimeout - j;
            if (SjmsBatchConsumer.LOG.isDebugEnabled() && j > 0) {
                SjmsBatchConsumer.LOG.debug("Time remaining this batch: {}", Long.valueOf(j2));
            }
            return j2;
        }

        private void process(Exchange exchange, Session session) {
            int andIncrement = SjmsBatchConsumer.BATCH_COUNT.getAndIncrement();
            int intValue = ((Integer) exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class)).intValue();
            if (SjmsBatchConsumer.LOG.isDebugEnabled()) {
                SjmsBatchConsumer.LOG.debug("Processing batch[" + andIncrement + "]:size=" + intValue + ":total=" + SjmsBatchConsumer.MESSAGE_RECEIVED.addAndGet(intValue));
            }
            exchange.addOnCompletion(new SessionCompletion(session));
            try {
                SjmsBatchConsumer.this.processor.process(exchange);
                SjmsBatchConsumer.LOG.debug("Completed processing[{}]:total={}", Integer.valueOf(andIncrement), Long.valueOf(SjmsBatchConsumer.MESSAGE_PROCESSED.addAndGet(intValue)));
            } catch (Exception e) {
                SjmsBatchConsumer.LOG.error("Error processing exchange: {}", e.getMessage());
            }
        }
    }

    public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
        super(sjmsBatchEndpoint, processor);
        this.running = new AtomicBoolean(true);
        this.consumersShutdownLatchRef = new AtomicReference<>();
        this.sjmsBatchEndpoint = (SjmsBatchEndpoint) ObjectHelper.notNull(sjmsBatchEndpoint, "batchJmsEndpoint");
        this.processor = (Processor) ObjectHelper.notNull(processor, "processor");
        this.destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
        this.completionSize = sjmsBatchEndpoint.getCompletionSize();
        this.completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
        this.pollDuration = sjmsBatchEndpoint.getPollDuration();
        if (this.pollDuration < 0) {
            throw new IllegalArgumentException("pollDuration must be 0 or greater");
        }
        this.aggregationStrategy = (AggregationStrategy) ObjectHelper.notNull(sjmsBatchEndpoint.getAggregationStrategy(), "aggregationStrategy");
        this.consumerCount = sjmsBatchEndpoint.getConsumerCount();
        if (this.consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be greater than 0");
        }
        this.connectionFactory = (ConnectionFactory) ObjectHelper.notNull(sjmsBatchEndpoint.getComponent().getConnectionFactory(), "jmsBatchComponent.connectionFactory");
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public SjmsBatchEndpoint m11getEndpoint() {
        return this.sjmsBatchEndpoint;
    }

    protected void doStart() throws Exception {
        super.doStart();
        try {
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            if (LOG.isInfoEnabled()) {
                LOG.info("Starting " + this.consumerCount + " consumer(s) for " + this.destinationName + ":" + this.completionSize);
            }
            this.consumersShutdownLatchRef.set(new CountDownLatch(this.consumerCount));
            this.jmsConsumerExecutors = m11getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "SjmsBatchConsumer", this.consumerCount);
            for (int i = 0; i < this.consumerCount; i += TRANSACTED) {
                this.jmsConsumerExecutors.execute(new BatchConsumptionLoop());
            }
        } catch (JMSException e) {
            LOG.error("Exception caught closing connection: {}", getStackTrace(e));
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.running.set(false);
        CountDownLatch countDownLatch = this.consumersShutdownLatchRef.get();
        if (countDownLatch != null) {
            LOG.info("Stop signalled, waiting on consumers to shut down");
            if (countDownLatch.await(60L, TimeUnit.SECONDS)) {
                LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down");
            } else {
                LOG.info("All consumers have shut down");
            }
        } else {
            LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers");
        }
        try {
            LOG.debug("Shutting down JMS connection");
            this.connection.close();
        } catch (JMSException e) {
            LOG.error("Exception caught closing connection: {}", getStackTrace(e));
        }
        m11getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(this.jmsConsumerExecutors);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getStackTrace(Exception exc) {
        StringWriter stringWriter = new StringWriter();
        exc.printStackTrace(new PrintWriter(stringWriter));
        return stringWriter.toString();
    }
}
