package org.apache.james.mailbox.events;

import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.rabbitmq.client.Connection;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.james.backend.rabbitmq.Constants;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.util.MDCBuilder;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/mailbox/events/GroupRegistration.class */
class GroupRegistration implements Registration {
    static final String RETRY_COUNT = "retry-count";
    static final int DEFAULT_RETRY_COUNT = 0;
    private final MailboxListener mailboxListener;
    private final WorkQueueName queueName;
    private final Receiver receiver;
    private final Runnable unregisterGroup;
    private final Sender sender;
    private final EventSerializer eventSerializer;
    private final GroupConsumerRetry retryHandler;
    private final WaitDelayGenerator delayGenerator;
    private final Group group;
    private final MailboxListenerExecutor mailboxListenerExecutor;
    private Optional<Disposable> receiverSubscriber = Optional.empty();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/james/mailbox/events/GroupRegistration$WorkQueueName.class */
    public static class WorkQueueName {
        static final String MAILBOX_EVENT_WORK_QUEUE_PREFIX = "mailboxEvent-workQueue-";
        private final Group group;

        static WorkQueueName of(Group group) {
            return new WorkQueueName(group);
        }

        private WorkQueueName(Group group) {
            Preconditions.checkNotNull(group, "Group must be specified");
            this.group = group;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public String asString() {
            return MAILBOX_EVENT_WORK_QUEUE_PREFIX + this.group.asString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GroupRegistration(Mono<Connection> mono, Sender sender, EventSerializer eventSerializer, MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoffConfiguration, EventDeadLetters eventDeadLetters, Runnable runnable, MailboxListenerExecutor mailboxListenerExecutor) {
        this.eventSerializer = eventSerializer;
        this.mailboxListener = mailboxListener;
        this.queueName = WorkQueueName.of(group);
        this.sender = sender;
        this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(mono));
        this.mailboxListenerExecutor = mailboxListenerExecutor;
        this.unregisterGroup = runnable;
        this.retryHandler = new GroupConsumerRetry(sender, group, retryBackoffConfiguration, eventDeadLetters);
        this.delayGenerator = WaitDelayGenerator.of(retryBackoffConfiguration);
        this.group = group;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GroupRegistration start() {
        createGroupWorkQueue().then(this.retryHandler.createRetryExchange(this.queueName)).doOnSuccess(r3 -> {
            subscribeWorkQueue();
        }).block();
        return this;
    }

    private Mono<Void> createGroupWorkQueue() {
        return Flux.concat(new Publisher[]{this.sender.declareQueue(QueueSpecification.queue(this.queueName.asString()).durable(true).exclusive(false).autoDelete(false).arguments(Constants.NO_ARGUMENTS)), this.sender.bind(BindingSpecification.binding().exchange("mailboxEvent-exchange").queue(this.queueName.asString()).routingKey(""))}).then();
    }

    private void subscribeWorkQueue() {
        this.receiverSubscriber = Optional.of(this.receiver.consumeManualAck(this.queueName.asString()).subscribeOn(Schedulers.parallel()).filter(acknowledgableDelivery -> {
            return Objects.nonNull(acknowledgableDelivery.getBody());
        }).flatMap(this::deliver).subscribeOn(Schedulers.elastic()).subscribe());
    }

    private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
        byte[] body = acknowledgableDelivery.getBody();
        Event event = (Event) this.eventSerializer.fromJson(new String(body, StandardCharsets.UTF_8)).get();
        int retryCount = getRetryCount(acknowledgableDelivery);
        Mono onErrorResume = this.delayGenerator.delayIfHaveTo(retryCount).flatMap(num -> {
            return Mono.fromRunnable(Throwing.runnable(() -> {
                runListener(event);
            })).publishOn(Schedulers.elastic());
        }).onErrorResume(th -> {
            return this.retryHandler.handleRetry(body, event, retryCount, th);
        });
        acknowledgableDelivery.getClass();
        return onErrorResume.then(Mono.fromRunnable(acknowledgableDelivery::ack)).subscribeWith(MonoProcessor.create()).then();
    }

    private void runListener(Event event) throws Exception {
        this.mailboxListenerExecutor.execute(this.mailboxListener, MDCBuilder.create().addContext("group", this.group), event);
    }

    private int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) {
        return ((Integer) Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders()).flatMap(map -> {
            return Optional.ofNullable(map.get(RETRY_COUNT));
        }).filter(obj -> {
            return obj instanceof Integer;
        }).map(obj2 -> {
            return (Integer) obj2;
        }).orElse(Integer.valueOf(DEFAULT_RETRY_COUNT))).intValue();
    }

    public void unregister() {
        this.receiverSubscriber.filter(disposable -> {
            return !disposable.isDisposed();
        }).ifPresent((v0) -> {
            v0.dispose();
        });
        this.receiver.close();
        this.unregisterGroup.run();
    }
}
