package org.apache.james.mailbox.events;

import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
import org.apache.james.mailbox.events.delivery.EventDelivery;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/mailbox/events/InVMEventBus.class */
/**
 * {@link EventBus} implementation that keeps every registration in memory
 * and hands events to the injected {@link EventDelivery}.
 *
 * <p>Two kinds of registrations are tracked:
 * <ul>
 *   <li>by {@link RegistrationKey}: any number of listeners may share a key;</li>
 *   <li>by {@link Group}: at most one listener per group.</li>
 * </ul>
 */
public class InVMEventBus implements EventBus {
    /** Key registrations; wrapped by {@code Multimaps.synchronizedSetMultimap}. */
    private final Multimap<RegistrationKey, MailboxListener> registrations;
    /** Group registrations, one listener per group. */
    private final ConcurrentHashMap<Group, MailboxListener> groups;
    private final EventDelivery eventDelivery;
    private final RetryBackoffConfiguration retryBackoff;
    private final EventDeadLetters eventDeadLetters;

    /**
     * Creates an event bus with no registration.
     *
     * @param eventDelivery             component every event is handed to
     * @param retryBackoffConfiguration configuration passed to the retryer used for group deliveries
     * @param eventDeadLetters          store passed to the permanent failure handler used for group deliveries
     */
    @Inject
    public InVMEventBus(EventDelivery eventDelivery, RetryBackoffConfiguration retryBackoffConfiguration, EventDeadLetters eventDeadLetters) {
        this.eventDelivery = eventDelivery;
        this.retryBackoff = retryBackoffConfiguration;
        this.eventDeadLetters = eventDeadLetters;
        this.registrations = Multimaps.synchronizedSetMultimap(HashMultimap.create());
        this.groups = new ConcurrentHashMap<>();
    }

    /**
     * Convenience constructor relying on {@link RetryBackoffConfiguration#DEFAULT}
     * and a fresh {@link MemoryEventDeadLetters}.
     *
     * @param eventDelivery component every event is handed to
     */
    @VisibleForTesting
    public InVMEventBus(EventDelivery eventDelivery) {
        this(eventDelivery, RetryBackoffConfiguration.DEFAULT, new MemoryEventDeadLetters());
    }

    /**
     * Registers a listener under a registration key.
     *
     * @param mailboxListener listener to register
     * @param registrationKey key the listener is stored under
     * @return a handle that removes exactly this (key, listener) pair when invoked
     */
    public Registration register(MailboxListener mailboxListener, RegistrationKey registrationKey) {
        this.registrations.put(registrationKey, mailboxListener);
        return () -> this.registrations.remove(registrationKey, mailboxListener);
    }

    /**
     * Registers a listener for a group.
     *
     * @param mailboxListener listener to register
     * @param group           group the listener is bound to
     * @return a handle that removes the binding when invoked, provided the group
     *         is still bound to this very listener
     * @throws GroupAlreadyRegistered when the group already has a listener
     */
    public Registration register(MailboxListener mailboxListener, Group group) {
        MailboxListener alreadyBound = this.groups.putIfAbsent(group, mailboxListener);
        if (alreadyBound != null) {
            throw new GroupAlreadyRegistered(group);
        }
        return () -> this.groups.remove(group, mailboxListener);
    }

    /**
     * Delivers an event to every group listener and to the listeners registered
     * under the supplied keys.
     *
     * <p>Any error signalled by the pipeline is swallowed: the returned
     * {@link Mono} then simply completes empty.
     *
     * @param event event to deliver; nothing is delivered when {@code event.isNoop()} is true
     * @param set   registration keys whose listeners should receive the event
     * @return a {@link Mono} completing once the combined execution stages'
     *         {@code synchronousListenerFuture()} has completed
     */
    public Mono<Void> dispatch(Event event, Set<RegistrationKey> set) {
        if (event.isNoop()) {
            return Mono.empty();
        }
        return Flux.merge(groupDeliveries(event), keyDeliveries(event, set))
            .reduceWith(EventDelivery.ExecutionStages::empty, EventDelivery.ExecutionStages::combine)
            .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
            .then()
            .onErrorResume(error -> Mono.empty());
    }

    /**
     * Delivers an event again to the listener currently bound to a group.
     *
     * <p>Unlike {@link #dispatch(Event, Set)}, errors are not swallowed here. In
     * particular a {@link GroupRegistrationNotFound} raised because the group has
     * no listener is thrown inside the callable, so it surfaces as an error
     * signal of the returned {@link Mono}.
     *
     * @param group group whose listener should receive the event
     * @param event event to deliver; nothing is delivered when {@code event.isNoop()} is true
     * @return a {@link Mono} completing once the delivery's
     *         {@code synchronousListenerFuture()} has completed
     */
    public Mono<Void> reDeliver(Group group, Event event) {
        if (event.isNoop()) {
            return Mono.empty();
        }
        return Mono.fromCallable(() -> groupDelivery(event, retrieveListenerFromGroup(group), group))
            .flatMap(EventDelivery.ExecutionStages::synchronousListenerFuture)
            .then();
    }

    /**
     * Looks up the listener bound to a group.
     *
     * @throws GroupRegistrationNotFound when no listener is bound to the group
     */
    private MailboxListener retrieveListenerFromGroup(Group group) {
        return Optional.ofNullable(this.groups.get(group))
            .orElseThrow(() -> new GroupRegistrationNotFound(group));
    }

    /**
     * Builds the deliveries for listeners registered under the given keys, using
     * {@code EventDelivery.DeliveryOption.none()}.
     */
    private Flux<EventDelivery.ExecutionStages> keyDeliveries(Event event, Set<RegistrationKey> set) {
        return Flux.fromIterable(registeredListenersByKeys(set))
            .map(listener -> this.eventDelivery.deliver(listener, event, EventDelivery.DeliveryOption.none()));
    }

    /** Builds one delivery per currently registered group. */
    private Flux<EventDelivery.ExecutionStages> groupDeliveries(Event event) {
        return Flux.fromIterable(this.groups.entrySet())
            .map(binding -> groupDelivery(event, binding.getValue(), binding.getKey()));
    }

    /**
     * Delivers an event to a group listener with a delivery option made of a
     * {@code BackoffRetryer} (built from the retry configuration and the listener)
     * and a {@code StoreToDeadLetters} permanent failure handler (built from the
     * group and the dead letters store).
     */
    private EventDelivery.ExecutionStages groupDelivery(Event event, MailboxListener mailboxListener, Group group) {
        EventDelivery.DeliveryOption option = EventDelivery.DeliveryOption.of(
            EventDelivery.Retryer.BackoffRetryer.of(this.retryBackoff, mailboxListener),
            EventDelivery.PermanentFailureHandler.StoreToDeadLetters.of(group, this.eventDeadLetters));
        return this.eventDelivery.deliver(mailboxListener, event, option);
    }

    /**
     * Returns the groups that currently have a listener.
     *
     * <p>The result is the key-set view of the internal map, not a copy: it
     * reflects later registrations and removals.
     */
    public Set<Group> registeredGroups() {
        return this.groups.keySet();
    }

    /**
     * Snapshots the distinct listeners registered under any of the given keys.
     *
     * @param set keys to look up
     * @return an immutable set of the matching listeners
     */
    private Set<MailboxListener> registeredListenersByKeys(Set<RegistrationKey> set) {
        // A synchronized Guava multimap only guards individual calls: traversing
        // one of its collection views requires holding the multimap's own monitor,
        // otherwise a concurrent register/unregister can break the iteration.
        // The stream is fully consumed by collect() before the lock is released.
        synchronized (this.registrations) {
            return set.stream()
                .flatMap(key -> this.registrations.get(key).stream())
                .collect(Guavate.toImmutableSet());
        }
    }
}
