/*
 * Decompiled with CFR 0.152.
 */
package cn.sliew.carp.framework.queue.kekio.redis;

import cn.sliew.carp.framework.queue.kekio.AbstractQueue;
import cn.sliew.carp.framework.queue.kekio.MessageHandler;
import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueExecutor;
import cn.sliew.carp.framework.queue.kekio.message.Message;
import cn.sliew.carp.framework.queue.kekio.metrics.EventPublisher;
import cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue;
import cn.sliew.carp.framework.queue.kekio.redis.SerDerUtil;
import cn.sliew.milky.common.function.CheckedConsumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.hash.Hashing;
import io.micrometer.core.instrument.MeterRegistry;
import java.beans.ConstructorProperties;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.commands.JedisCommands;

public abstract class AbstractRedisQueue<CLIENT extends JedisCommands>
extends AbstractQueue
implements MonitorableQueue {
    /** Class-wide SLF4J logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractRedisQueue.class);
    /**
     * Lock time-to-live in seconds; the constructor substitutes 10 when {@code null} is supplied.
     * NOTE(review): presumably passed to the read script as its lock expiry argument - confirm in subclasses.
     */
    protected final Integer lockTtlSeconds;
    /** Mapper supplied by the caller; used here to serialize messages for logging and for {@link #hashV1(Message)}. */
    protected final ObjectMapper mapper;
    /**
     * Copy of {@link #mapper} with {@link SerializationFeature#ORDER_MAP_ENTRIES_BY_KEYS} enabled,
     * used by {@link #hashV2(Message)} so map entries are written in key order before hashing.
     */
    private final ObjectMapper hashObjectMapper;
    protected static final String READ_MESSAGE_SRC = "local java_scientific = function(x)\n  return string.format(\"%.12E\", x):gsub(\"\\+\", \"\")\nend\n\n-- get the message, move the fingerprint to the unacked queue and return\nlocal message = redis.call(\"HGET\", messagesKey, fingerprint)\n\n-- check for an ack timeout override on the message\nlocal unackScore = unackDefaultScore\nif type(message) == \"string\" and message ~= nil then\n  local ackTimeoutOverride = tonumber(cjson.decode(message)[\"ackTimeoutMs\"])\n  if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then\n    unackScore = unackBaseScore + ackTimeoutOverride\n  end\nend\n\nunackScore = java_scientific(unackScore)\n\nredis.call(\"ZREM\", queueKey, fingerprint)\nredis.call(\"ZADD\", unackKey, unackScore, fingerprint)\n";
    protected static final String READ_MESSAGE_WITH_LOCK_SRC = "local queueKey = KEYS[1]\nlocal unackKey = KEYS[2]\nlocal lockKey = KEYS[3]\nlocal messagesKey = KEYS[4]\nlocal maxScore = ARGV[1]\nlocal peekFingerprintCount = ARGV[2]\nlocal lockTtlSeconds = ARGV[3]\nlocal unackDefaultScore = ARGV[4]\nlocal unackBaseScore = ARGV[5]\n\nlocal not_empty = function(x)\n  return (type(x) == \"table\") and (not x.err) and (#x ~= 0)\nend\n\nlocal acquire_lock = function(fingerprints, locksKey, lockTtlSeconds)\n  if not_empty(fingerprints) then\n    local i=1\n    while (i <= #fingerprints) do\n      redis.call(\"ECHO\", \"attempting lock on \" .. fingerprints[i])\n      if redis.call(\"SET\", locksKey .. \":\" .. fingerprints[i], \"\\uD83D\\uDD12\", \"EX\", lockTtlSeconds, \"NX\") then\n        redis.call(\"ECHO\", \"acquired lock on \" .. fingerprints[i])\n        return fingerprints[i], fingerprints[i+1]\n      end\n      i=i+2\n    end\n  end\n  return nil, nil\nend\n\n-- acquire a lock on a fingerprint\nlocal fingerprints = redis.call(\"ZRANGEBYSCORE\", queueKey, 0.0, maxScore, \"WITHSCORES\", \"LIMIT\", 0, peekFingerprintCount)\nlocal fingerprint, fingerprintScore = acquire_lock(fingerprints, lockKey, lockTtlSeconds)\n\n-- no lock could be acquired\nif fingerprint == nil then\n  if #fingerprints == 0 then\n    return \"NoReadyMessages\"\n  end\n  return \"AcquireLockFailed\"\nend\n\nlocal java_scientific = function(x)\n  return string.format(\"%.12E\", x):gsub(\"\\+\", \"\")\nend\n\n-- get the message, move the fingerprint to the unacked queue and return\nlocal message = redis.call(\"HGET\", messagesKey, fingerprint)\n\n-- check for an ack timeout override on the message\nlocal unackScore = unackDefaultScore\nif type(message) == \"string\" and message ~= nil then\n  local ackTimeoutOverride = tonumber(cjson.decode(message)[\"ackTimeoutMs\"])\n  if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then\n    unackScore = unackBaseScore + ackTimeoutOverride\n  
end\nend\n\nunackScore = java_scientific(unackScore)\n\nredis.call(\"ZREM\", queueKey, fingerprint)\nredis.call(\"ZADD\", unackKey, unackScore, fingerprint)\nreturn {fingerprint, fingerprintScore, message}\n";

    /**
     * Key of the queue structure.
     * NOTE(review): presumably the sorted set passed as {@code KEYS[1]} to the read script - confirm in subclasses.
     */
    protected abstract String getQueueKey();

    /**
     * Key of the unacked structure.
     * NOTE(review): presumably the sorted set passed as {@code KEYS[2]} to the read script - confirm in subclasses.
     */
    protected abstract String getUnackedKey();

    /**
     * Key of the message store.
     * NOTE(review): presumably the hash passed as {@code KEYS[4]} to the read script - confirm in subclasses.
     */
    protected abstract String getMessagesKey();

    /**
     * Key (prefix) used for per-fingerprint locks.
     * NOTE(review): presumably passed as {@code KEYS[3]} to the read script - confirm in subclasses.
     */
    protected abstract String getLocksKey();

    /**
     * Key of the attempts structure.
     * NOTE(review): not used in this class; presumably a hash of delivery attempts read via {@link #hgetInt} - confirm.
     */
    protected abstract String getAttemptsKey();

    /**
     * Caches the Lua script(s) on the server.
     * NOTE(review): presumably loads {@link #READ_MESSAGE_WITH_LOCK_SRC} and stores its SHA - confirm in subclasses.
     */
    public abstract void cacheScript();

    /** Returns the stored SHA of the read-message-with-lock script. */
    protected abstract String getReadMessageWithLockScriptSha();

    /**
     * Stores the SHA of the read-message-with-lock script.
     *
     * @param var1 the script SHA to remember
     */
    protected abstract void setReadMessageWithLockScriptSha(String var1);

    /**
     * Applies {@code var1} to a client of type {@code CLIENT} and returns its result.
     * How the client is obtained and released is left to the subclass.
     *
     * @param var1 function to run against the client
     * @param <T>  result type
     * @return whatever {@code var1} returns
     */
    protected abstract <T> T withJedis(Function<CLIENT, T> var1);

    /**
     * Runs a side-effect-only action against the client.
     *
     * <p>Wraps {@code consumer} in a function that always yields {@code null} and delegates to
     * the value-returning {@code withJedis(Function)} overload; the {@code null} is discarded.
     *
     * @param consumer action to invoke with the client
     */
    protected void withJedis(Consumer<CLIENT> consumer) {
        Function<CLIENT, Object> adapter = client -> {
            consumer.accept(client);
            return null;
        };
        this.withJedis(adapter);
    }

    protected AbstractRedisQueue(ObjectMapper mapper, String name, QueueExecutor queueExecutor, Collection<MessageHandler> handlers, List<Queue.DeadMessageCallback> deadMessageHandlers, EventPublisher publisher, MeterRegistry meterRegistry, Boolean fillExecutorEachCycle, Duration requeueDelay, Duration requeueMaxJitter, Boolean canPollMany, TemporalAmount ackTimeout, Integer lockTtlSeconds) {
        super(name, queueExecutor, handlers, deadMessageHandlers, publisher, meterRegistry, fillExecutorEachCycle, requeueDelay, requeueMaxJitter, canPollMany, ackTimeout);
        this.lockTtlSeconds = Objects.nonNull(lockTtlSeconds) ? lockTtlSeconds : 10;
        this.mapper = mapper;
        this.hashObjectMapper = mapper.copy();
        this.hashObjectMapper.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
    }

    /**
     * Exposes the inherited {@code ackTimeout} field.
     *
     * @return the acknowledgement timeout held by this queue
     */
    @Override
    public TemporalAmount getAckTimeout() {
        return ackTimeout;
    }

    /**
     * Exposes the inherited {@code deadMessageHandlers} field.
     *
     * @return the dead-message callbacks held by this queue (the same list instance, not a copy)
     */
    @Override
    public List<Queue.DeadMessageCallback> getDeadMessageHandlers() {
        return deadMessageHandlers;
    }

    /**
     * Exposes the inherited {@code canPollMany} field.
     *
     * @return the poll-many flag held by this queue
     */
    @Override
    public Boolean canPollMany() {
        return canPollMany;
    }

    /**
     * Exposes the inherited {@code publisher} field.
     *
     * @return the event publisher held by this queue
     */
    @Override
    public EventPublisher getPublisher() {
        return publisher;
    }

    /**
     * Hands a dead message to every registered dead-message callback.
     *
     * <p>When no callbacks are registered the message is serialized to JSON and logged at error
     * level instead.
     *
     * @param message the message to dispatch
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if the message cannot
     *                          be serialized for the log entry
     */
    protected void handleDeadMessage(Message message) {
        if (CollectionUtils.isEmpty(this.deadMessageHandlers)) {
            // Nobody to hand the message to: leave a trace of it in the log.
            final String payload;
            try {
                payload = SerDerUtil.serializeAsJsonString(this.mapper, message);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            log.error("Handle dead message error, empty deadMessageHandlers. message: {}", payload);
            return;
        }

        this.deadMessageHandlers.forEach(callback -> callback.accept(this, message));
    }

    /**
     * Computes the score for "now", i.e. {@link #score(TemporalAmount)} with a zero delay.
     *
     * @return the current time in epoch milliseconds, as a double
     */
    protected double score() {
        return score(Duration.ZERO);
    }

    /**
     * Computes the score for a point in time {@code delay} after now.
     *
     * @param delay offset added to the current instant; {@code null} is treated as zero
     * @return the resulting instant in epoch milliseconds, widened to a double
     */
    protected double score(TemporalAmount delay) {
        TemporalAmount offset = delay;
        if (offset == null) {
            offset = Duration.ZERO;
        }
        long dueAtMillis = Instant.now().plus(offset).toEpochMilli();
        return (double) dueAtMillis;
    }

    /**
     * Hands a {@link Transaction} to {@code var1} and returns a list of results.
     * NOTE(review): presumably wraps the consumer in a MULTI/EXEC transaction and returns the
     * EXEC replies - the implementation lives in subclasses, confirm there.
     *
     * @param var1 callback that receives the transaction; may throw {@code E}
     * @param <E>  type of exception the callback may throw
     * @return the results produced by the implementation
     */
    protected abstract <E extends Throwable> List<Object> multi(CheckedConsumer<Transaction, E> var1);

    /**
     * Reads a hash field as an int, using 0 when the field is absent.
     *
     * @param key   hash key
     * @param field field within the hash
     * @return the parsed value, or 0 when the lookup yields {@code null}
     */
    protected int hgetInt(String key, String field) {
        final int fallback = 0;
        return hgetInt(key, field, fallback);
    }

    /**
     * Reads a hash field via {@code HGET} and parses it as an int.
     *
     * @param key          hash key
     * @param field        field within the hash
     * @param defaultValue value to return when the lookup yields {@code null}
     * @return the parsed value, or {@code defaultValue} when the field is absent
     * @throws NumberFormatException if the stored value is not a valid int
     */
    protected int hgetInt(String key, String field, int defaultValue) {
        Integer parsed = this.withJedis((CLIENT client) -> {
            String raw = client.hget(key, field);
            if (raw == null) {
                return defaultValue;
            }
            return Integer.parseInt(raw);
        });
        return parsed;
    }

    /**
     * Tests membership in a sorted set by asking for the member's rank.
     *
     * @param commands client to query
     * @param key      sorted-set key
     * @param member   member to look for
     * @return {@code true} when {@code zrank} returns a non-null rank
     */
    protected boolean zismember(JedisCommands commands, String key, String member) {
        return Objects.nonNull(commands.zrank(key, member));
    }

    /**
     * Tests whether at least one of the given members is present in a sorted set.
     *
     * <p>Members are checked one at a time with {@link #zismember}; checking stops at the first
     * hit.
     *
     * @param commands client to query
     * @param key      sorted-set key
     * @param members  candidates to look for
     * @return {@code true} if any candidate is a member, {@code false} otherwise (including when
     *         {@code members} is empty)
     */
    protected boolean anyZismember(JedisCommands commands, String key, Set<String> members) {
        for (String candidate : members) {
            if (this.zismember(commands, key, candidate)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Finds which of a fingerprint's hash variants is present in a sorted set.
     *
     * <p>The variants from {@link Fingerprint#getAll()} are checked in the set's iteration order
     * with {@link #zismember}; the first one found is returned.
     *
     * @param key         sorted-set key
     * @param fingerprint fingerprint whose variants are checked
     * @return the first variant that is a member of the set, or {@code null} if none is
     */
    protected String firstFingerprint(String key, Fingerprint fingerprint) {
        return this.withJedis((CLIENT client) -> {
            for (String candidate : fingerprint.getAll()) {
                if (this.zismember(client, key, candidate)) {
                    return candidate;
                }
            }
            return null;
        });
    }

    /**
     * Legacy fingerprint: murmur3-128 hash of the message's JSON form produced by {@link #mapper}.
     *
     * <p>NOTE(review): the JSON is encoded with the JVM's default charset, so the result can
     * differ between JVMs with different defaults when the JSON contains non-ASCII text. This is
     * kept as-is because changing it would change previously computed fingerprints.
     *
     * @param message message to fingerprint
     * @return the hash rendered by {@code HashCode.toString()}
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     * @deprecated superseded by {@link #hashV2(Message)}
     */
    @Deprecated
    protected String hashV1(Message message) {
        final String json;
        try {
            json = SerDerUtil.serializeAsJsonString(this.mapper, message);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        return Hashing.murmur3_128()
                .hashString(json, Charset.defaultCharset())
                .toString();
    }

    /**
     * Current fingerprint: murmur3-128 hash of {@code "v2:"} followed by the message's JSON form
     * with the top-level {@code attributes} entry removed.
     *
     * <p>The message is first converted to a map with the key-ordering mapper, so the serialized
     * form does not depend on map entry order, and the text is hashed as UTF-8.
     *
     * @param message message to fingerprint
     * @return the hash rendered by {@code HashCode.toString()}
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    protected String hashV2(Message message) {
        HashMap<?, ?> fields = this.hashObjectMapper.convertValue(message, HashMap.class);
        // Attributes are excluded so they do not influence the fingerprint.
        fields.remove("attributes");

        final String json;
        try {
            json = SerDerUtil.serializeAsJsonString(this.hashObjectMapper, fields);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        return Hashing.murmur3_128()
                .hashString("v2:" + json, StandardCharsets.UTF_8)
                .toString();
    }

    /**
     * Builds the fingerprint of a message from both hash versions.
     *
     * @param message message to fingerprint
     * @return a {@link Fingerprint} whose {@code latest} is the {@link #hashV2(Message)} value and
     *         whose {@code all} set holds the V2 and the {@link #hashV1(Message)} values
     */
    protected Fingerprint fingerprint(Message message) {
        final String current = this.hashV2(message);
        final String legacy = this.hashV1(message);

        final Set<String> variants = new HashSet<>();
        variants.add(current);
        variants.add(legacy);
        return new Fingerprint(current, variants);
    }

    /**
     * Value holder for a message fingerprint: the preferred hash plus every hash variant under
     * which the message may be stored.
     *
     * <p>Equality and hash code are based on both fields. The {@code all} set is stored and
     * returned by reference, without copying.
     */
    protected static class Fingerprint {
        /** Preferred hash value. */
        private final String latest;
        /** Every known hash variant, including {@link #latest} when built by the enclosing class. */
        private final Set<String> all;

        /**
         * @param latest preferred hash value
         * @param all    every known hash variant; kept by reference
         */
        @ConstructorProperties(value={"latest", "all"})
        @Generated
        public Fingerprint(String latest, Set<String> all) {
            this.latest = latest;
            this.all = all;
        }

        /** @return the preferred hash value */
        @Generated
        public String getLatest() {
            return latest;
        }

        /** @return every known hash variant (the stored set itself) */
        @Generated
        public Set<String> getAll() {
            return all;
        }

        /**
         * Two fingerprints are equal when both agree via {@link #canEqual(Object)} and their
         * {@code latest} and {@code all} values are equal (nulls equal only to nulls).
         */
        @Generated
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Fingerprint)) {
                return false;
            }
            Fingerprint that = (Fingerprint) o;
            return that.canEqual(this)
                    && Objects.equals(this.getLatest(), that.getLatest())
                    && Objects.equals(this.getAll(), that.getAll());
        }

        /** Hook that lets subclasses opt out of equality with plain {@code Fingerprint}s. */
        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof Fingerprint;
        }

        /** Combines both fields with multiplier 59; a {@code null} field contributes 43. */
        @Generated
        public int hashCode() {
            final int prime = 59;
            final int nullHash = 43;

            int result = 1;
            String latestValue = this.getLatest();
            result = prime * result + (latestValue != null ? latestValue.hashCode() : nullHash);
            Set<String> allValue = this.getAll();
            result = prime * result + (allValue != null ? allValue.hashCode() : nullHash);
            return result;
        }
    }
}

