package org.apache.james.events;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.Delivery;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.events.EventListener;
import org.apache.james.events.LocalListenerRegistry;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.StructuredLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/apache/james/events/KeyRegistrationHandler.class */
class KeyRegistrationHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class);
    private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30);
    static final Map<String, Object> QUEUE_ARGUMENTS = ImmutableMap.of("x-expires", Long.valueOf(EXPIRATION_TIMEOUT.toMillis()));
    private static final Duration TOPOLOGY_CHANGES_TIMEOUT = Duration.ofMinutes(1);
    private final NamingStrategy namingStrategy;
    private final EventBusId eventBusId;
    private final LocalListenerRegistry localListenerRegistry;
    private final EventSerializer eventSerializer;
    private final Sender sender;
    private final RoutingKeyConverter routingKeyConverter;
    private final Receiver receiver;
    private final RegistrationQueueName registrationQueue;
    private final RegistrationBinder registrationBinder;
    private final ListenerExecutor listenerExecutor;
    private final RetryBackoffConfiguration retryBackoff;
    private Optional<Disposable> receiverSubscriber = Optional.empty();

    /* JADX INFO: Access modifiers changed from: package-private */
    public KeyRegistrationHandler(NamingStrategy namingStrategy, EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, ListenerExecutor listenerExecutor, RetryBackoffConfiguration retryBackoffConfiguration) {
        this.namingStrategy = namingStrategy;
        this.eventBusId = eventBusId;
        this.eventSerializer = eventSerializer;
        this.sender = sender;
        this.routingKeyConverter = routingKeyConverter;
        this.localListenerRegistry = localListenerRegistry;
        this.receiver = receiverProvider.createReceiver();
        this.listenerExecutor = listenerExecutor;
        this.retryBackoff = retryBackoffConfiguration;
        this.registrationQueue = namingStrategy.queueName(eventBusId);
        this.registrationBinder = new RegistrationBinder(namingStrategy, sender, this.registrationQueue);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        declareQueue();
        this.receiverSubscriber = Optional.of(this.receiver.consumeAutoAck(this.registrationQueue.asString(), new ConsumeOptions().qos(10)).subscribeOn(Schedulers.parallel()).flatMap(this::handleDelivery, 10).subscribe());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void declareQueue() {
        declareQueue(this.sender);
    }

    private void declareQueue(Sender sender) {
        sender.declareQueue(QueueSpecification.queue(this.registrationQueue.asString()).durable(true).exclusive(false).autoDelete(true).arguments(QUEUE_ARGUMENTS)).timeout(TOPOLOGY_CHANGES_TIMEOUT).map((v0) -> {
            return v0.getQueue();
        }).retryWhen(Retry.backoff(this.retryBackoff.getMaxRetries(), this.retryBackoff.getFirstBackoff()).jitter(this.retryBackoff.getJitterFactor())).block();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.sender.delete(QueueSpecification.queue(this.registrationQueue.asString())).timeout(TOPOLOGY_CHANGES_TIMEOUT).retryWhen(Retry.backoff(this.retryBackoff.getMaxRetries(), this.retryBackoff.getFirstBackoff()).jitter(this.retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())).block();
        this.receiverSubscriber.filter(Predicate.not((v0) -> {
            return v0.isDisposed();
        })).ifPresent((v0) -> {
            v0.dispose();
        });
        this.receiver.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Registration> register(EventListener.ReactiveEventListener reactiveEventListener, RegistrationKey registrationKey) {
        LocalListenerRegistry.LocalRegistration addListener = this.localListenerRegistry.addListener(registrationKey, reactiveEventListener);
        return registerIfNeeded(registrationKey, addListener).thenReturn(new KeyRegistration(() -> {
            if (addListener.unregister().lastListenerRemoved()) {
                this.registrationBinder.unbind(registrationKey).timeout(TOPOLOGY_CHANGES_TIMEOUT).retryWhen(Retry.backoff(this.retryBackoff.getMaxRetries(), this.retryBackoff.getFirstBackoff()).jitter(this.retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())).subscribeOn(Schedulers.elastic()).block();
            }
        }));
    }

    private Mono<Void> registerIfNeeded(RegistrationKey registrationKey, LocalListenerRegistry.LocalRegistration localRegistration) {
        return localRegistration.isFirstListener() ? this.registrationBinder.bind(registrationKey).timeout(TOPOLOGY_CHANGES_TIMEOUT).retryWhen(Retry.backoff(this.retryBackoff.getMaxRetries(), this.retryBackoff.getFirstBackoff()).jitter(this.retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) : Mono.empty();
    }

    private Mono<Void> handleDelivery(Delivery delivery) {
        if (delivery.getBody() == null) {
            return Mono.empty();
        }
        EventBusId of = EventBusId.of(delivery.getProperties().getHeaders().get("eventBusId").toString());
        RegistrationKey registrationKey = this.routingKeyConverter.toRegistrationKey(delivery.getEnvelope().getRoutingKey());
        Event event = toEvent(delivery);
        return this.localListenerRegistry.getLocalListeners(registrationKey).filter(reactiveEventListener -> {
            return !isLocalSynchronousListeners(of, reactiveEventListener);
        }).flatMap(reactiveEventListener2 -> {
            return executeListener(reactiveEventListener2, event, registrationKey);
        }, 10).then();
    }

    private Mono<Void> executeListener(EventListener.ReactiveEventListener reactiveEventListener, Event event, RegistrationKey registrationKey) {
        return this.listenerExecutor.execute(reactiveEventListener, MDCBuilder.create().addContext("registrationKey", registrationKey), event).doOnError(th -> {
            structuredLogger(event, registrationKey).log(logger -> {
                logger.error("Exception happens when handling event", th);
            });
        }).onErrorResume(th2 -> {
            return Mono.empty();
        }).then();
    }

    private boolean isLocalSynchronousListeners(EventBusId eventBusId, EventListener eventListener) {
        return eventBusId.equals(this.eventBusId) && eventListener.getExecutionMode().equals(EventListener.ExecutionMode.SYNCHRONOUS);
    }

    private Event toEvent(Delivery delivery) {
        return this.eventSerializer.asEvent(new String(delivery.getBody(), StandardCharsets.UTF_8));
    }

    private StructuredLogger structuredLogger(Event event, RegistrationKey registrationKey) {
        return MDCStructuredLogger.forLogger(LOGGER).addField("eventId", event.getEventId()).addField("eventClass", event.getClass()).addField("user", event.getUsername()).addField("registrationKey", registrationKey);
    }
}
