package org.apache.james.mailbox.events;

import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.mailbox.events.RoutingKeyConverter;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.StructuredLogger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/apache/james/mailbox/events/EventDispatcher.class */
/**
 * Delivers an {@link Event} both to the synchronous listeners registered locally and to the
 * {@code mailboxEvent-exchange} RabbitMQ exchange, so that it can be routed by registration key.
 */
class EventDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
    /** Name of the exchange declared by {@link #start()} and targeted by every outbound message. */
    private static final String MAILBOX_EVENT_EXCHANGE_NAME = "mailboxEvent-exchange";
    /** AMQP header carrying the identifier of the event bus that emitted the message. */
    private static final String EVENT_BUS_ID_HEADER = "eventBusId";
    private static final String DIRECT_EXCHANGE_TYPE = "direct";

    private final EventSerializer eventSerializer;
    private final Sender sender;
    private final MailboxListenerRegistry mailboxListenerRegistry;
    private final AMQP.BasicProperties basicProperties;
    private final MailboxListenerExecutor mailboxListenerExecutor;

    /**
     * Creates a dispatcher.
     *
     * @param eventBusId identifier attached to every outbound message as the {@code eventBusId} header
     * @param eventSerializer converts events to their JSON representation
     * @param sender RabbitMQ sender used to declare the exchange and publish messages
     * @param mailboxListenerRegistry source of the locally registered listeners for a registration key
     * @param mailboxListenerExecutor runs a listener against an event
     */
    public EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, MailboxListenerRegistry mailboxListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
        this.eventSerializer = eventSerializer;
        this.sender = sender;
        this.mailboxListenerRegistry = mailboxListenerRegistry;
        // The properties never change, so they are built once and shared by all messages.
        this.basicProperties = new AMQP.BasicProperties.Builder()
            .headers(ImmutableMap.of(EVENT_BUS_ID_HEADER, eventBusId.asString()))
            .build();
        this.mailboxListenerExecutor = mailboxListenerExecutor;
    }

    /**
     * Declares the durable, direct {@code mailboxEvent-exchange} exchange and blocks until the
     * declaration completes.
     */
    public void start() {
        this.sender.declareExchange(
                ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                    .durable(true)
                    .type(DIRECT_EXCHANGE_TYPE))
            .block();
    }

    /**
     * Dispatches an event to the local synchronous listeners and to RabbitMQ.
     *
     * <p>Both pipelines are subscribed from within this call (through
     * {@code subscribeWith(MonoProcessor.create())}); the returned {@link Mono} only waits for them.
     * A failure raised while executing a local listener is logged and then dropped, so it does not
     * fail the returned {@link Mono}.
     *
     * @param event the event to dispatch
     * @param set the registration keys the event is dispatched for
     * @return a {@link Mono} completing once local delivery and the RabbitMQ publication are done
     */
    public Mono<Void> dispatch(Event event, Set<RegistrationKey> set) {
        // Subscription order matters: local delivery is started first, then the remote dispatch.
        Mono<Void> localListenerDelivery = Flux.fromIterable(set)
            .subscribeOn(Schedulers.elastic())
            .flatMap(key -> this.mailboxListenerRegistry.getLocalMailboxListeners(key)
                .map(listener -> Pair.of(key, listener)))
            .filter(pair -> pair.getRight().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
            .flatMap(pair -> executeLoggingFailures(event, set, pair.getRight(), pair.getLeft()))
            .cache()
            .then()
            .subscribeWith(MonoProcessor.create());

        // Cached because doDispatch subscribes to it once per routing key.
        Mono<byte[]> serializedEvent = Mono.just(event)
            .publishOn(Schedulers.parallel())
            .map(this::serializeEvent)
            .cache();

        Mono<Void> remoteDispatch = doDispatch(serializedEvent, set)
            .cache()
            .subscribeWith(MonoProcessor.create());

        return Flux.concat(localListenerDelivery, remoteDispatch).then();
    }

    /** Runs one listener; any error is logged with the event context and replaced by an empty completion. */
    private Mono<Void> executeLoggingFailures(Event event, Set<RegistrationKey> keys, MailboxListener listener, RegistrationKey key) {
        return Mono.<Void>fromRunnable(Throwing.runnable(() -> executeListener(event, listener, key)))
            .doOnError(e -> structuredLogger(event, keys)
                .log(logger -> logger.error("Exception happens when dispatching event", e)))
            .onErrorResume(e -> Mono.empty());
    }

    /** Executes the listener with the registration key added to the MDC context. */
    private void executeListener(Event event, MailboxListener mailboxListener, RegistrationKey registrationKey) throws Exception {
        this.mailboxListenerExecutor.execute(
            mailboxListener,
            MDCBuilder.create().addContext("registrationKey", registrationKey),
            event);
    }

    /** Builds a structured logger carrying the event id, class, user and the registration keys. */
    private StructuredLogger structuredLogger(Event event, Set<RegistrationKey> keys) {
        return MDCStructuredLogger.forLogger(LOGGER)
            .addField("eventId", event.getEventId())
            .addField("eventClass", event.getClass())
            .addField("user", event.getUser())
            .addField("registrationKeys", keys);
    }

    /**
     * Publishes the serialized event once with the empty routing key and once per registration key.
     *
     * @param serializedEvent the event payload; subscribed to once per routing key
     * @param keys the registration keys to derive routing keys from
     * @return the {@link Mono} returned by {@link Sender#send}
     */
    private Mono<Void> doDispatch(Mono<byte[]> serializedEvent, Set<RegistrationKey> keys) {
        Flux<RoutingKeyConverter.RoutingKey> routingKeys = Flux.concat(
            Mono.just(RoutingKeyConverter.RoutingKey.empty()),
            Flux.fromIterable(keys).map(RoutingKeyConverter.RoutingKey::of));

        Flux<OutboundMessage> outboundMessages = routingKeys.flatMap(routingKey ->
            serializedEvent.map(payload ->
                new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), this.basicProperties, payload)));

        return this.sender.send(outboundMessages);
    }

    /** Serializes the event to JSON and encodes it as UTF-8 bytes. */
    private byte[] serializeEvent(Event event) {
        return this.eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
    }
}
