package org.apache.james.queue.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;

/* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer.class */
class Dequeuer {
    private static final int TEN_MS = 10;
    private final MailQueueName name;
    private final RabbitClient rabbitClient;
    private final Function<MailReferenceDTO, Mail> mailLoader;
    private final Metric dequeueMetric;
    private final MailReferenceSerializer mailReferenceSerializer;
    private final MailQueueView mailQueueView;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer$NoMailYetException.class */
    public static class NoMailYetException extends RuntimeException {
        private NoMailYetException() {
        }
    }

    /* loaded from: input_file:org/apache/james/queue/rabbitmq/Dequeuer$RabbitMQMailQueueItem.class */
    private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
        private final Consumer<Boolean> ack;
        private final Mail mail;

        private RabbitMQMailQueueItem(Consumer<Boolean> consumer, Mail mail) {
            this.ack = consumer;
            this.mail = mail;
        }

        public Mail getMail() {
            return this.mail;
        }

        public void done(boolean z) {
            this.ack.accept(Boolean.valueOf(z));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Dequeuer(MailQueueName mailQueueName, RabbitClient rabbitClient, Function<MailReferenceDTO, Mail> function, MailReferenceSerializer mailReferenceSerializer, MetricFactory metricFactory, MailQueueView mailQueueView) {
        this.name = mailQueueName;
        this.rabbitClient = rabbitClient;
        this.mailLoader = function;
        this.mailReferenceSerializer = mailReferenceSerializer;
        this.mailQueueView = mailQueueView;
        this.dequeueMetric = metricFactory.generate("dequeuedMail:" + mailQueueName.asString());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MailQueue.MailQueueItem deQueue() {
        return (MailQueue.MailQueueItem) pollChannel().thenApply(Throwing.function(this::loadItem).sneakyThrow()).join();
    }

    private RabbitMQMailQueueItem loadItem(GetResponse getResponse) throws MailQueue.MailQueueException {
        Mail loadMail = loadMail(getResponse);
        return new RabbitMQMailQueueItem(ack(getResponse.getEnvelope().getDeliveryTag(), loadMail), loadMail);
    }

    private ThrowingConsumer<Boolean> ack(long j, Mail mail) {
        return bool -> {
            try {
                if (bool.booleanValue()) {
                    this.dequeueMetric.increment();
                    this.rabbitClient.ack(j);
                    this.mailQueueView.delete(DeleteCondition.withName(mail.getName())).join();
                } else {
                    this.rabbitClient.nack(j);
                }
            } catch (IOException e) {
                throw new MailQueue.MailQueueException("Failed to ACK " + mail.getName() + " with delivery tag " + j, e);
            }
        };
    }

    private Mail loadMail(GetResponse getResponse) throws MailQueue.MailQueueException {
        return this.mailLoader.apply(toMailReference(getResponse));
    }

    private MailReferenceDTO toMailReference(GetResponse getResponse) throws MailQueue.MailQueueException {
        try {
            return this.mailReferenceSerializer.read(getResponse.getBody());
        } catch (IOException e) {
            throw new MailQueue.MailQueueException("Failed to parse DTO", e);
        }
    }

    private CompletableFuture<GetResponse> pollChannel() {
        return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()).withFixedRate().withMinDelay(10L).retryOn(new Class[]{NoMailYetException.class}).getWithRetry(this::singleChannelRead);
    }

    private GetResponse singleChannelRead() throws IOException {
        return this.rabbitClient.poll(this.name).filter(getResponse -> {
            return getResponse.getBody() != null;
        }).orElseThrow(() -> {
            return new NoMailYetException();
        });
    }
}
