package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.google.common.annotations.VisibleForTesting;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.slf4j.Logger;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/sonus21/rqueue/core/RedisScheduleTriggerHandler.class */
/**
 * Triggers a per-queue scheduler task in response to messages published on Redis channels.
 *
 * <p>Each queue is mapped to a channel name through {@code channelNameProducer}. When a message
 * arrives on a known channel, its body is parsed as a {@code long} timestamp and, provided the
 * queue was not triggered within the last {@link #getMinDelay()} and no earlier task is still
 * running, the {@code scheduler} function is applied to the queue name and the returned
 * {@link Future} is remembered.
 *
 * <p>{@link #initialize()} must be called before any queue is started or stopped, because it
 * creates the listener and the bookkeeping maps.
 */
public class RedisScheduleTriggerHandler {
    private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
    private final RqueueSchedulerConfig rqueueSchedulerConfig;
    private final Logger logger;
    private final Function<String, Future<?>> scheduler;
    private final Function<String, String> channelNameProducer;
    private final List<String> queueNames;

    /** Queue name to the wall-clock time (epoch millis) at which it was last scheduled. */
    @VisibleForTesting
    Map<String, Long> queueNameToLastRunTime;

    /** Queue name to the future returned by the most recent {@code scheduler} invocation. */
    @VisibleForTesting
    Map<String, Future<?>> queueNameToFuture;

    /** Channel name to queue name, for channels this handler is currently subscribed to. */
    @VisibleForTesting
    Map<String, String> channelNameToQueueName;

    /** The single listener instance registered for every queue's channel. */
    @VisibleForTesting
    MessageListener messageListener;

    /* loaded from: input_file:com/github/sonus21/rqueue/core/RedisScheduleTriggerHandler$MessageSchedulerListener.class */
    /** Redis message listener that converts channel messages into scheduler invocations. */
    private class MessageSchedulerListener implements MessageListener {
        private MessageSchedulerListener() {
        }

        /**
         * Starts a scheduler task for the queue unless a previous one is still in flight.
         *
         * @param queueName queue to schedule
         * @param now timestamp recorded as the queue's last run time
         */
        private void schedule(String queueName, long now) {
            Future<?> running = queueNameToFuture.get(queueName);
            if (running == null || running.isCancelled() || running.isDone()) {
                queueNameToLastRunTime.put(queueName, now);
                queueNameToFuture.put(queueName, scheduler.apply(queueName));
            }
        }

        /**
         * Validates the published timestamp and applies the minimum-delay throttle.
         *
         * @param queueName queue resolved from the channel
         * @param startTime timestamp carried in the message body
         */
        private void handleMessage(String queueName, long startTime) {
            long now = System.currentTimeMillis();
            if (startTime > now) {
                // A timestamp in the future is treated as a malformed message and ignored.
                logger.warn("Received message body is not correct queue: {}, time: {}", queueName, startTime);
                return;
            }
            // A queue with no recorded run is treated as never run instead of failing on unboxing.
            long lastRunTime = queueNameToLastRunTime.getOrDefault(queueName, 0L);
            if (now - lastRunTime < getMinDelay()) {
                return;
            }
            schedule(queueName, now);
        }

        /**
         * Handles a message delivered on one of the subscribed channels.
         *
         * <p>Messages with an empty body or channel are ignored. Any failure while parsing or
         * scheduling is logged and swallowed so that it does not propagate to the caller.
         *
         * @param message the received message (body and channel are read as UTF-8 text)
         * @param bArr the subscription pattern supplied by the container; unused
         */
        public void onMessage(Message message, byte[] bArr) {
            byte[] rawBody = message.getBody();
            byte[] rawChannel = message.getChannel();
            if (rawBody.length == 0 || rawChannel.length == 0) {
                return;
            }
            // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
            String body = new String(rawBody, StandardCharsets.UTF_8);
            String channelName = new String(rawChannel, StandardCharsets.UTF_8);
            logger.trace("Body: {} Channel: {}", body, channelName);
            try {
                long startTime = Long.parseLong(body);
                String queueName = channelNameToQueueName.get(channelName);
                if (queueName == null) {
                    logger.warn("Unknown channel name {}", channelName);
                } else {
                    handleMessage(queueName, startTime);
                }
            } catch (Exception e) {
                logger.error("Error occurred on a channel {}, body: {}", channelName, body, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a handler; {@link #initialize()} must be called before it is used.
     *
     * @param logger logger used for all diagnostics
     * @param rqueueRedisListenerContainerFactory container used to add and remove the listener
     * @param rqueueSchedulerConfig source of the termination wait time and minimum delay
     * @param list names of the queues stopped by {@link #stop()}
     * @param function scheduler callback, applied to a queue name and returning the task's future
     * @param function2 maps a queue name to the Redis channel name to subscribe to
     */
    public RedisScheduleTriggerHandler(Logger logger, RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory, RqueueSchedulerConfig rqueueSchedulerConfig, List<String> list, Function<String, Future<?>> function, Function<String, String> function2) {
        this.queueNames = list;
        this.rqueueSchedulerConfig = rqueueSchedulerConfig;
        this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
        this.logger = logger;
        this.scheduler = function;
        this.channelNameProducer = function2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Creates the message listener and the (initially empty) bookkeeping maps. */
    public void initialize() {
        int expectedSize = this.queueNames.size();
        this.messageListener = new MessageSchedulerListener();
        // Concurrent like its siblings: written by start/stop, read from the listener callback.
        this.channelNameToQueueName = new ConcurrentHashMap<>(expectedSize);
        this.queueNameToFuture = new ConcurrentHashMap<>(expectedSize);
        this.queueNameToLastRunTime = new ConcurrentHashMap<>(expectedSize);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Stops every queue this handler was constructed with. */
    public void stop() {
        for (String queueName : this.queueNames) {
            stopQueue(queueName);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resets the queue's last run time and subscribes the listener to the queue's channel.
     *
     * @param str queue name
     */
    public void startQueue(String str) {
        // Record the last run time before subscribing so the listener always finds an entry.
        this.queueNameToLastRunTime.put(str, 0L);
        subscribeToRedisTopic(str);
    }

    /**
     * Waits for the queue's outstanding task (if any), clears its state and unsubscribes.
     *
     * @param str queue name
     */
    void stopQueue(String str) {
        Future<?> running = this.queueNameToFuture.get(str);
        // NOTE(review): presumably waitForTermination tolerates a null future - confirm in ThreadUtils.
        ThreadUtils.waitForTermination(this.logger, running, this.rqueueSchedulerConfig.getTerminationWaitTime(), "An exception occurred while stopping scheduler queue '{}'", str);
        this.queueNameToLastRunTime.put(str, 0L);
        this.queueNameToFuture.remove(str);
        unsubscribeFromRedis(str);
    }

    /** Removes the listener from the queue's channel and forgets the channel mapping. */
    private void unsubscribeFromRedis(String str) {
        String channelName = this.channelNameProducer.apply(str);
        this.logger.debug("Queue {} unsubscribe from channel {}", str, channelName);
        this.rqueueRedisListenerContainerFactory.removeMessageListener(this.messageListener, new ChannelTopic(channelName));
        // Drop the mapping so a late message on this channel cannot restart a stopped queue.
        this.channelNameToQueueName.remove(channelName);
    }

    /** Records the channel mapping and registers the listener on the queue's channel. */
    private void subscribeToRedisTopic(String str) {
        String channelName = this.channelNameProducer.apply(str);
        this.channelNameToQueueName.put(channelName, str);
        this.logger.debug("Queue {} subscribe to channel {}", str, channelName);
        this.rqueueRedisListenerContainerFactory.addMessageListener(this.messageListener, new ChannelTopic(channelName));
    }

    /**
     * Returns the minimum interval between two scheduler triggers for the same queue.
     *
     * @return the configured minimum message move delay; compared against a millisecond
     *     difference, so presumably expressed in milliseconds
     */
    protected long getMinDelay() {
        return this.rqueueSchedulerConfig.minMessageMoveDelay();
    }
}
