/*
 * Decompiled with CFR 0.152.
 */
package cn.sliew.carp.framework.queue.kekio.redis;

import cn.sliew.carp.framework.common.util.KeyUtil;
import cn.sliew.carp.framework.queue.kekio.MessageHandler;
import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueExecutor;
import cn.sliew.carp.framework.queue.kekio.message.AttemptsAttribute;
import cn.sliew.carp.framework.queue.kekio.message.MaxAttemptsAttribute;
import cn.sliew.carp.framework.queue.kekio.message.Message;
import cn.sliew.carp.framework.queue.kekio.metrics.EventPublisher;
import cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue;
import cn.sliew.carp.framework.queue.kekio.metrics.QueueEvent;
import cn.sliew.carp.framework.queue.kekio.redis.AbstractRedisQueue;
import cn.sliew.carp.framework.queue.kekio.redis.SerDerUtil;
import cn.sliew.milky.common.util.JacksonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.resps.ScanResult;

public abstract class RedisQueue<CLIENT extends JedisCommands>
extends AbstractRedisQueue<CLIENT> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RedisQueue.class);
    private final String queueKey;
    private final String unackedKey;
    private final String messagesKey;
    private final String locksKey;
    private final String attemptsKey;
    private String readMessageWithLockScriptSha;

    public RedisQueue(ObjectMapper mapper, String name, QueueExecutor queueExecutor, Collection<MessageHandler> handlers, List<Queue.DeadMessageCallback> deadMessageHandlers, EventPublisher publisher, MeterRegistry meterRegistry, Boolean fillExecutorEachCycle, Duration requeueDelay, Duration requeueMaxJitter, Boolean canPollMany, TemporalAmount ackTimeout, Integer lockTtlSeconds) {
        super(mapper, name, queueExecutor, handlers, deadMessageHandlers, publisher, meterRegistry, fillExecutorEachCycle, requeueDelay, requeueMaxJitter, canPollMany, ackTimeout, lockTtlSeconds);
        this.queueKey = KeyUtil.buildKey((String)"kekio-queue.v1", (Object[])new Object[]{name, "queue"});
        this.unackedKey = KeyUtil.buildKey((String)"kekio-queue.v1", (Object[])new Object[]{name, "unacked"});
        this.messagesKey = KeyUtil.buildKey((String)"kekio-queue.v1", (Object[])new Object[]{name, "messages"});
        this.locksKey = KeyUtil.buildKey((String)"kekio-queue.v1", (Object[])new Object[]{name, "locks"});
        this.attemptsKey = KeyUtil.buildKey((String)"kekio-queue.v1", (Object[])new Object[]{name, "attempts"});
    }

    @Override
    protected String getQueueKey() {
        return this.queueKey;
    }

    @Override
    protected String getUnackedKey() {
        return this.unackedKey;
    }

    @Override
    protected String getMessagesKey() {
        return this.messagesKey;
    }

    @Override
    protected String getLocksKey() {
        return this.locksKey;
    }

    @Override
    protected String getAttemptsKey() {
        return this.attemptsKey;
    }

    @Override
    protected String getReadMessageWithLockScriptSha() {
        return this.readMessageWithLockScriptSha;
    }

    @Override
    protected void setReadMessageWithLockScriptSha(String sha) {
        this.readMessageWithLockScriptSha = sha;
    }

    @Override
    public void poll(Queue.QueueCallback callback) {
        Triple<String, Instant, String> result = this.readMessageWithLock();
        if (result != null) {
            String fingerprint = (String)result.getLeft();
            Instant scheduledTime = (Instant)result.getMiddle();
            String json = (String)result.getRight();
            Runnable ack = () -> this.ackMessage(fingerprint);
            this.readMessage(fingerprint, json, message -> {
                int maxAttempts;
                AttemptsAttribute attemptsAttr = message.getAttribute(AttemptsAttribute.class);
                int attempts = attemptsAttr != null ? attemptsAttr.getAttempts() : 0;
                MaxAttemptsAttribute maxAttemptsAttr = message.getAttribute(MaxAttemptsAttribute.class);
                int n = maxAttempts = maxAttemptsAttr != null ? maxAttemptsAttr.getMaxAttempts() : 0;
                if (maxAttempts > 0 && attempts > maxAttempts) {
                    log.warn("Message {} with payload {} exceeded {} retries", new Object[]{fingerprint, message, maxAttempts});
                    this.handleDeadMessage((Message)message);
                    this.removeMessage(fingerprint);
                    this.fire(QueueEvent.MessageDead);
                } else {
                    this.fire(new QueueEvent.MessageProcessing((Message)message, scheduledTime, Instant.now()));
                    callback.accept((Message)message, ack);
                }
            });
        }
        this.fire(QueueEvent.QueuePolled);
    }

    @Override
    public void poll(int maxMessages, Queue.QueueCallback callback) {
        this.poll(callback);
    }

    @Override
    public void push(Message message, TemporalAmount delay) {
        this.withJedis((CLIENT commands) -> {
            AbstractRedisQueue.Fingerprint fingerprint = this.fingerprint(message);
            String existingFingerprint = this.firstFingerprint(this.queueKey, fingerprint);
            if (existingFingerprint != null) {
                log.info("Re-prioritizing message as an identical one is already on the queue: {}, message: {}", (Object)existingFingerprint, (Object)message);
                commands.zadd(this.queueKey, this.score(delay), existingFingerprint, ZAddParams.zAddParams().xx());
                this.fire(new QueueEvent.MessageDuplicate(message));
            } else {
                this.queueMessage(message, delay);
                this.fire(new QueueEvent.MessagePushed(message));
            }
        });
    }

    @Override
    @Scheduled(fixedDelayString="${queue.retry.frequency.ms:10000}")
    public void retry() {
        this.withJedis((CLIENT commands) -> {
            List fingerprints = commands.zrangeByScore(this.unackedKey, 0.0, this.score());
            if (CollectionUtils.isNotEmpty((Collection)fingerprints)) {
                String[] lockKeys = (String[])fingerprints.stream().map(fp -> this.locksKey + ":" + fp).toArray(String[]::new);
                commands.del(lockKeys);
            }
            for (String fingerprint : fingerprints) {
                int attempts = this.hgetInt(this.attemptsKey, fingerprint);
                this.readMessageWithoutLock(fingerprint, message -> {
                    int maxAttempts;
                    MaxAttemptsAttribute maxAttemptsAttr = message.getAttribute(MaxAttemptsAttribute.class);
                    int n = maxAttempts = maxAttemptsAttr != null ? maxAttemptsAttr.getMaxAttempts() : 0;
                    if (maxAttempts == 0 && attempts >= 4) {
                        log.warn("Message {} with payload {} exceeded max retries", (Object)fingerprint, message);
                        this.handleDeadMessage((Message)message);
                        this.removeMessage(fingerprint);
                        this.fire(QueueEvent.MessageDead);
                    } else if (this.zismember((JedisCommands)commands, this.queueKey, fingerprint)) {
                        this.multi(tx -> {
                            tx.zrem(this.unackedKey, new String[]{fingerprint});
                            tx.zadd(this.queueKey, this.score(), fingerprint);
                            tx.hincrBy(this.attemptsKey, fingerprint, 1L);
                        });
                        log.info("Not retrying message {} because an identical message is already on the queue", (Object)fingerprint);
                        this.fire(new QueueEvent.MessageDuplicate((Message)message));
                    } else {
                        log.warn("Retrying message {} after {} attempts", (Object)fingerprint, (Object)attempts);
                        commands.hincrBy(this.attemptsKey, fingerprint, 1L);
                        this.requeueMessage(fingerprint);
                        this.fire(QueueEvent.MessageRetried);
                    }
                });
            }
            this.fire(QueueEvent.RetryPolled);
        });
    }

    @Override
    public MonitorableQueue.QueueState readState() {
        List<Object> response = this.multi(tx -> {
            tx.zcard(this.queueKey);
            tx.zcount(this.queueKey, 0.0, this.score());
            tx.zcard(this.unackedKey);
            tx.hlen(this.messagesKey);
        });
        int queued = ((Long)response.get(0)).intValue();
        int ready = ((Long)response.get(1)).intValue();
        int processing = ((Long)response.get(2)).intValue();
        int messages = ((Long)response.get(3)).intValue();
        return new MonitorableQueue.QueueState(queued, ready, processing, messages - (queued + processing), 0);
    }

    @Override
    public boolean containsMessage(Predicate<Message> predicate) {
        return this.withJedis((CLIENT commands) -> {
            String cursor = "0";
            boolean found = false;
            while (!found) {
                ScanResult scanResult = commands.hscan(this.messagesKey, cursor);
                for (Map.Entry entry : scanResult.getResult()) {
                    try {
                        Message message = (Message)JacksonUtil.parseJsonString((String)((String)entry.getValue()), Message.class);
                        if (!predicate.test(message)) continue;
                        found = true;
                        break;
                    }
                    catch (Exception e) {
                        log.error("Failed to read message {}", entry.getKey(), (Object)e);
                    }
                }
                if (!(cursor = scanResult.getCursor()).equals("0")) continue;
                break;
            }
            return found;
        });
    }

    protected void queueMessage(Message message, TemporalAmount delay) {
        TemporalAmount nonnullDelay = delay != null ? delay : Duration.ZERO;
        String fingerprint = this.fingerprint(message).getLatest();
        AttemptsAttribute attemptsAttr = message.getAttribute(AttemptsAttribute.class);
        if (attemptsAttr == null) {
            attemptsAttr = new AttemptsAttribute(0);
            message.setAttribute(attemptsAttr);
        }
        try {
            this.multi(tx -> {
                tx.hset(this.messagesKey, fingerprint, SerDerUtil.serializeAsJsonString(this.mapper, message));
                tx.zadd(this.queueKey, this.score(nonnullDelay), fingerprint);
            });
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to queue message: " + String.valueOf(message), e);
        }
    }

    protected void requeueMessage(String fingerprint) {
        this.multi(tx -> {
            tx.zrem(this.unackedKey, new String[]{fingerprint});
            tx.zadd(this.queueKey, this.score(), fingerprint);
        });
    }

    protected void removeMessage(String fingerprint) {
        this.multi(tx -> {
            tx.zrem(this.queueKey, new String[]{fingerprint});
            tx.zrem(this.unackedKey, new String[]{fingerprint});
            tx.hdel(this.messagesKey, new String[]{fingerprint});
            tx.del(this.locksKey + ":" + fingerprint);
            tx.hdel(this.attemptsKey, new String[]{fingerprint});
        });
    }

    protected void readMessageWithoutLock(String fingerprint, Consumer<Message> block) {
        this.withJedis((CLIENT commands) -> {
            try {
                String json = commands.hget(this.messagesKey, fingerprint);
                if (json != null) {
                    Message message = SerDerUtil.deserializeFromJsonString(this.mapper, json, Message.class);
                    block.accept(message);
                }
            }
            catch (JsonProcessingException e) {
                log.error("Payload for unacked message {} is missing or corrupt", (Object)fingerprint, (Object)e);
                this.removeMessage(fingerprint);
            }
            catch (Exception e) {
                log.error("Failed to read unacked message {}, requeuing...", (Object)fingerprint, (Object)e);
                commands.hincrBy(this.attemptsKey, fingerprint, 1L);
                this.requeueMessage(fingerprint);
            }
        });
    }

    protected Triple<String, Instant, String> readMessageWithLock() {
        return this.withJedis((CLIENT commands) -> {
            try {
                List result;
                Object response = commands.evalsha(this.readMessageWithLockScriptSha, Arrays.asList(this.queueKey, this.unackedKey, this.locksKey, this.messagesKey), Arrays.asList(String.valueOf(this.score()), "10", String.valueOf(this.lockTtlSeconds), String.format(Locale.US, "%f", this.score(this.getAckTimeout())), String.format(Locale.US, "%f", this.score())));
                if (response instanceof List && (result = (List)response).size() >= 3) {
                    return Triple.of((Object)result.get(0).toString(), (Object)Instant.ofEpochMilli(Long.parseLong(result.get(1).toString())), result.get(2) != null ? result.get(2).toString() : null);
                }
                if (Objects.equals(response, "ReadLockFailed")) {
                    this.fire(QueueEvent.LockFailed);
                }
            }
            catch (JedisDataException e) {
                if (e.getMessage() != null && e.getMessage().startsWith("NOSCRIPT")) {
                    this.cacheScript();
                    return this.readMessageWithLock();
                }
                throw e;
            }
            return null;
        });
    }

    protected void readMessage(String fingerprint, String json, Consumer<Message> block) {
        this.withJedis((CLIENT commands) -> {
            if (json == null) {
                log.error("Payload for message {} is missing", (Object)fingerprint);
                this.removeMessage(fingerprint);
            } else {
                try {
                    Message message = SerDerUtil.deserializeFromJsonString(this.mapper, json, Message.class);
                    AttemptsAttribute currentAttempts = message.getAttribute(AttemptsAttribute.class);
                    if (currentAttempts == null) {
                        currentAttempts = new AttemptsAttribute(0);
                    }
                    AttemptsAttribute newAttempts = new AttemptsAttribute(currentAttempts.getAttempts() + 1);
                    message.setAttribute(newAttempts);
                    commands.hset(this.messagesKey, fingerprint, this.mapper.writeValueAsString((Object)message));
                    block.accept(message);
                }
                catch (IOException e) {
                    log.error("Failed to read message {}, requeuing...", (Object)fingerprint, (Object)e);
                    commands.hincrBy(this.attemptsKey, fingerprint, 1L);
                    this.requeueMessage(fingerprint);
                }
            }
        });
    }

    private void ackMessage(String fingerprint) {
        this.withJedis((CLIENT commands) -> {
            if (this.zismember((JedisCommands)commands, this.queueKey, fingerprint)) {
                this.multi(tx -> {
                    tx.zrem(this.unackedKey, new String[]{fingerprint});
                    tx.del(this.locksKey + ":" + fingerprint);
                });
            } else {
                this.removeMessage(fingerprint);
            }
            this.fire(QueueEvent.MessageAcknowledged);
        });
    }
}

