package org.apache.james.queue.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;
import reactor.core.publisher.Flux;
import reactor.rabbitmq.AcknowledgableDelivery;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer.class */
/**
 * Pulls mail references from a RabbitMQ queue and turns them into
 * {@link MailQueue.MailQueueItem}s.
 *
 * <p>Deliveries received from the {@code RabbitClient} are pushed into a small local
 * {@link LinkedBlockingQueue}; {@link #deQueue()} blocks on that queue, resolves the
 * referenced {@link Mail} through the supplied loader and hands back an item whose
 * {@code done(...)} callback acknowledges (or rejects) the underlying delivery.
 */
class Dequeuer {
    /** Flag passed to {@code nack} when an item is reported as not successfully processed. */
    private static final boolean REQUEUE = true;
    /** Capacity of the local hand-off queue between the RabbitMQ subscription and {@link #deQueue()}. */
    private static final int LOCAL_BUFFER_CAPACITY = 1;

    private final LinkedBlockingQueue<AcknowledgableDelivery> messages;
    private final Function<MailReferenceDTO, Mail> mailLoader;
    private final Metric dequeueMetric;
    private final MailReferenceSerializer mailReferenceSerializer;
    private final MailQueueView mailQueueView;

    /**
     * Queue item pairing a loaded {@link Mail} with the callback that settles its delivery.
     *
     * <p>NOTE(review): visibility is wider than needed (the constructor is private and only
     * {@code Dequeuer} creates instances); kept unchanged so existing references still link.
     */
    public static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
        private final Consumer<Boolean> ack;
        private final Mail mail;

        private RabbitMQMailQueueItem(Consumer<Boolean> consumer, Mail mail) {
            this.ack = consumer;
            this.mail = mail;
        }

        /** @return the mail carried by this item */
        @Override
        public Mail getMail() {
            return this.mail;
        }

        /**
         * Reports the processing outcome by forwarding it to the acknowledgement callback.
         *
         * @param success outcome flag handed to the callback unchanged
         */
        @Override
        public void done(boolean success) {
            this.ack.accept(success);
        }
    }

    /**
     * Creates the dequeuer and immediately subscribes to the given queue.
     *
     * @param mailQueueName           queue to consume; also used to name the dequeue metric
     * @param rabbitClient            client providing the delivery stream
     * @param function                resolves a {@link MailReferenceDTO} into a {@link Mail}
     * @param mailReferenceSerializer decodes a delivery body into a {@link MailReferenceDTO}
     * @param metricFactory           factory used to create the {@code dequeuedMail:<name>} metric
     * @param mailQueueView           view from which a mail is deleted once it is acknowledged
     */
    public Dequeuer(MailQueueName mailQueueName, RabbitClient rabbitClient, Function<MailReferenceDTO, Mail> function, MailReferenceSerializer mailReferenceSerializer, MetricFactory metricFactory, MailQueueView mailQueueView) {
        this.mailLoader = function;
        this.mailReferenceSerializer = mailReferenceSerializer;
        this.mailQueueView = mailQueueView;
        this.dequeueMetric = metricFactory.generate("dequeuedMail:" + mailQueueName.asString());
        this.messages = messageIterator(mailQueueName, rabbitClient);
    }

    /**
     * Subscribes to the RabbitMQ queue and feeds every delivery with a non-null body into a
     * bounded local queue.
     *
     * @return the local queue that {@link #deQueue()} takes from
     */
    private LinkedBlockingQueue<AcknowledgableDelivery> messageIterator(MailQueueName mailQueueName, RabbitClient rabbitClient) {
        LinkedBlockingQueue<AcknowledgableDelivery> buffer = new LinkedBlockingQueue<>(LOCAL_BUFFER_CAPACITY);
        rabbitClient.receive(mailQueueName)
            .filter(delivery -> delivery.getBody() != null)
            // put() blocks while the buffer is full and throws the checked
            // InterruptedException, hence the Throwing adapter around the method reference.
            .doOnNext(Throwing.consumer(buffer::put))
            .subscribe();
        return buffer;
    }

    /**
     * Blocks until a delivery is available, then loads the mail it references.
     *
     * @return an item wrapping the loaded mail and its acknowledgement callback
     * @throws MailQueue.MailQueueException if the delivery body cannot be parsed
     * @throws InterruptedException         if interrupted while waiting for a delivery
     */
    public MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException, InterruptedException {
        return loadItem(this.messages.take());
    }

    /**
     * Builds the queue item for one delivery.
     *
     * <p>NOTE(review): when loading fails the delivery is neither acked nor nacked here —
     * confirm that this is the intended handling for unparsable messages.
     */
    private RabbitMQMailQueueItem loadItem(AcknowledgableDelivery acknowledgableDelivery) throws MailQueue.MailQueueException {
        Mail mail = loadMail(acknowledgableDelivery);
        return new RabbitMQMailQueueItem(ack(acknowledgableDelivery, mail), mail);
    }

    /**
     * Creates the callback that settles a delivery.
     *
     * <p>On success: increments the dequeue metric, acks the delivery and deletes the mail
     * (by name) from the queue view. Otherwise: nacks the delivery with {@link #REQUEUE}.
     */
    private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery acknowledgableDelivery, Mail mail) {
        return success -> {
            if (success) {
                this.dequeueMetric.increment();
                acknowledgableDelivery.ack();
                this.mailQueueView.delete(DeleteCondition.withName(mail.getName()));
            } else {
                acknowledgableDelivery.nack(REQUEUE);
            }
        };
    }

    /** Decodes the delivery body and resolves it to a {@link Mail} through the mail loader. */
    private Mail loadMail(Delivery delivery) throws MailQueue.MailQueueException {
        return this.mailLoader.apply(toMailReference(delivery));
    }

    /**
     * Deserializes the delivery body into a mail reference.
     *
     * @throws MailQueue.MailQueueException wrapping the {@link IOException} raised by the serializer
     */
    private MailReferenceDTO toMailReference(Delivery delivery) throws MailQueue.MailQueueException {
        try {
            return this.mailReferenceSerializer.read(delivery.getBody());
        } catch (IOException e) {
            throw new MailQueue.MailQueueException("Failed to parse DTO", e);
        }
    }
}
