package com.github.davidmarquis.redisq.consumer;

import com.github.davidmarquis.redisq.Message;
import com.github.davidmarquis.redisq.MessageQueue;
import com.github.davidmarquis.redisq.consumer.retry.MessageRetryStrategy;
import com.github.davidmarquis.redisq.consumer.retry.NoRetryStrategy;
import com.github.davidmarquis.redisq.consumer.retry.RetryableMessageException;
import com.github.davidmarquis.redisq.persistence.RedisOps;
import com.github.davidmarquis.redisq.utils.GenericsUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.RedisConnectionFailureException;

/* loaded from: input_file:com/github/davidmarquis/redisq/consumer/MessageConsumer.class */
public class MessageConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);
    private MessageQueue queue;
    private String consumerId;
    private MessageListener<T> messageListener;

    @Autowired
    private RedisOps redisOps;
    private Class<T> payloadType;
    private ThreadingStrategy threadingStrategy = new SingleThreadingStrategy();
    private MessageRetryStrategy<T> retryStrategy = new NoRetryStrategy();
    private ConnectionFailureHandler connectionFailureHandler = new DefaultConnectionFailureHandler(log);
    private boolean autoStartConsumers = true;

    @PostConstruct
    public void initialize() {
        this.consumerId = this.consumerId == null ? this.queue.getDefaultConsumerId() : this.consumerId;
        if (StringUtils.isEmpty(this.consumerId)) {
            throw new IllegalStateException("Consumer ID is not set but is mandatory.");
        }
        String queueName = this.queue.getQueueName();
        this.redisOps.ensureConsumerRegistered(queueName, this.consumerId);
        log.debug(String.format("Registered as consumer ID [%s] on queue [%s]", this.consumerId, queueName));
        this.payloadType = extractMessagePayloadTypeFromListener();
        log.debug(String.format("Handling payloads from messages in queue [%s] as objects of class [%s]", queueName, this.payloadType));
        if (this.autoStartConsumers) {
            startConsumer();
        }
    }

    public void startConsumer() {
        startDequeue();
    }

    protected void startDequeue() {
        this.threadingStrategy.start(this.queue.getQueueName(), new Runnable() { // from class: com.github.davidmarquis.redisq.consumer.MessageConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    MessageConsumer.this.processNextMessage();
                } catch (RedisConnectionFailureException e) {
                    MessageConsumer.this.connectionFailureHandler.serverConnectionFailed(e);
                }
            }
        });
    }

    @PreDestroy
    public void stopConsumer() {
        this.threadingStrategy.stop();
    }

    protected void processNextMessage() {
        this.queue.dequeue(this.consumerId, new MessageCallback() { // from class: com.github.davidmarquis.redisq.consumer.MessageConsumer.2
            @Override // com.github.davidmarquis.redisq.consumer.MessageCallback
            public void handle(String str) {
                Message<T> loadMessageById = MessageConsumer.this.redisOps.loadMessageById(MessageConsumer.this.queue.getQueueName(), str, MessageConsumer.this.payloadType);
                try {
                    MessageConsumer.this.handleMessage(loadMessageById);
                } catch (RetryableMessageException e) {
                    MessageConsumer.this.retryStrategy.retry(loadMessageById, MessageConsumer.this.queue, MessageConsumer.this.consumerId);
                } catch (Throwable th) {
                    MessageConsumer.this.handleExceptionWhileProcessingMessage(loadMessageById, th);
                }
            }
        });
    }

    protected void handleMessage(Message<T> message) throws RetryableMessageException {
        this.messageListener.onMessage(message);
    }

    protected void handleExceptionWhileProcessingMessage(Message<T> message, Throwable th) {
        log.error(String.format("Exception while handling message with ID [%s]", message.getId()), th);
    }

    private Class<T> extractMessagePayloadTypeFromListener() {
        return (Class<T>) GenericsUtils.getGenericTypeOfInterface(this.messageListener.getClass(), MessageListener.class);
    }

    public void setConsumerId(String str) {
        this.consumerId = str;
    }

    public void setRedisOps(RedisOps redisOps) {
        this.redisOps = redisOps;
    }

    public void setMessageListener(MessageListener<T> messageListener) {
        this.messageListener = messageListener;
    }

    public void setQueue(MessageQueue messageQueue) {
        this.queue = messageQueue;
    }

    public MessageListener<T> getMessageListener() {
        return this.messageListener;
    }

    public void setThreadingStrategy(ThreadingStrategy threadingStrategy) {
        this.threadingStrategy = threadingStrategy;
    }

    public void setAutoStartConsumers(boolean z) {
        this.autoStartConsumers = z;
    }

    public void setRetryStrategy(MessageRetryStrategy<T> messageRetryStrategy) {
        this.retryStrategy = messageRetryStrategy;
    }
}
