package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.Concurrency;
import com.github.sonus21.rqueue.models.enums.PriorityMode;
import com.github.sonus21.rqueue.models.enums.RqueueMode;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.StringUtils;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import com.github.sonus21.rqueue.web.dao.RqueueSystemConfigDao;
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.class */
public class RqueueMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RqueueMessageListenerContainer.class);
    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RqueueMessageListenerContainer.class);
    private final RqueueMessageTemplate rqueueMessageTemplate;
    private final RqueueMessageHandler rqueueMessageHandler;
    private MessageProcessor discardMessageProcessor;
    private MessageProcessor deadLetterQueueMessageProcessor;
    private MessageProcessor manualDeletionMessageProcessor;
    private MessageProcessor postExecutionMessageProcessor;
    private MessageProcessor preExecutionMessageProcessor;
    private PostProcessingHandler postProcessingHandler;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Autowired
    private RqueueWebConfig rqueueWebConfig;

    @Autowired
    private RqueueConfig rqueueConfig;

    @Autowired(required = false)
    private RqueueMetricsCounter rqueueMetricsCounter;

    @Autowired
    private RqueueMessageMetadataService rqueueMessageMetadataService;

    @Autowired
    private RqueueSystemConfigDao rqueueSystemConfigDao;
    private AsyncTaskExecutor taskExecutor;
    private Integer maxNumWorkers;
    private String beanName;
    private PriorityMode priorityMode;
    private final Object lifecycleMgr = new Object();
    private TaskExecutionBackOff taskExecutionBackOff = new FixedTaskExecutionBackOff();
    private Map<String, ThreadUtils.QueueThread> queueThreadMap = new ConcurrentHashMap();
    private Map<String, Boolean> queueRunningState = new ConcurrentHashMap();
    private ConcurrentHashMap<String, Future<?>> scheduledFutureByQueue = new ConcurrentHashMap<>();
    private boolean defaultTaskExecutor = false;
    private boolean autoStartup = true;
    private boolean running = false;
    private long backOffTime = 5000;
    private long maxWorkerWaitTime = 20000;
    private long pollingInterval = 200;
    private int phase = Integer.MAX_VALUE;

    public RqueueMessageListenerContainer(RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) {
        Assert.notNull(rqueueMessageHandler, "rqueueMessageHandler cannot be null");
        Assert.notNull(rqueueMessageTemplate, "rqueueMessageTemplate cannot be null");
        this.rqueueMessageHandler = rqueueMessageHandler;
        this.rqueueMessageTemplate = rqueueMessageTemplate;
        this.discardMessageProcessor = new MessageProcessor() { // from class: com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.1
        };
        this.deadLetterQueueMessageProcessor = this.discardMessageProcessor;
        this.manualDeletionMessageProcessor = this.discardMessageProcessor;
        this.postExecutionMessageProcessor = this.discardMessageProcessor;
        this.preExecutionMessageProcessor = this.discardMessageProcessor;
    }

    public RqueueMessageTemplate getRqueueMessageTemplate() {
        return this.rqueueMessageTemplate;
    }

    public long getMaxWorkerWaitTime() {
        return this.maxWorkerWaitTime;
    }

    public void setMaxWorkerWaitTime(long j) {
        this.maxWorkerWaitTime = j;
    }

    public String getBeanName() {
        return this.beanName;
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    public RqueueMessageHandler getRqueueMessageHandler() {
        return this.rqueueMessageHandler;
    }

    public Integer getMaxNumWorkers() {
        return this.maxNumWorkers;
    }

    public void setMaxNumWorkers(int i) {
        this.maxNumWorkers = Integer.valueOf(i);
    }

    public long getBackOffTime() {
        return this.backOffTime;
    }

    public void setBackOffTime(long j) {
        this.backOffTime = j;
    }

    public void destroy() throws Exception {
        synchronized (this.lifecycleMgr) {
            stop();
            doDestroy();
        }
    }

    protected void doDestroy() {
        HashSet hashSet = new HashSet();
        Iterator<Map.Entry<String, ThreadUtils.QueueThread>> it = this.queueThreadMap.entrySet().iterator();
        while (it.hasNext()) {
            ThreadUtils.QueueThread value = it.next().getValue();
            if (value.isDefaultExecutor()) {
                ThreadPoolTaskExecutor taskExecutor = value.getTaskExecutor();
                String threadNamePrefix = taskExecutor.getThreadNamePrefix();
                if (!hashSet.contains(threadNamePrefix)) {
                    hashSet.add(threadNamePrefix);
                    taskExecutor.destroy();
                }
            }
        }
        if (!this.defaultTaskExecutor || this.taskExecutor == null) {
            return;
        }
        ThreadPoolTaskExecutor threadPoolTaskExecutor = this.taskExecutor;
        if (hashSet.contains(threadPoolTaskExecutor.getThreadNamePrefix())) {
            return;
        }
        threadPoolTaskExecutor.destroy();
    }

    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int i) {
        this.phase = i;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    public void stop(Runnable runnable) {
        synchronized (this.lifecycleMgr) {
            stop();
            runnable.run();
        }
    }

    public void afterPropertiesSet() throws Exception {
        synchronized (this.lifecycleMgr) {
            if (RqueueMode.PRODUCER.equals(this.rqueueConfig.getMode())) {
                log.info("Producer only mode running.");
                return;
            }
            EndpointRegistry.delete();
            for (MappingInformation mappingInformation : this.rqueueMessageHandler.getHandlerMethods().keySet()) {
                Iterator<String> it = mappingInformation.getQueueNames().iterator();
                while (it.hasNext()) {
                    Iterator<QueueDetail> it2 = getQueueDetail(it.next(), mappingInformation).iterator();
                    while (it2.hasNext()) {
                        EndpointRegistry.register(it2.next());
                    }
                }
            }
            List<QueueDetail> activeQueueDetails = EndpointRegistry.getActiveQueueDetails();
            if (activeQueueDetails.isEmpty()) {
                return;
            }
            if (this.taskExecutor == null) {
                this.defaultTaskExecutor = true;
                this.taskExecutor = createDefaultTaskExecutor(activeQueueDetails);
            } else {
                initializeThreadMap(activeQueueDetails, this.taskExecutor, false, activeQueueDetails.size());
            }
            initializeRunningQueueState();
            this.lifecycleMgr.notifyAll();
        }
    }

    private void initializeThreadMap(List<QueueDetail> list, AsyncTaskExecutor asyncTaskExecutor, boolean z, int i) {
        Semaphore semaphore = new Semaphore(i);
        Iterator<QueueDetail> it = list.iterator();
        while (it.hasNext()) {
            this.queueThreadMap.put(it.next().getName(), new ThreadUtils.QueueThread(z, asyncTaskExecutor, semaphore, i));
        }
    }

    private void initializeRunningQueueState() {
        Iterator<String> it = EndpointRegistry.getActiveQueues().iterator();
        while (it.hasNext()) {
            this.queueRunningState.put(it.next(), false);
        }
    }

    private int getWorkersCount(int i) {
        return this.maxNumWorkers == null ? i * 2 : this.maxNumWorkers.intValue();
    }

    private AsyncTaskExecutor createTaskExecutor(int i, int i2) {
        String beanName = getBeanName();
        return ThreadUtils.createTaskExecutor(DEFAULT_THREAD_NAME_PREFIX, beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX, i, i2);
    }

    private AsyncTaskExecutor createNonConcurrencyBasedExecutor(List<QueueDetail> list, int i) {
        int workersCount = getWorkersCount(list.size());
        AsyncTaskExecutor createTaskExecutor = createTaskExecutor(list.size() + i, workersCount + i);
        initializeThreadMap(list, createTaskExecutor, true, workersCount);
        return createTaskExecutor;
    }

    private void createExecutor(QueueDetail queueDetail) {
        Concurrency concurrency = queueDetail.getConcurrency();
        this.queueThreadMap.put(queueDetail.getName(), new ThreadUtils.QueueThread(true, createTaskExecutor(queueDetail, concurrency.getMin(), concurrency.getMax()), new Semaphore(concurrency.getMax()), concurrency.getMax()));
    }

    public AsyncTaskExecutor createDefaultTaskExecutor(List<QueueDetail> list) {
        List<QueueDetail> list2 = (List) list.stream().filter(queueDetail -> {
            return !queueDetail.isSystemGenerated();
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        for (QueueDetail queueDetail2 : list2) {
            if (queueDetail2.getConcurrency().getMin() > 0) {
                createExecutor(queueDetail2);
            } else {
                arrayList.add(queueDetail2);
            }
        }
        return createNonConcurrencyBasedExecutor(arrayList, list2.size());
    }

    private AsyncTaskExecutor createTaskExecutor(QueueDetail queueDetail, int i, int i2) {
        String workerName = ThreadUtils.getWorkerName(queueDetail.getName());
        return ThreadUtils.createTaskExecutor(workerName, workerName + "-", i, i2);
    }

    private List<QueueDetail> getQueueDetail(String str, MappingInformation mappingInformation) {
        int numRetry = mappingInformation.getNumRetry();
        if (!mappingInformation.getDeadLetterQueueName().isEmpty() && numRetry == -1) {
            log.warn("Dead letter queue {} is set but retry is not set", mappingInformation.getDeadLetterQueueName());
            numRetry = 3;
        } else if (numRetry == -1) {
            numRetry = Integer.MAX_VALUE;
        }
        String priorityGroup = mappingInformation.getPriorityGroup();
        Map<String, Integer> priority = mappingInformation.getPriority();
        if (StringUtils.isEmpty(priorityGroup) && priority.size() == 1) {
            priorityGroup = Constants.DEFAULT_PRIORITY_GROUP;
        }
        QueueDetail build = QueueDetail.builder().name(str).queueName(this.rqueueConfig.getQueueName(str)).processingQueueName(this.rqueueConfig.getProcessingQueueName(str)).delayedQueueName(this.rqueueConfig.getDelayedQueueName(str)).processingQueueChannelName(this.rqueueConfig.getProcessingQueueChannelName(str)).delayedQueueChannelName(this.rqueueConfig.getDelayedQueueChannelName(str)).deadLetterQueueName(mappingInformation.getDeadLetterQueueName()).visibilityTimeout(mappingInformation.getVisibilityTimeout()).deadLetterConsumerEnabled(mappingInformation.isDeadLetterConsumerEnabled()).concurrency(mappingInformation.getConcurrency()).active(mappingInformation.isActive()).numRetry(numRetry).priority(priority).priorityGroup(priorityGroup).build();
        return build.getPriority().size() <= 1 ? Collections.singletonList(build) : build.expandQueueDetail(this.rqueueConfig.isAddDefaultQueueWithQueueLevelPriority(), this.rqueueConfig.getDefaultQueueWithQueueLevelPriority());
    }

    public void start() {
        log.info("Starting Rqueue Message container");
        synchronized (this.lifecycleMgr) {
            this.running = true;
            createPostProcessingHandler();
            doStart();
            this.applicationEventPublisher.publishEvent(new RqueueBootstrapEvent("Container", true));
            this.lifecycleMgr.notifyAll();
        }
    }

    private void createPostProcessingHandler() {
        this.postProcessingHandler = new PostProcessingHandler(this.rqueueConfig, this.rqueueWebConfig, this.applicationEventPublisher, this.rqueueMessageMetadataService, this.rqueueMessageTemplate, this.taskExecutionBackOff, new MessageProcessorHandler(this.manualDeletionMessageProcessor, this.deadLetterQueueMessageProcessor, this.discardMessageProcessor, this.postExecutionMessageProcessor), this.rqueueSystemConfigDao);
    }

    protected void doStart() {
        HashMap hashMap = new HashMap();
        for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
            if (queueDetail.getPriority().size() == 0) {
                startQueue(queueDetail.getName(), queueDetail);
            } else {
                List list = (List) hashMap.getOrDefault(queueDetail.getPriorityGroup(), new ArrayList());
                list.add(queueDetail);
                hashMap.put(queueDetail.getPriorityGroup(), list);
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            startGroup((String) entry.getKey(), (List) entry.getValue());
        }
    }

    private Map<String, ThreadUtils.QueueThread> getQueueThreadMap(String str, List<QueueDetail> list) {
        ThreadUtils.QueueThread queueThread = this.queueThreadMap.get(str);
        return queueThread != null ? (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, queueDetail -> {
            return queueThread;
        })) : (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, queueDetail2 -> {
            return this.queueThreadMap.get(queueDetail2.getName());
        }));
    }

    protected void startGroup(String str, List<QueueDetail> list) {
        if (getPriorityMode() == null) {
            throw new IllegalStateException("Priority mode is not set");
        }
        Iterator<QueueDetail> it = list.iterator();
        while (it.hasNext()) {
            this.queueRunningState.put(it.next().getName(), true);
        }
        Map<String, ThreadUtils.QueueThread> queueThreadMap = getQueueThreadMap(str, list);
        this.scheduledFutureByQueue.put(str, getPriorityMode() == PriorityMode.STRICT ? this.taskExecutor.submit(new StrictPriorityPoller(StringUtils.groupName(str), this, list, queueThreadMap, this.postProcessingHandler, this.rqueueConfig)) : this.taskExecutor.submit(new WeightedPriorityPoller(StringUtils.groupName(str), this, list, queueThreadMap, this.postProcessingHandler, this.rqueueConfig)));
    }

    protected void startQueue(String str, QueueDetail queueDetail) {
        if (Boolean.TRUE.equals(this.queueRunningState.get(str))) {
            return;
        }
        this.queueRunningState.put(str, true);
        this.scheduledFutureByQueue.put(str, getTaskExecutor().submit(new DefaultRqueuePoller(this.queueThreadMap.get(str), queueDetail, this, this.postProcessingHandler, this.rqueueConfig)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isQueueActive(String str) {
        return this.queueRunningState.getOrDefault(str, false).booleanValue();
    }

    public AsyncTaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    public void setTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
        this.taskExecutor = asyncTaskExecutor;
    }

    public void stop() {
        log.info("Stopping Rqueue Message container");
        synchronized (this.lifecycleMgr) {
            this.running = false;
            this.applicationEventPublisher.publishEvent(new RqueueBootstrapEvent("Container", false));
            doStop();
            this.lifecycleMgr.notifyAll();
        }
    }

    protected void doStop() {
        for (Map.Entry<String, Boolean> entry : this.queueRunningState.entrySet()) {
            if (Boolean.TRUE.equals(entry.getValue())) {
                stopQueue(entry.getKey());
            }
        }
        waitForRunningQueuesToStop();
    }

    private void waitForRunningQueuesToStop() {
        Iterator<Map.Entry<String, Boolean>> it = this.queueRunningState.entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getKey();
            ThreadUtils.waitForTermination(log, this.scheduledFutureByQueue.get(key), getMaxWorkerWaitTime(), "An exception occurred while stopping queue '{}'", key);
        }
        if (ThreadUtils.waitForWorkerTermination(this.queueThreadMap.values(), getMaxWorkerWaitTime())) {
            return;
        }
        log.error("Some workers are not stopped within time");
    }

    private void stopQueue(String str) {
        Assert.isTrue(this.queueRunningState.containsKey(str), "Queue with name '" + str + "' does not exist");
        this.queueRunningState.put(str, false);
    }

    public boolean isRunning() {
        boolean z;
        synchronized (this.lifecycleMgr) {
            z = this.running;
        }
        return z;
    }

    public long getPollingInterval() {
        return this.pollingInterval;
    }

    public void setPollingInterval(long j) {
        this.pollingInterval = j;
    }

    public MessageProcessor getDiscardMessageProcessor() {
        return this.discardMessageProcessor;
    }

    public void setDiscardMessageProcessor(MessageProcessor messageProcessor) {
        Assert.notNull(messageProcessor, "discardMessageProcessor cannot be null");
        this.discardMessageProcessor = messageProcessor;
    }

    public MessageProcessor getDeadLetterQueueMessageProcessor() {
        return this.deadLetterQueueMessageProcessor;
    }

    public void setDeadLetterQueueMessageProcessor(MessageProcessor messageProcessor) {
        Assert.notNull(messageProcessor, "deadLetterQueueMessageProcessor cannot be null");
        this.deadLetterQueueMessageProcessor = messageProcessor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RqueueMessageMetadataService getRqueueMessageMetadataService() {
        return this.rqueueMessageMetadataService;
    }

    public MessageProcessor getManualDeletionMessageProcessor() {
        return this.manualDeletionMessageProcessor;
    }

    public void setManualDeletionMessageProcessor(MessageProcessor messageProcessor) {
        Assert.notNull(messageProcessor, "manualDeletionMessageProcessor cannot be null");
        this.manualDeletionMessageProcessor = messageProcessor;
    }

    public MessageProcessor getPostExecutionMessageProcessor() {
        return this.postExecutionMessageProcessor;
    }

    public void setPostExecutionMessageProcessor(MessageProcessor messageProcessor) {
        Assert.notNull(messageProcessor, "postExecutionMessageProcessor cannot be null");
        this.postExecutionMessageProcessor = messageProcessor;
    }

    public MessageProcessor getPreExecutionMessageProcessor() {
        return this.preExecutionMessageProcessor;
    }

    public void setPreExecutionMessageProcessor(MessageProcessor messageProcessor) {
        Assert.notNull(messageProcessor, "preExecutionMessageProcessor cannot be null");
        this.preExecutionMessageProcessor = messageProcessor;
    }

    public TaskExecutionBackOff getTaskExecutionBackOff() {
        return this.taskExecutionBackOff;
    }

    public void setTaskExecutionBackOff(TaskExecutionBackOff taskExecutionBackOff) {
        Assert.notNull(taskExecutionBackOff, "taskExecutionBackOff cannot be null");
        this.taskExecutionBackOff = taskExecutionBackOff;
    }

    public PriorityMode getPriorityMode() {
        return this.priorityMode;
    }

    public void setPriorityMode(PriorityMode priorityMode) {
        this.priorityMode = priorityMode;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RqueueMetricsCounter getRqueueMetricsCounter() {
        return this.rqueueMetricsCounter;
    }
}
