/*
 * Decompiled with CFR 0.152.
 */
package cn.sliew.carp.framework.queue.kekio.memory;

import cn.hutool.core.thread.ThreadUtil;
import cn.sliew.carp.framework.common.util.UUIDUtil;
import cn.sliew.carp.framework.queue.kekio.AbstractQueue;
import cn.sliew.carp.framework.queue.kekio.MessageHandler;
import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueExecutor;
import cn.sliew.carp.framework.queue.kekio.message.Message;
import cn.sliew.carp.framework.queue.kekio.metrics.EventPublisher;
import cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue;
import cn.sliew.carp.framework.queue.kekio.metrics.QueueEvent;
import io.micrometer.core.instrument.MeterRegistry;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * In-memory queue implementation backed by two {@link DelayQueue}s.
 *
 * <ul>
 *   <li>{@code queue} holds messages waiting for delivery; an envelope becomes
 *       available to {@link #poll(Queue.QueueCallback)} once its scheduled time has passed.</li>
 *   <li>{@code unacked} holds messages that were handed to a callback but not yet
 *       acknowledged; an envelope there expires when its ack timeout elapses and is then
 *       picked up by {@link #retry()}.</li>
 * </ul>
 *
 * <p>A single-threaded scheduler, started in {@link #afterPropertiesSet()} and stopped in
 * {@link #destroy()}, invokes {@link #retry()} periodically.
 */
public class InMemoryQueue
        extends AbstractQueue
        implements MonitorableQueue, InitializingBean, DisposableBean {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(InMemoryQueue.class);

    /** Delivery attempts after which an unacknowledged message is handed to the dead-message handlers. */
    private static final int MAX_RETRIES = 5;

    /** Pause, in milliseconds, between two runs of the retry task. */
    private static final long RETRY_PERIOD_MS = 10000L;

    private ScheduledThreadPoolExecutor scheduledExecutor;
    private final DelayQueue<Envelope> queue = new DelayQueue<>();
    private final DelayQueue<Envelope> unacked = new DelayQueue<>();

    /**
     * Creates the queue; every argument is forwarded unchanged to {@link AbstractQueue}.
     */
    public InMemoryQueue(String name, QueueExecutor queueExecutor, Collection<MessageHandler> handlers, List<Queue.DeadMessageCallback> deadMessageHandlers, EventPublisher eventPublisher, MeterRegistry meterRegistry, Boolean fillExecutorEachCycle, Duration requeueDelay, Duration requeueMaxJitter, Boolean canPollMany, TemporalAmount ackTimeout) {
        super(name, queueExecutor, handlers, deadMessageHandlers, eventPublisher, meterRegistry, fillExecutorEachCycle, requeueDelay, requeueMaxJitter, canPollMany, ackTimeout);
    }

    /**
     * Starts the single-threaded scheduler that periodically runs {@link #retry()}.
     *
     * @throws Exception declared by {@link InitializingBean}; not thrown by this implementation directly
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.scheduledExecutor = ThreadUtil.createScheduledExecutor(1);
        // The task is wrapped because a ScheduledThreadPoolExecutor suppresses every subsequent
        // execution of a periodic task once one run throws; an unguarded retry() failure would
        // silently stop redelivery for the lifetime of the bean.
        // The trailing `false` selects Hutool's fixed-delay (not fixed-rate) scheduling.
        ThreadUtil.schedule(this.scheduledExecutor, this::retrySafely, 0L, RETRY_PERIOD_MS, false);
        // NOTE(review): this logs the backing collection's class name ("DelayQueue"), not this queue's name.
        log.debug("Start process queue retry: {}", this.queue.getClass().getSimpleName());
    }

    /**
     * Stops the retry scheduler if it was started. Already-running work is allowed to finish.
     *
     * @throws Exception declared by {@link DisposableBean}; not thrown by this implementation directly
     */
    @Override
    public void destroy() throws Exception {
        if (Objects.nonNull(this.scheduledExecutor)) {
            this.scheduledExecutor.shutdown();
            log.info("Stop process queue poll: {}", this.queue.getClass().getSimpleName());
        }
    }

    /**
     * Takes at most one due message off the queue and hands it to {@code callback}.
     *
     * <p>If an equal payload is still awaiting acknowledgement, the envelope is put back on
     * the queue untouched and the callback is not invoked. Otherwise a copy of the envelope,
     * rescheduled to "now + ack timeout", is parked in {@code unacked} before the callback runs.
     *
     * @param callback receives the payload and an action that acknowledges the message
     */
    @Override
    public void poll(Queue.QueueCallback callback) {
        this.fire(QueueEvent.QueuePolled);
        Envelope envelope = this.queue.poll();
        if (envelope == null) {
            return;
        }
        Message payload = envelope.getPayload();
        if (this.unacked.stream().anyMatch(e -> Objects.equals(e.getPayload(), payload))) {
            // Same payload is already in flight; defer this copy rather than deliver it twice.
            this.queue.put(envelope);
            return;
        }
        // A per-message ack timeout, when present, overrides the queue-wide default.
        TemporalAmount messageAckTimeout = payload.getAckTimeoutMs() == null
                ? this.ackTimeout
                : Duration.ofMillis(payload.getAckTimeoutMs());
        this.unacked.put(new Envelope(envelope.getId(), payload, Instant.now().plus(messageAckTimeout), envelope.getCount()));
        this.fire(new QueueEvent.MessageProcessing(payload, envelope.getScheduledTime(), Instant.now()));
        callback.accept(payload, () -> {
            this.ack(envelope.getId());
            this.fire(QueueEvent.MessageAcknowledged);
        });
    }

    /**
     * Batch variant of {@link #poll(Queue.QueueCallback)}; this implementation ignores
     * {@code maxMessages} and delivers at most one message per call.
     *
     * @param maxMessages ignored
     * @param callback    receives the payload and an action that acknowledges the message
     */
    @Override
    public void poll(int maxMessages, Queue.QueueCallback callback) {
        this.poll(callback);
    }

    /**
     * Enqueues {@code message} for delivery after {@code delay}.
     *
     * <p>Any queued envelope with an equal payload is removed first, so the queue holds at
     * most one pending copy; in that case a duplicate event is fired instead of a pushed event.
     *
     * @param message payload to enqueue
     * @param delay   amount of time from now before the message becomes deliverable
     */
    @Override
    public void push(Message message, TemporalAmount delay) {
        boolean existed = this.queue.removeIf(e -> Objects.equals(e.getPayload(), message));
        this.queue.put(new Envelope(message, Instant.now().plus(delay)));
        if (existed) {
            this.fire(new QueueEvent.MessageDuplicate(message));
        } else {
            this.fire(new QueueEvent.MessagePushed(message));
        }
    }

    /**
     * Drains every envelope whose ack timeout has expired and either re-queues it for
     * immediate delivery or, once it has reached {@code MAX_RETRIES} attempts, passes it
     * to the dead-message handlers.
     */
    @Override
    public void retry() {
        // One timestamp for the whole pass so all redelivered messages share a scheduled time.
        Instant now = Instant.now();
        this.fire(QueueEvent.RetryPolled);
        Envelope expired;
        while ((expired = this.unacked.poll()) != null) {
            this.redeliverOrKill(expired, now);
        }
    }

    /** Handles one expired unacked envelope: dead-letters it or re-queues it with an incremented count. */
    private void redeliverOrKill(Envelope expired, Instant now) {
        Message payload = expired.getPayload();
        if (expired.getCount() >= MAX_RETRIES) {
            if (CollectionUtils.isNotEmpty(this.deadMessageHandlers)) {
                for (Queue.DeadMessageCallback handler : this.deadMessageHandlers) {
                    handler.accept(this, payload);
                }
            }
            this.fire(QueueEvent.MessageDead);
            return;
        }
        // Drop any pending copy so the redelivered envelope is the only one queued.
        boolean existed = this.queue.removeIf(e -> Objects.equals(e.getPayload(), payload));
        log.warn("Redelivering unacked message {}", payload);
        this.queue.put(new Envelope(expired.getId(), payload, now, expired.getCount() + 1));
        if (existed) {
            this.fire(new QueueEvent.MessageDuplicate(payload));
        } else {
            this.fire(QueueEvent.MessageRetried);
        }
    }

    /** Runs {@link #retry()} and logs, rather than propagates, any failure so the schedule stays alive. */
    private void retrySafely() {
        try {
            this.retry();
        } catch (Exception e) {
            log.error("Failed to retry unacked messages", e);
        }
    }

    /**
     * Removes every pending message from the queue. Messages awaiting acknowledgement
     * are not affected.
     */
    @Override
    public void clear() {
        this.queue.clear();
    }

    /** @return the queue-wide acknowledgement timeout */
    @Override
    public TemporalAmount getAckTimeout() {
        return this.ackTimeout;
    }

    /** @return the callbacks invoked for messages that exhausted their retries */
    @Override
    public List<Queue.DeadMessageCallback> getDeadMessageHandlers() {
        return this.deadMessageHandlers;
    }

    /** @return the {@code canPollMany} flag supplied at construction */
    @Override
    public Boolean canPollMany() {
        return this.canPollMany;
    }

    /** @return the event publisher held by the base class */
    @Override
    public EventPublisher getPublisher() {
        return this.publisher;
    }

    /**
     * Builds a snapshot from, in order: the number of queued envelopes, the number of queued
     * envelopes whose delay has already elapsed, and the number of unacknowledged envelopes.
     *
     * @return the current queue state
     */
    @Override
    public MonitorableQueue.QueueState readState() {
        int depth = this.queue.size();
        int ready = (int) this.queue.stream()
                .filter(e -> e.getDelay(TimeUnit.NANOSECONDS) <= 0L)
                .count();
        return new MonitorableQueue.QueueState(depth, ready, this.unacked.size());
    }

    /**
     * Tests whether any pending (not yet delivered) message satisfies {@code predicate}.
     * Unacknowledged messages are not inspected.
     *
     * @param predicate condition applied to each queued payload
     * @return {@code true} if at least one queued payload matches
     */
    @Override
    public boolean containsMessage(Predicate<Message> predicate) {
        return this.queue.stream().map(Envelope::getPayload).anyMatch(predicate);
    }

    /** Removes the envelope with the given id from the unacknowledged set. */
    private void ack(String messageId) {
        this.unacked.removeIf(e -> e.getId().equals(messageId));
    }

    /**
     * Immutable wrapper pairing a message with its id, the instant at which it becomes due,
     * and its delivery-attempt count. Ordering (and therefore {@link DelayQueue} position)
     * is by scheduled time.
     */
    public static class Envelope
            implements Delayed {
        private final String id;
        private final Message payload;
        private final Instant scheduledTime;
        private final int count;

        /**
         * Creates a first-attempt envelope with a freshly generated id.
         *
         * @param payload       message to carry
         * @param scheduledTime instant at which the envelope becomes due
         */
        public Envelope(Message payload, Instant scheduledTime) {
            this(UUIDUtil.randomUUId(), payload, scheduledTime, 1);
        }

        /**
         * Orders envelopes by scheduled time. Two envelopes are compared on their stored
         * instants directly, which avoids sampling the clock twice (and truncating to
         * milliseconds) as a delay-based comparison would; other {@link Delayed}
         * implementations fall back to comparing remaining delay.
         */
        @Override
        public int compareTo(Delayed other) {
            if (other == this) {
                return 0;
            }
            if (other instanceof Envelope) {
                return this.scheduledTime.compareTo(((Envelope) other).scheduledTime);
            }
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }

        /**
         * @param unit unit for the result
         * @return time remaining until the scheduled instant, in {@code unit}; zero or negative once due
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return Instant.now().until(this.scheduledTime, unit.toChronoUnit());
        }

        /** @return the envelope id */
        @Generated
        public String getId() {
            return this.id;
        }

        /** @return the wrapped message */
        @Generated
        public Message getPayload() {
            return this.payload;
        }

        /** @return the instant at which this envelope becomes due */
        @Generated
        public Instant getScheduledTime() {
            return this.scheduledTime;
        }

        /** @return the delivery-attempt count carried by this envelope */
        @Generated
        public int getCount() {
            return this.count;
        }

        /**
         * Creates an envelope with every field supplied explicitly.
         *
         * @param id            envelope id
         * @param payload       message to carry
         * @param scheduledTime instant at which the envelope becomes due
         * @param count         delivery-attempt count
         */
        @ConstructorProperties(value={"id", "payload", "scheduledTime", "count"})
        @Generated
        public Envelope(String id, Message payload, Instant scheduledTime, int count) {
            this.id = id;
            this.payload = payload;
            this.scheduledTime = scheduledTime;
            this.count = count;
        }
    }
}

