/*
 * Decompiled with CFR 0.152.
 */
package cn.sliew.carp.framework.queue.kekio.redis;

import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueExecutor;
import cn.sliew.carp.framework.queue.kekio.metrics.EventPublisher;
import cn.sliew.carp.framework.queue.kekio.redis.RedisQueue;
import cn.sliew.milky.common.function.CheckedConsumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.util.Pool;

public class JedisQueue
extends RedisQueue<Jedis> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JedisQueue.class);
    private final Pool<Jedis> pool;

    public JedisQueue(Pool<Jedis> pool, String queueName, ObjectMapper mapper, QueueExecutor queueExecutor, List<Queue.DeadMessageCallback> deadMessageHandlers, EventPublisher publisher, MeterRegistry meterRegistry, Boolean fillExecutorEachCycle, Duration requeueDelay, Duration requeueMaxJitter, Boolean canPollMany, TemporalAmount ackTimeout, Integer lockTtlSeconds) {
        super(queueName, mapper, queueExecutor, deadMessageHandlers, publisher, meterRegistry, fillExecutorEachCycle, requeueDelay, requeueMaxJitter, canPollMany, ackTimeout, lockTtlSeconds);
        this.pool = pool;
        this.cacheScript();
        log.info("Configured {} queue: {}", (Object)this.getClass().getName(), (Object)queueName);
    }

    @Override
    public void cacheScript() {
        try (Jedis redis = (Jedis)this.pool.getResource();){
            this.setReadMessageWithLockScriptSha(redis.scriptLoad("local queueKey = KEYS[1]\nlocal unackKey = KEYS[2]\nlocal lockKey = KEYS[3]\nlocal messagesKey = KEYS[4]\nlocal maxScore = ARGV[1]\nlocal peekFingerprintCount = ARGV[2]\nlocal lockTtlSeconds = ARGV[3]\nlocal unackDefaultScore = ARGV[4]\nlocal unackBaseScore = ARGV[5]\n\nlocal not_empty = function(x)\n  return (type(x) == \"table\") and (not x.err) and (#x ~= 0)\nend\n\nlocal acquire_lock = function(fingerprints, locksKey, lockTtlSeconds)\n  if not_empty(fingerprints) then\n    local i=1\n    while (i <= #fingerprints) do\n      redis.call(\"ECHO\", \"attempting lock on \" .. fingerprints[i])\n      if redis.call(\"SET\", locksKey .. \":\" .. fingerprints[i], \"\\uD83D\\uDD12\", \"EX\", lockTtlSeconds, \"NX\") then\n        redis.call(\"ECHO\", \"acquired lock on \" .. fingerprints[i])\n        return fingerprints[i], fingerprints[i+1]\n      end\n      i=i+2\n    end\n  end\n  return nil, nil\nend\n\n-- acquire a lock on a fingerprint\nlocal fingerprints = redis.call(\"ZRANGEBYSCORE\", queueKey, 0.0, maxScore, \"WITHSCORES\", \"LIMIT\", 0, peekFingerprintCount)\nlocal fingerprint, fingerprintScore = acquire_lock(fingerprints, lockKey, lockTtlSeconds)\n\n-- no lock could be acquired\nif fingerprint == nil then\n  if #fingerprints == 0 then\n    return \"NoReadyMessages\"\n  end\n  return \"AcquireLockFailed\"\nend\n\nlocal java_scientific = function(x)\n  return string.format(\"%.12E\", x):gsub(\"\\+\", \"\")\nend\n\n-- get the message, move the fingerprint to the unacked queue and return\nlocal message = redis.call(\"HGET\", messagesKey, fingerprint)\n\n-- check for an ack timeout override on the message\nlocal unackScore = unackDefaultScore\nif type(message) == \"string\" and message ~= nil then\n  local ackTimeoutOverride = tonumber(cjson.decode(message)[\"ackTimeoutMs\"])\n  if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then\n    unackScore = unackBaseScore + ackTimeoutOverride\n  end\nend\n\nunackScore = java_scientific(unackScore)\n\nredis.call(\"ZREM\", queueKey, fingerprint)\nredis.call(\"ZADD\", unackKey, unackScore, fingerprint)\nreturn {fingerprint, fingerprintScore, message}\n"));
        }
    }

    @Override
    protected <T> T withJedis(Function<Jedis, T> function) {
        try (Jedis jedis = (Jedis)this.pool.getResource();){
            T t = function.apply(jedis);
            return t;
        }
    }

    /*
     * Enabled aggressive exception aggregation
     */
    @Override
    protected <E extends Throwable> List<Object> multi(CheckedConsumer<Transaction, E> block) {
        try (Jedis jedis = (Jedis)this.pool.getResource();){
            List list;
            block14: {
                Transaction tx = jedis.multi();
                try {
                    block.accept((Object)tx);
                    list = tx.exec();
                    if (tx == null) break block14;
                }
                catch (Throwable throwable) {
                    if (tx != null) {
                        try {
                            tx.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                tx.close();
            }
            return list;
        }
        catch (Throwable e) {
            throw new RuntimeException("Failed to execute Redis transaction", e);
        }
    }
}

