package org.apache.james.mailbox.events;

import com.google.common.collect.ImmutableSet;
import com.rabbitmq.client.Delivery;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.james.backends.rabbitmq.Constants;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQFixture;
import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.EventBusConcurrentTestContract;
import org.apache.james.mailbox.events.EventBusContract;
import org.apache.james.mailbox.events.EventBusTestFixture;
import org.apache.james.mailbox.events.GroupConsumerRetry;
import org.apache.james.mailbox.events.GroupContract;
import org.apache.james.mailbox.events.GroupRegistration;
import org.apache.james.mailbox.events.KeyContract;
import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.mailbox.events.RegistrationKey;
import org.apache.james.mailbox.model.TestId;
import org.apache.james.mailbox.model.TestMessageId;
import org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver;
import org.apache.james.mailbox.util.EventCollector;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/james/mailbox/events/RabbitMQEventBusTest.class */
public class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract, KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract, ErrorHandlingContract {

    // RabbitMQ extension obtained from RabbitMQExtension.singletonRabbitMQ() and registered with
    // JUnit; the field is static, so the same extension instance serves every test of this class.
    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
    // Event buses under test: setUp() creates each one through newEventBus() and starts these three.
    private RabbitMQEventBus eventBus;
    private RabbitMQEventBus eventBus2;
    private RabbitMQEventBus eventBus3;
    // Created by setUp() as well. NOTE(review): presumably left unstarted so that tests can call
    // startWithoutStartingKeyRegistrationHandler() themselves - confirm against the rest of setUp().
    private RabbitMQEventBus eventBusWithKeyHandlerNotStarted;
    // Serializer built in setUp(); PublishingTest uses it to parse the JSON read back from RabbitMQ.
    private EventSerializer eventSerializer;
    // Built in setUp() from a MailboxIdRegistrationKey.Factory.
    private RoutingKeyConverter routingKeyConverter;
    // Fresh MemoryEventDeadLetters instance assigned by setUp() before each test.
    private MemoryEventDeadLetters memoryEventDeadLetters;

    /**
     * At-least-once delivery scenario: an event whose processing never finishes on one event
     * bus must reach a listener of the same group on another event bus once the first is stopped.
     */
    @Nested
    class AtLeastOnceTest {

        /**
         * Registers one counting listener per event bus on {@code GROUP_A}. The listeners of
         * {@code eventBus} and {@code eventBus2} invoke the real method and then sleep forever,
         * so their processing never completes. After each of those buses is stopped, the next
         * listener is expected to have been called exactly once.
         */
        @Test
        void inProcessingEventShouldBeReDispatchedToAnotherEventBusWhenOneIsDown() {
            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution eventBusListener =
                Mockito.spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution eventBus2Listener =
                Mockito.spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution eventBus3Listener =
                Mockito.spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());

            // Run the real listener method first, then block this consumer indefinitely.
            Answer<Object> callEventAndSleepForever = invocation -> {
                invocation.callRealMethod();
                TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
                return null;
            };
            Mockito.doAnswer(callEventAndSleepForever).when(eventBusListener).event(ArgumentMatchers.any());
            Mockito.doAnswer(callEventAndSleepForever).when(eventBus2Listener).event(ArgumentMatchers.any());

            eventBus.register(eventBusListener, EventBusTestFixture.GROUP_A);
            eventBus2.register(eventBus2Listener, EventBusTestFixture.GROUP_A);
            eventBus3.register(eventBus3Listener, EventBusTestFixture.GROUP_A);

            eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
            getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(eventBusListener.numberOfEventCalls()).isEqualTo(1);
            });

            // Stopping the first bus while its listener is still busy must hand the event over.
            eventBus.stop();
            getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(eventBus2Listener.numberOfEventCalls()).isEqualTo(1);
            });

            eventBus2.stop();
            getSpeedProfile().shortWaitCondition().untilAsserted(() -> {
                Assertions.assertThat(eventBus3Listener.numberOfEventCalls()).isEqualTo(1);
            });
        }
    }

    /**
     * Runs the concurrent event-bus contracts against the RabbitMQ event buses of the enclosing
     * test and adds a bulk-dispatch scenario.
     */
    @Nested
    class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract, EventBusConcurrentTestContract.SingleEventBusConcurrentContract {

        /**
         * @return always {@link EventBusContract.EnvironmentSpeedProfile#SLOW}
         */
        public EventBusContract.EnvironmentSpeedProfile getSpeedProfile() {
            return EventBusContract.EnvironmentSpeedProfile.SLOW;
        }

        /**
         * Dispatches events from 10 threads with an operation count of 10000 and waits (polling
         * every five seconds, for at most ten minutes) until the single group listener has been
         * called once per dispatch.
         */
        @Test
        void rabbitMQEventBusShouldHandleBulksGracefully() throws Exception {
            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener = EventBusConcurrentTestContract.newCountingListener();
            eventBus().register(countingListener, new EventBusTestFixture.GroupA());
            int totalGlobalRegistrations = 1; // GroupA only

            int threadCount = 10;
            int operationCount = 10000;
            int totalDispatchOperations = threadCount * operationCount;

            // The former "eventBus = eventBus()" statement was dropped: eventBus() already returns
            // that very field, and assigning an EventBus to a RabbitMQEventBus field needs a cast.
            ConcurrentTestRunner.builder()
                .operation((threadNumber, step) -> {
                    RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                })
                .threadCount(threadCount)
                .operationCount(operationCount)
                .runSuccessfullyWithin(Duration.ofMinutes(3));

            Awaitility.await()
                .pollInterval(org.awaitility.Duration.FIVE_SECONDS)
                .timeout(org.awaitility.Duration.TEN_MINUTES)
                .untilAsserted(() -> {
                    Assertions.assertThat(countingListener.numberOfEventCalls())
                        .isEqualTo(totalGlobalRegistrations * totalDispatchOperations);
                });
        }

        /** @return the third event bus of the enclosing test */
        public EventBus eventBus3() {
            return RabbitMQEventBusTest.this.eventBus3;
        }

        /** @return the second event bus of the enclosing test */
        public EventBus eventBus2() {
            return RabbitMQEventBusTest.this.eventBus2;
        }

        /** @return the first event bus of the enclosing test */
        public EventBus eventBus() {
            return RabbitMQEventBusTest.this.eventBus;
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/mailbox/events/RabbitMQEventBusTest$LifeCycleTest.class */
    class LifeCycleTest {
        // Value passed to ConcurrentTestRunner's threadCount(...) by the
        // dispatchShouldStopDeliveringEventsShortlyAfterStopIsCalled tests.
        private static final int THREAD_COUNT = 10;
        // Value passed to ConcurrentTestRunner's operationCount(...) by the same tests.
        private static final int OPERATION_COUNT = 100000;
        // Management API handle, re-assigned in setUp() from the shared RabbitMQ extension;
        // the nested tests use it to list exchanges and queues.
        private RabbitMQManagementAPI rabbitManagementAPI;

        @Nested
        /* loaded from: input_file:org/apache/james/mailbox/events/RabbitMQEventBusTest$LifeCycleTest$MultiEventBus.class */
        class MultiEventBus {
            MultiEventBus() {
            }

            @Test
            void multipleEventBusStartShouldCreateOnlyOneEventExchange() {
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listExchanges()).filteredOn(exchange -> {
                    return exchange.getName().equals("mailboxEvent-exchange");
                }).hasSize(1);
            }

            @Test
            void multipleEventBusShouldNotThrowWhenStartAndStopContinuously() {
                Assertions.assertThatCode(() -> {
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.stop();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                }).doesNotThrowAnyException();
            }

            @Test
            void multipleEventBusStopShouldNotDeleteEventBusExchange() {
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listExchanges()).anySatisfy(exchange -> {
                    Assertions.assertThat(exchange.getName()).isEqualTo("mailboxEvent-exchange");
                });
            }

            @Test
            void multipleEventBusStopShouldNotDeleteGroupRegistrationWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.register((MailboxListener) Mockito.mock(MailboxListener.class), EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listQueues()).anySatisfy(messageQueue -> {
                    Assertions.assertThat(messageQueue.getName()).contains(new CharSequence[]{EventBusTestFixture.GroupA.class.getName()});
                });
            }

            @Test
            void multipleEventBusStopShouldDeleteAllKeyRegistrationsWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                RabbitMQEventBusTest.this.eventBusWithKeyHandlerNotStarted.stop();
                Assertions.assertThat(LifeCycleTest.this.rabbitManagementAPI.listQueues()).filteredOn(messageQueue -> {
                    return !messageQueue.getName().startsWith("mailboxEvent-workQueue-");
                }).isEmpty();
            }

            @Test
            void dispatchShouldStopDeliveringEventsShortlyAfterStopIsCalled() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                RabbitMQEventBusTest.this.eventBus2.start();
                EventBusTestFixture.MailboxListenerCountingSuccessfulExecution mailboxListenerCountingSuccessfulExecution = new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution();
                RabbitMQEventBusTest.this.eventBus.register(mailboxListenerCountingSuccessfulExecution, EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus2.register(mailboxListenerCountingSuccessfulExecution, EventBusTestFixture.GROUP_A);
                ConcurrentTestRunner run = ConcurrentTestRunner.builder().operation((i, i2) -> {
                    RabbitMQEventBusTest.this.eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                }).threadCount(LifeCycleTest.THREAD_COUNT).operationCount(LifeCycleTest.OPERATION_COUNT).noErrorLogs().run();
                try {
                    TimeUnit.SECONDS.sleep(2L);
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                    int numberOfEventCalls = mailboxListenerCountingSuccessfulExecution.numberOfEventCalls();
                    TimeUnit.SECONDS.sleep(1L);
                    Assertions.assertThat(mailboxListenerCountingSuccessfulExecution.numberOfEventCalls()).isCloseTo(numberOfEventCalls, Percentage.withPercentage(2.0d));
                    if (run != null) {
                        run.close();
                    }
                } catch (Throwable th) {
                    if (run != null) {
                        try {
                            run.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
        }

        /**
         * Life-cycle scenarios for a single event bus: start/stop behaviour, RabbitMQ restarts
         * and paused-broker ("network issue") situations.
         */
        @Nested
        class SingleEventBus {

            /**
             * Network-issue scenarios that run against their own RabbitMQ extension, configured
             * with the {@code PER_TEST} restart policy, rather than the shared static one.
             */
            @Nested
            class DispatchingWhenNetWorkIssue {

                @RegisterExtension
                RabbitMQExtension rabbitMQNetWorkIssueExtension = RabbitMQExtension.defaultRabbitMQ()
                    .restartPolicy(RabbitMQExtension.DockerRestartPolicy.PER_TEST);
                private RabbitMQEventBus rabbitMQEventBusWithNetWorkIssue;

                /** Builds an event bus wired to the sender and receiver provider of the dedicated extension. */
                @BeforeEach
                void beforeEach() {
                    rabbitMQEventBusWithNetWorkIssue = newEventBus(
                        rabbitMQNetWorkIssueExtension.getSender(),
                        rabbitMQNetWorkIssueExtension.getReceiverProvider());
                }

                /** A group listener registered before the pause must receive the event dispatched after unpause. */
                @Test
                void dispatchShouldWorkAfterNetworkIssuesForOldRegistration() {
                    rabbitMQEventBusWithNetWorkIssue.start();
                    MailboxListener listener = EventBusTestFixture.newListener();
                    rabbitMQEventBusWithNetWorkIssue.register(listener, EventBusTestFixture.GROUP_A);

                    rabbitMQNetWorkIssueExtension.getRabbitMQ().pause();
                    Assertions.assertThatThrownBy(() -> rabbitMQEventBusWithNetWorkIssue.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
                        .isInstanceOf(IllegalStateException.class)
                        .hasMessageContaining("Retries exhausted");

                    rabbitMQNetWorkIssueExtension.getRabbitMQ().unpause();
                    rabbitMQEventBusWithNetWorkIssue.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                    assertThatListenerReceiveOneEvent(listener);
                }

                /** Same as above for a listener registered on {@code KEY_1}. */
                @Test
                void dispatchShouldWorkAfterNetworkIssuesForOldRegistrationAndKey() {
                    rabbitMQEventBusWithNetWorkIssue.start();
                    MailboxListener listener = EventBusTestFixture.newListener();
                    rabbitMQEventBusWithNetWorkIssue.register(listener, EventBusTestFixture.KEY_1);

                    rabbitMQNetWorkIssueExtension.getRabbitMQ().pause();
                    Assertions.assertThatThrownBy(() -> rabbitMQEventBusWithNetWorkIssue.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
                        .isInstanceOf(IllegalStateException.class)
                        .hasMessageContaining("Retries exhausted");

                    rabbitMQNetWorkIssueExtension.getRabbitMQ().unpause();
                    rabbitMQEventBusWithNetWorkIssue.dispatch(EventBusTestFixture.EVENT, ImmutableSet.of(EventBusTestFixture.KEY_1)).block();
                    assertThatListenerReceiveOneEvent(listener);
                }
            }

            /** Starting the bus must yield a single durable {@code direct} exchange named {@code mailboxEvent-exchange}. */
            @Test
            void startShouldCreateEventExchange() {
                eventBus.start();

                Assertions.assertThat(rabbitManagementAPI.listExchanges())
                    .filteredOn(exchange -> exchange.getName().equals("mailboxEvent-exchange"))
                    .hasOnlyOneElementSatisfying(exchange -> {
                        Assertions.assertThat(exchange.isDurable()).isTrue();
                        Assertions.assertThat(exchange.getType()).isEqualTo("direct");
                    });
            }

            /** Group listener registered before the RabbitMQ restart still receives a dispatched event. */
            @Test
            void dispatchShouldWorkAfterRestartForOldRegistration() throws Exception {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                eventBus.register(listener, EventBusTestFixture.GROUP_A);

                rabbitMQExtension.getRabbitMQ().restart();

                eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** Group listener registered after the RabbitMQ restart receives a dispatched event. */
            @Test
            void dispatchShouldWorkAfterRestartForNewRegistration() throws Exception {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();

                rabbitMQExtension.getRabbitMQ().restart();

                eventBus.register(listener, EventBusTestFixture.GROUP_A);
                eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** Group listener registered before the RabbitMQ restart still receives a redelivered event. */
            @Test
            void redeliverShouldWorkAfterRestartForOldRegistration() throws Exception {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                eventBus.register(listener, EventBusTestFixture.GROUP_A);

                rabbitMQExtension.getRabbitMQ().restart();

                eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** Group listener registered after the RabbitMQ restart receives a redelivered event. */
            @Test
            void redeliverShouldWorkAfterRestartForNewRegistration() throws Exception {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();

                rabbitMQExtension.getRabbitMQ().restart();

                eventBus.register(listener, EventBusTestFixture.GROUP_A);
                eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** Key listener registered before the RabbitMQ restart still receives a dispatched event. */
            @Test
            void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                eventBus.register(listener, EventBusTestFixture.KEY_1);

                rabbitMQExtension.getRabbitMQ().restart();

                eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /**
             * Dispatches while the key registration handler is not started, restarts RabbitMQ,
             * then starts the handler and expects the listener to receive the event.
             */
            @Test
            void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception {
                eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
                MailboxListener asyncListener = EventBusTestFixture.newAsyncListener();
                eventBusWithKeyHandlerNotStarted.register(asyncListener, EventBusTestFixture.KEY_1);
                eventBusWithKeyHandlerNotStarted.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();

                rabbitMQExtension.getRabbitMQ().restart();

                eventBusWithKeyHandlerNotStarted.startKeyRegistrationHandler();
                assertThatListenerReceiveOneEvent(asyncListener);
            }

            /** Key listener registered after the RabbitMQ restart receives a dispatched event. */
            @Test
            void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();

                rabbitMQExtension.getRabbitMQ().restart();

                eventBus.register(listener, EventBusTestFixture.KEY_1);
                eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** Dispatch fails while RabbitMQ is paused; a group listener registered after unpause gets the next event. */
            @Test
            void dispatchShouldWorkAfterNetworkIssuesForNewRegistration() {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();

                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessageContaining("Retries exhausted");

                rabbitMQExtension.getRabbitMQ().unpause();
                eventBus.register(listener, EventBusTestFixture.GROUP_A);
                eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /**
             * Redelivery to a group with no registration fails with {@code GroupRegistrationNotFound}
             * while RabbitMQ is paused; after unpause and registration it succeeds.
             */
            @Test
            void redeliverShouldWorkAfterNetworkIssuesForNewRegistration() {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();

                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block())
                    .isInstanceOf(GroupRegistrationNotFound.class);

                rabbitMQExtension.getRabbitMQ().unpause();
                eventBus.register(listener, EventBusTestFixture.GROUP_A);
                eventBus.reDeliver(EventBusTestFixture.GROUP_A, EventBusTestFixture.EVENT).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** Asynchronous key listener registered before the pause receives the event dispatched after unpause. */
            @Test
            void dispatchShouldWorkAfterNetworkIssuesForOldKeyRegistration() {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                Mockito.when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);
                eventBus.register(listener, EventBusTestFixture.KEY_1);

                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessageContaining("Retries exhausted");

                rabbitMQExtension.getRabbitMQ().unpause();
                eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** Asynchronous key listener registered after unpause receives the event dispatched afterwards. */
            @Test
            void dispatchShouldWorkAfterNetworkIssuesForNewKeyRegistration() {
                eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                Mockito.when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS);

                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block())
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessageContaining("Retries exhausted");

                rabbitMQExtension.getRabbitMQ().unpause();
                eventBus.register(listener, EventBusTestFixture.KEY_1);
                eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                assertThatListenerReceiveOneEvent(listener);
            }

            /** The event exchange must still be listed after the bus has been stopped. */
            @Test
            void stopShouldNotDeleteEventBusExchange() {
                eventBus.start();
                eventBus.stop();

                Assertions.assertThat(rabbitManagementAPI.listExchanges())
                    .anySatisfy(exchange -> Assertions.assertThat(exchange.getName()).isEqualTo("mailboxEvent-exchange"));
            }

            /** A queue whose name contains the GroupA class name must survive stopping the bus. */
            @Test
            void stopShouldNotDeleteGroupRegistrationWorkQueue() {
                eventBus.start();
                eventBus.register(Mockito.mock(MailboxListener.class), EventBusTestFixture.GROUP_A);
                eventBus.stop();

                Assertions.assertThat(rabbitManagementAPI.listQueues())
                    .anySatisfy(queue -> Assertions.assertThat(queue.getName()).contains(EventBusTestFixture.GroupA.class.getName()));
            }

            /** Repeated start/stop calls on one bus must not throw. */
            @Test
            void eventBusShouldNotThrowWhenContinuouslyStartAndStop() {
                Assertions.assertThatCode(() -> {
                    eventBus.start();
                    eventBus.stop();
                    eventBus.stop();
                    eventBus.start();
                    eventBus.start();
                    eventBus.start();
                    eventBus.stop();
                    eventBus.stop();
                }).doesNotThrowAnyException();
            }

            /**
             * Dispatches concurrently in the background, stops the bus after two seconds, and
             * checks that one second later the listener call count is within 2 % of the count
             * read right after the stop.
             */
            @Test
            void dispatchShouldStopDeliveringEventsShortlyAfterStopIsCalled() throws Exception {
                eventBus.start();

                EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution();
                eventBus.register(listener, EventBusTestFixture.GROUP_A);

                // try-with-resources closes the runner exactly once, on success and on failure.
                try (ConcurrentTestRunner runner = ConcurrentTestRunner.builder()
                        .operation((threadNumber, step) -> {
                            eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.KEY_1).block();
                        })
                        .threadCount(THREAD_COUNT)
                        .operationCount(OPERATION_COUNT)
                        .noErrorLogs()
                        .run()) {

                    TimeUnit.SECONDS.sleep(2);

                    eventBus.stop();
                    // eventBus2 is stopped here too, exactly as the original test did.
                    eventBus2.stop();
                    int callsAfterStop = listener.numberOfEventCalls();

                    TimeUnit.SECONDS.sleep(1);
                    Assertions.assertThat(listener.numberOfEventCalls())
                        .isCloseTo(callsAfterStop, Percentage.withPercentage(2.0));
                }
            }
        }

        // Explicit no-argument constructor with an empty body; rabbitManagementAPI is assigned in setUp().
        LifeCycleTest() {
        }

        /**
         * Obtains the management API handle from the shared RabbitMQ extension before each test.
         *
         * @throws Exception if the extension cannot provide the management API
         */
        @BeforeEach
        void setUp() throws Exception {
            rabbitManagementAPI = rabbitMQExtension.managementAPI();
        }

        /** Un-pauses the shared RabbitMQ instance after each test; several tests in this class pause it. */
        @AfterEach
        void tearDown() {
            rabbitMQExtension.getRabbitMQ().unpause();
        }
    }

    /**
     * Checks what the event bus publishes to RabbitMQ by binding a plain queue to the event
     * exchange and reading the serialized event back from it.
     */
    @Nested
    class PublishingTest {
        private static final String MAILBOX_WORK_QUEUE_NAME = "mailboxEvent-workQueue";

        /** Declares a durable queue and binds it to {@code mailboxEvent-exchange} with an empty routing key. */
        @BeforeEach
        void setUp() {
            Sender sender = rabbitMQExtension.getSender();

            sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
                    .durable(true)
                    .exclusive(false)
                    .autoDelete(false)
                    .arguments(Constants.NO_ARGUMENTS))
                .block();
            sender.bind(BindingSpecification.binding()
                    .exchange("mailboxEvent-exchange")
                    .queue(MAILBOX_WORK_QUEUE_NAME)
                    .routingKey(""))
                .block();
        }

        /** The event read back from the bound queue must equal the dispatched one. */
        @Test
        void dispatchShouldPublishSerializedEventToRabbitMQ() {
            eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();

            Assertions.assertThat(dequeueEvent()).isEqualTo(EventBusTestFixture.EVENT);
        }

        /**
         * NOTE(review): despite its name, this test blocks on the dispatch and is therefore
         * identical to {@code dispatchShouldPublishSerializedEventToRabbitMQ}; confirm whether
         * a non-blocking subscription was intended.
         */
        @Test
        void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
            eventBus.dispatch(EventBusTestFixture.EVENT, EventBusTestFixture.NO_KEYS).block();

            Assertions.assertThat(dequeueEvent()).isEqualTo(EventBusTestFixture.EVENT);
        }

        /**
         * Consumes the first delivery of the bound queue (auto-ack) and deserializes its UTF-8
         * body with the test's event serializer.
         *
         * @return the deserialized event
         */
        private Event dequeueEvent() {
            // try-with-resources closes the receiver exactly once, on success and on failure.
            try (Receiver receiver = rabbitMQExtension.getReceiverProvider().createReceiver()) {
                byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
                    .blockFirst()
                    .getBody();

                return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8)).get();
            }
        }
    }

    /** Package-private no-argument constructor; the fixture fields are populated in {@code setUp()}. */
    RabbitMQEventBusTest() {
    }

    /**
     * Reports the speed profile of this test environment.
     *
     * @return always {@link EventBusContract.EnvironmentSpeedProfile#SLOW}
     */
    public EventBusContract.EnvironmentSpeedProfile getSpeedProfile() {
        return EventBusContract.EnvironmentSpeedProfile.SLOW;
    }

    /**
     * Builds a fresh fixture before each test: an in-memory dead-letter store, the event
     * serializer, the routing key converter and four event buses created through
     * {@link #newEventBus()}. Three buses are started normally; the fourth is started through
     * {@code startWithoutStartingKeyRegistrationHandler()}.
     */
    @BeforeEach
    void setUp() {
        TestId.Factory mailboxIdFactory = new TestId.Factory();
        RegistrationKey.Factory[] keyFactories = {new MailboxIdRegistrationKey.Factory(mailboxIdFactory)};

        // Collaborators read by newEventBus(); they must be assigned before any bus is created.
        this.memoryEventDeadLetters = new MemoryEventDeadLetters();
        this.eventSerializer = new EventSerializer(
            mailboxIdFactory,
            new TestMessageId.Factory(),
            new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer());
        this.routingKeyConverter = RoutingKeyConverter.forFactories(keyFactories);

        // All buses are constructed first, then started in the same order.
        this.eventBus = newEventBus();
        this.eventBus2 = newEventBus();
        this.eventBus3 = newEventBus();
        this.eventBusWithKeyHandlerNotStarted = newEventBus();

        this.eventBus.start();
        this.eventBus2.start();
        this.eventBus3.start();
        this.eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler();
    }

    /**
     * Stops the four event buses after each test, then deletes the work queue of every group in
     * {@code EventBusTestFixture.ALL_GROUPS} and finally the "mailboxEvent-exchange" exchange,
     * waiting for each deletion to complete.
     */
    @AfterEach
    void tearDown() {
        this.eventBus.stop();
        this.eventBus2.stop();
        this.eventBus3.stop();
        this.eventBusWithKeyHandlerNotStarted.stop();

        EventBusTestFixture.ALL_GROUPS.stream()
            .map(GroupRegistration.WorkQueueName::of)
            .map(queueName -> QueueSpecification.queue(queueName.asString()))
            .forEach(queueSpec -> rabbitMQExtension.getSender().delete(queueSpec).block());

        rabbitMQExtension.getSender()
            .delete(ExchangeSpecification.exchange("mailboxEvent-exchange"))
            .block();
    }

    /**
     * Creates an event bus wired to the sender and receiver provider of the shared RabbitMQ
     * extension, delegating to {@link #newEventBus(Sender, ReceiverProvider)}.
     *
     * @return a new event bus; this method does not start it
     */
    private RabbitMQEventBus newEventBus() {
        return newEventBus(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider());
    }

    /**
     * Creates an event bus on top of the given sender and receiver provider, using this test's
     * event serializer, routing key converter and in-memory dead letters, the default retry
     * backoff configuration and a fresh recording metric factory.
     *
     * <p>Decompiler note (JADX): the access modifier was changed from {@code private}.
     *
     * @param sender the sender handed to the new bus
     * @param receiverProvider the receiver provider handed to the new bus
     * @return a new event bus; this method does not start it
     */
    public RabbitMQEventBus newEventBus(Sender sender, ReceiverProvider receiverProvider) {
        RecordingMetricFactory metricFactory = new RecordingMetricFactory();
        return new RabbitMQEventBus(
            sender,
            receiverProvider,
            this.eventSerializer,
            RetryBackoffConfiguration.DEFAULT,
            this.routingKeyConverter,
            this.memoryEventDeadLetters,
            metricFactory);
    }

    /**
     * Exposes the first event bus created in {@code setUp()}.
     *
     * @return the {@code eventBus} field
     */
    public EventBus eventBus() {
        return this.eventBus;
    }

    /**
     * Exposes the second event bus created in {@code setUp()}.
     *
     * @return the {@code eventBus2} field
     */
    public EventBus eventBus2() {
        return this.eventBus2;
    }

    /**
     * Exposes the in-memory dead-letter store created in {@code setUp()} and shared by every
     * event bus of this test.
     *
     * @return the {@code memoryEventDeadLetters} field
     */
    public EventDeadLetters deadLetter() {
        return this.memoryEventDeadLetters;
    }

    /**
     * Intentionally empty and disabled: per the {@code @Disabled} reason, distinct registration
     * keys are handled by distinct messages, so the scenario in the test name fails by design
     * for this event bus.
     */
    // NOTE(review): presumably overrides a contract test of the same name; the class header and
    // its implemented contracts are outside this view, so confirm.
    @Disabled("This test is failing by design as the different registration keys are handled by distinct messages")
    @Test
    public void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
    }

    /**
     * Checks that deserializing the class name of the event collector group yields a group equal
     * to a freshly constructed {@code EventCollector.EventCollectorGroup}.
     *
     * @throws Exception if the group cannot be deserialized
     */
    @Test
    void deserializeEventCollectorGroup() throws Exception {
        String serializedGroup = "org.apache.james.mailbox.util.EventCollector$EventCollectorGroup";
        EventCollector.EventCollectorGroup expectedGroup = new EventCollector.EventCollectorGroup();

        Assertions.assertThat(Group.deserialize(serializedGroup))
            .isEqualTo(expectedGroup);
    }

    /**
     * Registers a listener for group A on the first event bus and checks that the exchanges
     * listed by the RabbitMQ management API include one named after that group's retry exchange.
     *
     * @throws Exception if the management API cannot be obtained or queried
     */
    @Test
    void registerGroupShouldCreateRetryExchange() throws Exception {
        MailboxListener listener = EventBusTestFixture.newListener();
        EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
        this.eventBus.register(listener, registeredGroup);

        GroupConsumerRetry.RetryExchangeName retryExchangeName = GroupConsumerRetry.RetryExchangeName.of(registeredGroup);
        Assertions.assertThat(rabbitMQExtension.managementAPI().listExchanges())
            .anyMatch(exchange -> exchange.getName().equals(retryExchangeName.asString()));
    }

    /**
     * Waits, using the fixture's thirty-second await condition, until Mockito verification
     * confirms that the given listener mock received the fixture event.
     *
     * <p>Decompiler note (JADX): the access modifier was changed from {@code private}.
     *
     * @param mailboxListener the Mockito mock expected to receive {@code EventBusTestFixture.EVENT}
     */
    public void assertThatListenerReceiveOneEvent(MailboxListener mailboxListener) {
        RabbitMQFixture.awaitAtMostThirtySeconds.untilAsserted(
            () -> Mockito.verify(mailboxListener).event(EventBusTestFixture.EVENT));
    }
}
