package org.apache.james.queue.rabbitmq;

import com.github.fge.lambdas.consumers.ThrowingConsumer;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer.class */
class Dequeuer {
    private static final boolean REQUEUE = true;
    private final Flux<AcknowledgableDelivery> flux;
    private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
    private final Metric dequeueMetric;
    private final MailReferenceSerializer mailReferenceSerializer;
    private final MailQueueView mailQueueView;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer$RabbitMQMailQueueItem.class */
    public static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
        private final Consumer<Boolean> ack;
        private final EnqueueId enqueueId;
        private final Mail mail;

        private RabbitMQMailQueueItem(Consumer<Boolean> consumer, MailWithEnqueueId mailWithEnqueueId) {
            this.ack = consumer;
            this.enqueueId = mailWithEnqueueId.getEnqueueId();
            this.mail = mailWithEnqueueId.getMail();
        }

        public Mail getMail() {
            return this.mail;
        }

        public EnqueueId getEnqueueId() {
            return this.enqueueId;
        }

        public void done(boolean z) {
            this.ack.accept(Boolean.valueOf(z));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Dequeuer(MailQueueName mailQueueName, RabbitClient rabbitClient, Function<MailReferenceDTO, MailWithEnqueueId> function, MailReferenceSerializer mailReferenceSerializer, MetricFactory metricFactory, MailQueueView mailQueueView) {
        this.mailLoader = function;
        this.mailReferenceSerializer = mailReferenceSerializer;
        this.mailQueueView = mailQueueView;
        this.dequeueMetric = metricFactory.generate("dequeuedMail:" + mailQueueName.asString());
        this.flux = rabbitClient.receive(mailQueueName).filter(acknowledgableDelivery -> {
            return acknowledgableDelivery.getBody() != null;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<? extends MailQueue.MailQueueItem> deQueue() {
        return this.flux.concatMap(this::loadItem).concatMap(this::filterIfDeleted);
    }

    private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem rabbitMQMailQueueItem) {
        return this.mailQueueView.isPresent(rabbitMQMailQueueItem.getEnqueueId()).flatMap(bool -> {
            return keepWhenPresent(rabbitMQMailQueueItem, bool);
        });
    }

    private Mono<? extends RabbitMQMailQueueItem> keepWhenPresent(RabbitMQMailQueueItem rabbitMQMailQueueItem, Boolean bool) {
        if (bool.booleanValue()) {
            return Mono.just(rabbitMQMailQueueItem);
        }
        rabbitMQMailQueueItem.done(true);
        return Mono.empty();
    }

    private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery acknowledgableDelivery) {
        try {
            MailWithEnqueueId loadMail = loadMail(acknowledgableDelivery);
            return Mono.just(new RabbitMQMailQueueItem(ack(acknowledgableDelivery, loadMail), loadMail));
        } catch (MailQueue.MailQueueException e) {
            return Mono.error(e);
        }
    }

    private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery acknowledgableDelivery, MailWithEnqueueId mailWithEnqueueId) {
        return bool -> {
            if (!bool.booleanValue()) {
                acknowledgableDelivery.nack(true);
                return;
            }
            this.dequeueMetric.increment();
            acknowledgableDelivery.ack();
            this.mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId()));
        };
    }

    private MailWithEnqueueId loadMail(Delivery delivery) throws MailQueue.MailQueueException {
        return this.mailLoader.apply(toMailReference(delivery));
    }

    private MailReferenceDTO toMailReference(Delivery delivery) throws MailQueue.MailQueueException {
        try {
            return this.mailReferenceSerializer.read(delivery.getBody());
        } catch (IOException e) {
            throw new MailQueue.MailQueueException("Failed to parse DTO", e);
        }
    }
}
