package org.apache.james.mailetcontainer.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.HierarchicalConfiguration;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/james-server-mailetcontainer-camel-3.3.0.jar:org/apache/james/mailetcontainer/impl/JamesMailSpooler.class */
/**
 * Pulls mails from the spool {@link MailQueue} and hands each one to the
 * configured {@link MailProcessor}.
 *
 * <p>Two executors are used: a "dequeuer" pool whose threads each run
 * {@link #run()} and block on {@link MailQueue#deQueue()}, and a "spooler"
 * pool that performs the actual processing of every dequeued item.
 */
public class JamesMailSpooler implements Runnable, Disposable, Configurable, MailSpoolerMBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);

    /** Name of the timer metric published for every processed mail. */
    public static final String SPOOL_PROCESSING = "spoolProcessing";

    /** JMX path passed to both thread pools created in {@link #init()}. */
    private static final String THREAD_POOL_JMX_PATH =
        "org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool";

    /** Upper bound on how long {@link #dispose()} waits for in-flight mails. */
    private static final long SHUTDOWN_TIMEOUT_MILLIS = 60000L;

    /** Interval between two checks of the in-flight counter in {@link #dispose()}. */
    private static final long SHUTDOWN_POLL_MILLIS = 1000L;

    private MailQueue queue;
    private int numThreads;
    /** Number of worker tasks currently running; polled by {@link #dispose()}. */
    private final AtomicInteger numActive = new AtomicInteger(0);
    /** Number of mails currently being processed; exposed via {@link #getCurrentSpoolCount()}. */
    private final AtomicInteger processingActive = new AtomicInteger(0);
    /** {@code true} between {@link #init()} and {@link #dispose()}. */
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final MetricFactory metricFactory;
    private ExecutorService dequeueService;
    private ExecutorService workerService;
    private MailProcessor mailProcessor;
    private MailQueueFactory<?> queueFactory;
    private int numDequeueThreads;

    /**
     * @param metricFactory factory used to create the {@link #SPOOL_PROCESSING} timer
     */
    @Inject
    public JamesMailSpooler(MetricFactory metricFactory) {
        this.metricFactory = metricFactory;
    }

    /**
     * @param mailQueueFactory factory from which the spool queue is obtained in {@link #init()}
     */
    @Inject
    public void setMailQueueFactory(MailQueueFactory<?> mailQueueFactory) {
        this.queueFactory = mailQueueFactory;
    }

    /**
     * @param mailProcessor processor every dequeued mail is handed to
     */
    @Inject
    public void setMailProcessor(MailProcessor mailProcessor) {
        this.mailProcessor = mailProcessor;
    }

    /**
     * Reads the pool sizes from the configuration: {@code dequeueThreads}
     * (default 2) and {@code threads} (default 100).
     *
     * @param hierarchicalConfiguration configuration node to read from
     * @throws ConfigurationException declared by {@link Configurable}; not thrown by this code directly
     */
    @Override
    public void configure(HierarchicalConfiguration hierarchicalConfiguration) throws ConfigurationException {
        this.numDequeueThreads = hierarchicalConfiguration.getInt("dequeueThreads", 2);
        this.numThreads = hierarchicalConfiguration.getInt("threads", 100);
    }

    /**
     * Creates the spool queue and both thread pools, marks the spooler active
     * and starts one dequeue loop per configured dequeue thread.
     */
    @PostConstruct
    public void init() {
        LOGGER.info("{} init...", getClass().getName());
        this.queue = this.queueFactory.createQueue(MailQueueFactory.SPOOL);
        LOGGER.info("{} uses {} Thread(s)", getClass().getName(), this.numThreads);
        this.active.set(true);
        this.workerService = JMXEnabledThreadPoolExecutor.newFixedThreadPool(THREAD_POOL_JMX_PATH, "spooler", this.numThreads);
        this.dequeueService = JMXEnabledThreadPoolExecutor.newFixedThreadPool(THREAD_POOL_JMX_PATH, "dequeuer", this.numDequeueThreads);
        for (int i = 0; i < this.numDequeueThreads; i++) {
            // Submit the Runnable itself: the pool supplies the thread, so wrapping
            // it in a never-started Thread object served no purpose.
            this.dequeueService.execute(this);
        }
    }

    /**
     * Dequeue loop: while the spooler is active, blocks on the queue and
     * submits each dequeued item to the worker pool.
     *
     * <p>An interrupt received while dequeuing does not end the loop by itself
     * (the loop ends once the spooler is no longer active), but the interrupt
     * status is restored before this method returns so the owning thread can
     * still observe it.
     */
    @Override
    public void run() {
        LOGGER.info("Run {}: {}", getClass().getName(), Thread.currentThread().getName());
        LOGGER.info("Queue={}", this.queue);
        boolean interrupted = false;
        while (this.active.get()) {
            try {
                MailQueue.MailQueueItem queueItem = this.queue.deQueue();
                this.workerService.execute(() -> processItem(queueItem));
            } catch (InterruptedException e) {
                // Remember the interrupt instead of dropping it; it is re-asserted below.
                interrupted = true;
            } catch (MailQueue.MailQueueException e) {
                if (this.active.get()) {
                    LOGGER.error("Exception dequeue mail", e);
                }
            }
        }
        LOGGER.info("Stop {} : {}", getClass().getName(), Thread.currentThread().getName());
        if (interrupted) {
            // Restored only after the final log statement so logging is not
            // performed with the interrupt flag set.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processes a single dequeued item: runs the mail through the processor,
     * acknowledges the item with {@code done(true)} on success or
     * {@code done(false)} on an exception, and always disposes the mail.
     * Counters and the timer metric are released in all cases.
     */
    private void processItem(MailQueue.MailQueueItem queueItem) {
        TimeMetric timer = this.metricFactory.timer(SPOOL_PROCESSING);
        try {
            this.numActive.incrementAndGet();
            this.processingActive.incrementAndGet();
            Mail mail = queueItem.getMail();
            LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
            try {
                this.mailProcessor.service(mail);
                queueItem.done(true);
            } catch (Exception e) {
                // Errors are only reported while active; during shutdown they are expected.
                if (this.active.get()) {
                    LOGGER.error("Exception processing mail while spooling", e);
                }
                queueItem.done(false);
            } finally {
                LifecycleUtil.dispose(mail);
            }
        } catch (Throwable t) {
            // Nothing may escape into the worker pool; log (if active) and carry on.
            if (this.active.get()) {
                LOGGER.error("Exception processing mail while spooling", t);
            }
        } finally {
            this.processingActive.decrementAndGet();
            this.numActive.decrementAndGet();
            timer.stopAndPublish();
        }
    }

    /**
     * Deactivates the spooler, shuts both pools down and waits up to
     * {@link #SHUTDOWN_TIMEOUT_MILLIS} for running worker tasks to finish.
     *
     * <p>Safe to call when {@link #init()} never ran or failed part-way. If
     * the calling thread is interrupted while waiting, the wait is abandoned
     * and the interrupt status is restored.
     */
    @Override
    @PreDestroy
    public void dispose() {
        LOGGER.info("{} dispose...", getClass().getName());
        this.active.set(false);
        if (this.dequeueService != null) {
            // Interrupts dequeue threads blocked on the queue.
            this.dequeueService.shutdownNow();
        }
        if (this.workerService != null) {
            this.workerService.shutdown();
        }
        long deadline = System.currentTimeMillis() + SHUTDOWN_TIMEOUT_MILLIS;
        while (this.numActive.get() != 0 && deadline > System.currentTimeMillis()) {
            try {
                Thread.sleep(SHUTDOWN_POLL_MILLIS);
            } catch (InterruptedException e) {
                LOGGER.warn("{} interrupted while waiting for active mail processing to finish", getClass().getName());
                Thread.currentThread().interrupt();
                return;
            }
        }
        LOGGER.info("{} thread shutdown completed.", getClass().getName());
    }

    /**
     * @return the configured size of the worker ("spooler") pool
     */
    @Override
    public int getThreadCount() {
        return this.numThreads;
    }

    /**
     * @return the number of mails currently being processed
     */
    @Override
    public int getCurrentSpoolCount() {
        return this.processingActive.get();
    }
}
