/*
 * Decompiled with CFR 0.152.
 */
package cn.jdevelops.delay.redis;

import cn.jdevelops.delay.core.entity.DelayQueueMessage;
import cn.jdevelops.delay.core.factory.DelayFactory;
import cn.jdevelops.delay.core.service.DelayService;
import com.alibaba.fastjson2.JSON;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

/**
 * Redis-backed implementation of {@link DelayService} for {@link DelayQueueMessage}.
 *
 * <p>Messages are serialized to JSON and stored in the Redis sorted set
 * {@code delay:redis_delay_queue}, scored by their delay time. A single-threaded
 * scheduler periodically runs the injected {@code delayRedisScript} against that
 * sorted set and dispatches every returned message to the {@link DelayFactory}.
 *
 * <p>NOTE(review): the scheduler is never shut down by this class; confirm whether
 * a lifecycle hook elsewhere stops it when the application context closes.
 */
@Service
public class RedisDelayService implements DelayService<DelayQueueMessage> {
    private static final Logger logger = LoggerFactory.getLogger(RedisDelayService.class);

    /** Key of the Redis sorted set holding the pending messages. */
    private static final String DELAY_QUEUE = "delay:redis_delay_queue";

    /** Name prefix of the polling thread(s). */
    private static final String NAME = "RedisDelayMessageTask-thread-";

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private DelayFactory<DelayQueueMessage> delayRunFactory;

    @Resource(name = "delayRedisScript")
    private DefaultRedisScript<List<String>> delayRedisScript;

    /** Sequence used to number the polling threads. */
    private final AtomicInteger seq = new AtomicInteger(1);

    /** Single-threaded scheduler that runs the polling task. */
    private final ScheduledThreadPoolExecutor pool =
            new ScheduledThreadPoolExecutor(1, r -> new Thread(r, NAME + this.seq.getAndIncrement()));

    /**
     * Serializes the message to JSON and adds it to the delay sorted set, using the
     * message's delay time as the score.
     *
     * @param delayMessage the message to enqueue
     */
    public void produce(DelayQueueMessage delayMessage) {
        String value = JSON.toJSONString(delayMessage);
        double score = (double) delayMessage.getDelayTime().longValue();
        this.redisTemplate.opsForZSet().add(DELAY_QUEUE, value, score);
    }

    /**
     * Enqueues every message of the list, one at a time, via {@link #produce(DelayQueueMessage)}.
     *
     * @param delayMessage the messages to enqueue
     */
    public void produce(List<DelayQueueMessage> delayMessage) {
        delayMessage.forEach(this::produce);
    }

    /**
     * Schedules the periodic polling task on the internal scheduler.
     *
     * <p>The initial delay is a random value between 10 and 20 seconds and the period
     * a random value between 1 and 10 seconds. On each run the task executes the Lua
     * script against the delay sorted set and hands every returned message to the
     * {@link DelayFactory}. A failure while handling one message is logged and does
     * not prevent the remaining messages of the same batch from being processed.
     */
    public void consumeDelay() {
        long initialDelay = Math.round(Math.random() * 10.0 + 10.0);
        long periodRound = Math.round(Math.random() * 10.0);
        // scheduleAtFixedRate rejects a non-positive period, so fall back to 1 second.
        long period = periodRound == 0L ? 1L : periodRound;
        logger.info("\u5f00\u59cb\u6d88\u8d39redis\u5ef6\u65f6\u961f\u5217\u6570\u636e...");
        this.pool.scheduleAtFixedRate(() -> {
            try {
                Set<String> messages = this.runLuaScript(DELAY_QUEUE);
                if (CollectionUtils.isEmpty(messages)) {
                    return;
                }
                for (String raw : messages) {
                    this.dispatch(raw);
                }
            }
            catch (Throwable e) {
                // An exception escaping the task would cancel all future runs of the
                // periodic schedule, so everything is caught and logged here.
                logger.error("RemindMessageTask error..", e);
            }
        }, initialDelay, period, TimeUnit.SECONDS);
    }

    /**
     * Executes the injected Lua script for the given key, passing {@code 0} and the
     * current time in milliseconds as the two script arguments.
     *
     * @param key the Redis key handed to the script
     * @return the distinct values returned by the script; an empty set when the
     *         script returns nothing
     */
    public Set<String> runLuaScript(String key) {
        double min = 0.0;
        double max = System.currentTimeMillis();
        List<String> result = this.redisTemplate.execute(
                this.delayRedisScript, Collections.singletonList(key), new Object[]{min, max});
        // execute may yield null (no script result); treat that as "nothing due".
        if (result == null) {
            return new HashSet<String>();
        }
        return new HashSet<String>(result);
    }

    /** Parses one raw JSON payload and passes it to the delay factory, isolating failures per message. */
    private void dispatch(String raw) {
        try {
            DelayQueueMessage message = JSON.to(DelayQueueMessage.class, JSON.parseObject(raw));
            this.delayRunFactory.delayExecute(message);
        }
        catch (Exception e) {
            logger.error("RemindMessageTask failed to process message: {}", raw, e);
        }
    }
}

