package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import java.util.Iterator;
import java.util.List;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.CollectionUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueMessagePoller.class */
public abstract class RqueueMessagePoller extends MessageContainerBase {
    private final PostProcessingHandler postProcessingHandler;
    final List<Middleware> middlewares;
    final long pollingInterval;
    final long backoffTime;
    private final RqueueBeanProvider rqueueBeanProvider;
    List<String> queues;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueMessagePoller$DeactivateType.class */
    public enum DeactivateType {
        POLL_FAILED,
        NO_MESSAGE,
        SEMAPHORE_EXCEPTION,
        SEMAPHORE_UNAVAILABLE
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RqueueMessagePoller(String str, RqueueBeanProvider rqueueBeanProvider, RqueueMessageListenerContainer.QueueStateMgr queueStateMgr, List<Middleware> list, long j, long j2, PostProcessingHandler postProcessingHandler) {
        super(LoggerFactory.getLogger(RqueueMessagePoller.class), str, queueStateMgr);
        this.postProcessingHandler = postProcessingHandler;
        this.middlewares = list;
        this.rqueueBeanProvider = rqueueBeanProvider;
        this.pollingInterval = j;
        this.backoffTime = j2;
    }

    private List<RqueueMessage> getMessages(QueueDetail queueDetail, int i) {
        return this.rqueueBeanProvider.getRqueueMessageTemplate().popN(queueDetail.getQueueName(), queueDetail.getProcessingQueueName(), queueDetail.getProcessingQueueChannelName(), queueDetail.getVisibilityTimeout(), i);
    }

    private void execute(QueueThreadPool queueThreadPool, QueueDetail queueDetail, RqueueMessage rqueueMessage) {
        try {
            queueThreadPool.execute(new RqueueExecutor(this.rqueueBeanProvider, this.queueStateMgr, this.middlewares, this.postProcessingHandler, rqueueMessage, queueDetail, queueThreadPool));
        } catch (Exception e) {
            if (e instanceof TaskRejectedException) {
                queueThreadPool.taskRejected();
            }
            log(Level.WARN, "Execution failed Msg: {}", e, rqueueMessage);
            release(this.postProcessingHandler, queueThreadPool, queueDetail, rqueueMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shouldExit() {
        Iterator<String> it = this.queues.iterator();
        while (it.hasNext()) {
            if (isQueueActive(it.next())) {
                return false;
            }
        }
        log(Level.INFO, "Shutting down all queues {} are inactive", null, this.queues);
        return true;
    }

    protected int getBatchSize(QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
        int max = Math.max(Math.min(queueDetail.getBatchSize(), queueThreadPool.availableThreads()), 1);
        log(Level.DEBUG, "Batch size {}", null, Integer.valueOf(max));
        return max;
    }

    private void sendMessagesToExecutor(QueueDetail queueDetail, QueueThreadPool queueThreadPool, List<RqueueMessage> list) {
        Iterator<RqueueMessage> it = list.iterator();
        while (it.hasNext()) {
            execute(queueThreadPool, queueDetail, it.next());
        }
    }

    private void pollAndExecute(int i, String str, QueueDetail queueDetail, QueueThreadPool queueThreadPool, int i2) {
        if (isQueueActive(str)) {
            try {
                List<RqueueMessage> messages = getMessages(queueDetail, i2);
                log(Level.DEBUG, "Queue: {} Fetched Msgs {}", null, str, messages);
                int size = CollectionUtils.isEmpty(messages) ? 0 : messages.size();
                if (size == 0) {
                    deactivate(i, str, DeactivateType.NO_MESSAGE);
                }
                queueThreadPool.release(i2 - size);
                if (size > 0) {
                    sendMessagesToExecutor(queueDetail, queueThreadPool, messages);
                }
            } catch (Exception e) {
                queueThreadPool.release(i2);
                log(Level.WARN, "Listener failed for the queue {}", e, str);
                deactivate(i, str, DeactivateType.POLL_FAILED);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void poll(int i, String str, QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
        log(Level.DEBUG, "Polling queue {}", null, str);
        int batchSize = getBatchSize(queueDetail, queueThreadPool);
        try {
            if (queueThreadPool.acquire(batchSize, getSemaphoreWaitTime())) {
                pollAndExecute(i, str, queueDetail, queueThreadPool, batchSize);
            } else {
                deactivate(i, str, DeactivateType.SEMAPHORE_UNAVAILABLE);
            }
        } catch (Exception e) {
            log(Level.WARN, "Exception {}", e, e.getMessage());
            deactivate(i, str, DeactivateType.SEMAPHORE_EXCEPTION);
        }
    }

    abstract long getSemaphoreWaitTime();

    abstract void deactivate(int i, String str, DeactivateType deactivateType);
}
