/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.mailbox.events;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.james.backend.rabbitmq.Constants;
import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
import org.apache.james.backend.rabbitmq.RabbitMQExtension;
import org.apache.james.backend.rabbitmq.RabbitMQManagementAPI;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.mailbox.events.ErrorHandlingContract;
import org.apache.james.mailbox.events.Event;
import org.apache.james.mailbox.events.EventBus;
import org.apache.james.mailbox.events.EventBusConcurrentTestContract;
import org.apache.james.mailbox.events.EventBusTestFixture;
import org.apache.james.mailbox.events.EventDeadLetters;
import org.apache.james.mailbox.events.Group;
import org.apache.james.mailbox.events.GroupConsumerRetry;
import org.apache.james.mailbox.events.GroupContract;
import org.apache.james.mailbox.events.GroupRegistration;
import org.apache.james.mailbox.events.KeyContract;
import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.mailbox.events.MemoryEventDeadLetters;
import org.apache.james.mailbox.events.RabbitMQEventBus;
import org.apache.james.mailbox.events.RegistrationKey;
import org.apache.james.mailbox.events.RetryBackoffConfiguration;
import org.apache.james.mailbox.events.RoutingKeyConverter;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.TestId;
import org.apache.james.mailbox.model.TestMessageId;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.NoopMetricFactory;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.RabbitFluxException;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

class RabbitMQEventBusTest
implements GroupContract.SingleEventBusGroupContract,
GroupContract.MultipleEventBusGroupContract,
KeyContract.SingleEventBusKeyContract,
KeyContract.MultipleEventBusKeyContract,
ErrorHandlingContract {
    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
    private RabbitMQEventBus eventBus;
    private RabbitMQEventBus eventBus2;
    private RabbitMQEventBus eventBus3;
    private Sender sender;
    private RabbitMQConnectionFactory connectionFactory;
    private EventSerializer eventSerializer;
    private RoutingKeyConverter routingKeyConverter;
    private MemoryEventDeadLetters memoryEventDeadLetters;

    RabbitMQEventBusTest() {
    }

    @BeforeEach
    void setUp() {
        this.connectionFactory = rabbitMQExtension.getConnectionFactory();
        this.memoryEventDeadLetters = new MemoryEventDeadLetters();
        Mono connectionMono = Mono.fromSupplier(() -> ((RabbitMQConnectionFactory)this.connectionFactory).create()).cache();
        TestId.Factory mailboxIdFactory = new TestId.Factory();
        this.eventSerializer = new EventSerializer((MailboxId.Factory)mailboxIdFactory, (MessageId.Factory)new TestMessageId.Factory());
        this.routingKeyConverter = RoutingKeyConverter.forFactories((RegistrationKey.Factory[])new RegistrationKey.Factory[]{new MailboxIdRegistrationKey.Factory((MailboxId.Factory)mailboxIdFactory)});
        this.eventBus = this.newEventBus();
        this.eventBus2 = this.newEventBus();
        this.eventBus3 = this.newEventBus();
        this.eventBus.start();
        this.eventBus2.start();
        this.eventBus3.start();
        this.sender = RabbitFlux.createSender((SenderOptions)new SenderOptions().connectionMono(connectionMono));
    }

    @AfterEach
    void tearDown() {
        this.eventBus.stop();
        this.eventBus2.stop();
        this.eventBus3.stop();
        EventBusTestFixture.ALL_GROUPS.stream().map(GroupRegistration.WorkQueueName::of).forEach(queueName -> {
            AMQP.Queue.DeleteOk cfr_ignored_0 = (AMQP.Queue.DeleteOk)this.sender.delete(QueueSpecification.queue((String)queueName.asString())).block();
        });
        this.sender.delete(ExchangeSpecification.exchange((String)"mailboxEvent-exchange")).block();
        this.sender.close();
    }

    private RabbitMQEventBus newEventBus() {
        return new RabbitMQEventBus(this.connectionFactory, this.eventSerializer, RetryBackoffConfiguration.DEFAULT, this.routingKeyConverter, (EventDeadLetters)this.memoryEventDeadLetters, (MetricFactory)new NoopMetricFactory());
    }

    public EventBus eventBus() {
        return this.eventBus;
    }

    public EventBus eventBus2() {
        return this.eventBus2;
    }

    public EventDeadLetters deadLetter() {
        return this.memoryEventDeadLetters;
    }

    @Test
    @Disabled(value="This test is failing by design as the different registration keys are handled by distinct messages")
    public void dispatchShouldCallListenerOnceWhenSeveralKeysMatching() {
    }

    @Test
    void registerGroupShouldCreateRetryExchange() throws Exception {
        MailboxListener listener = EventBusTestFixture.newListener();
        EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
        this.eventBus.register(listener, (Group)registeredGroup);
        GroupConsumerRetry.RetryExchangeName retryExchangeName = GroupConsumerRetry.RetryExchangeName.of((Group)registeredGroup);
        Assertions.assertThat((List)rabbitMQExtension.managementAPI().listExchanges()).anyMatch(exchange -> exchange.getName().equals(retryExchangeName.asString()));
    }

    @Nested
    class LifeCycleTest {
        private final Duration TEN_SECONDS = Duration.ofSeconds(10L);
        private static final int THREAD_COUNT = 10;
        private static final int OPERATION_COUNT = 100000;
        private static final int MAX_EVENT_DISPATCHED_COUNT = 1000000;
        private RabbitMQManagementAPI rabbitManagementAPI;

        LifeCycleTest() {
        }

        @BeforeEach
        void setUp() throws Exception {
            this.rabbitManagementAPI = rabbitMQExtension.managementAPI();
        }

        @Nested
        class MultiEventBus {
            MultiEventBus() {
            }

            @Test
            void multipleEventBusStartShouldCreateOnlyOneEventExchange() {
                ((ListAssert)Assertions.assertThat((List)LifeCycleTest.this.rabbitManagementAPI.listExchanges()).filteredOn(exchange -> exchange.getName().equals("mailboxEvent-exchange"))).hasSize(1);
            }

            @Test
            void multipleEventBusShouldNotThrowWhenStartAndStopContinuously() {
                Assertions.assertThatCode(() -> {
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.start();
                    RabbitMQEventBusTest.this.eventBus3.stop();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus2.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                }).doesNotThrowAnyException();
            }

            @Test
            void multipleEventBusStopShouldNotDeleteEventBusExchange() {
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                Assertions.assertThat((List)LifeCycleTest.this.rabbitManagementAPI.listExchanges()).anySatisfy(exchange -> exchange.getName().equals("mailboxEvent-exchange"));
            }

            @Test
            void multipleEventBusStopShouldNotDeleteGroupRegistrationWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.register((MailboxListener)Mockito.mock(MailboxListener.class), (Group)EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                Assertions.assertThat((List)LifeCycleTest.this.rabbitManagementAPI.listQueues()).anySatisfy(queue -> queue.getName().contains(EventBusTestFixture.GroupA.class.getName()));
            }

            @Test
            void multipleEventBusStopShouldDeleteAllKeyRegistrationsWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.stop();
                RabbitMQEventBusTest.this.eventBus2.stop();
                RabbitMQEventBusTest.this.eventBus3.stop();
                ((ListAssert)Assertions.assertThat((List)LifeCycleTest.this.rabbitManagementAPI.listQueues()).filteredOn(queue -> !queue.getName().startsWith("mailboxEvent-workQueue-"))).isEmpty();
            }

            @Test
            void registrationsShouldNotHandleEventsAfterStop() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                RabbitMQEventBusTest.this.eventBus2.start();
                EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution();
                RabbitMQEventBusTest.this.eventBus.register((MailboxListener)listener, (Group)EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus2.register((MailboxListener)listener, (Group)EventBusTestFixture.GROUP_A);
                try (ConcurrentTestRunner closeable = ConcurrentTestRunner.builder().operation((threadNumber, step) -> RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (RegistrationKey)EventBusTestFixture.KEY_1)).threadCount(10).operationCount(100000).run();){
                    TimeUnit.SECONDS.sleep(2L);
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                    int callsAfterStop = listener.numberOfEventCalls();
                    TimeUnit.SECONDS.sleep(1L);
                    Assertions.assertThat((int)listener.numberOfEventCalls()).isEqualTo(callsAfterStop).isLessThanOrEqualTo(1000000);
                }
            }
        }

        @Nested
        class SingleEventBus {
            SingleEventBus() {
            }

            @Test
            void startShouldCreateEventExchange() {
                RabbitMQEventBusTest.this.eventBus.start();
                ((ListAssert)Assertions.assertThat((List)LifeCycleTest.this.rabbitManagementAPI.listExchanges()).filteredOn(exchange -> exchange.getName().equals("mailboxEvent-exchange"))).hasOnlyOneElementSatisfying(exchange -> {
                    Assertions.assertThat((boolean)exchange.isDurable()).isTrue();
                    Assertions.assertThat((String)exchange.getType()).isEqualTo((Object)"direct");
                });
            }

            @Test
            void dispatchShouldWorkAfterNetworkIssuesForOldRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.this.eventBus.register(listener, (Group)EventBusTestFixture.GROUP_A);
                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    Void cfr_ignored_0 = (Void)RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                }).isInstanceOf(RabbitFluxException.class);
                rabbitMQExtension.getRabbitMQ().unpause();
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void dispatchShouldWorkAfterRestartForOldRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.this.eventBus.register(listener, (Group)EventBusTestFixture.GROUP_A);
                rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void dispatchShouldWorkAfterRestartForNewRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.register(listener, (Group)EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void dispatchShouldWorkAfterRestartForOldKeyRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                RabbitMQEventBusTest.this.eventBus.register(listener, (RegistrationKey)EventBusTestFixture.KEY_1);
                rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (RegistrationKey)EventBusTestFixture.KEY_1).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                rabbitMQExtension.getRabbitMQ().restart();
                RabbitMQEventBusTest.this.eventBus.register(listener, (RegistrationKey)EventBusTestFixture.KEY_1);
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (RegistrationKey)EventBusTestFixture.KEY_1).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void dispatchShouldWorkAfterNetworkIssuesForNewRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    Void cfr_ignored_0 = (Void)RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                }).isInstanceOf(RabbitFluxException.class);
                rabbitMQExtension.getRabbitMQ().unpause();
                RabbitMQEventBusTest.this.eventBus.register(listener, (Group)EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void dispatchShouldWorkAfterNetworkIssuesForOldKeyRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                Mockito.when((Object)listener.getExecutionMode()).thenReturn((Object)MailboxListener.ExecutionMode.ASYNCHRONOUS);
                RabbitMQEventBusTest.this.eventBus.register(listener, (RegistrationKey)EventBusTestFixture.KEY_1);
                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    Void cfr_ignored_0 = (Void)RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                }).isInstanceOf(RabbitFluxException.class);
                rabbitMQExtension.getRabbitMQ().unpause();
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (RegistrationKey)EventBusTestFixture.KEY_1).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void dispatchShouldWorkAfterNetworkIssuesForNewKeyRegistration() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                MailboxListener listener = EventBusTestFixture.newListener();
                Mockito.when((Object)listener.getExecutionMode()).thenReturn((Object)MailboxListener.ExecutionMode.ASYNCHRONOUS);
                rabbitMQExtension.getRabbitMQ().pause();
                Assertions.assertThatThrownBy(() -> {
                    Void cfr_ignored_0 = (Void)RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
                }).isInstanceOf(RabbitFluxException.class);
                rabbitMQExtension.getRabbitMQ().unpause();
                RabbitMQEventBusTest.this.eventBus.register(listener, (RegistrationKey)EventBusTestFixture.KEY_1);
                RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (RegistrationKey)EventBusTestFixture.KEY_1).block();
                ((MailboxListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.after((long)EventBusTestFixture.THIRTY_SECONDS.toMillis()).times(1))).event((Event)EventBusTestFixture.EVENT);
            }

            @Test
            void stopShouldNotDeleteEventBusExchange() {
                RabbitMQEventBusTest.this.eventBus.start();
                RabbitMQEventBusTest.this.eventBus.stop();
                Assertions.assertThat((List)LifeCycleTest.this.rabbitManagementAPI.listExchanges()).anySatisfy(exchange -> exchange.getName().equals("mailboxEvent-exchange"));
            }

            @Test
            void stopShouldNotDeleteGroupRegistrationWorkQueue() {
                RabbitMQEventBusTest.this.eventBus.start();
                RabbitMQEventBusTest.this.eventBus.register((MailboxListener)Mockito.mock(MailboxListener.class), (Group)EventBusTestFixture.GROUP_A);
                RabbitMQEventBusTest.this.eventBus.stop();
                Assertions.assertThat((List)LifeCycleTest.this.rabbitManagementAPI.listQueues()).anySatisfy(queue -> queue.getName().contains(EventBusTestFixture.GroupA.class.getName()));
            }

            @Test
            void eventBusShouldNotThrowWhenContinuouslyStartAndStop() {
                Assertions.assertThatCode(() -> {
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.start();
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus.stop();
                }).doesNotThrowAnyException();
            }

            @Test
            void registrationsShouldNotHandleEventsAfterStop() throws Exception {
                RabbitMQEventBusTest.this.eventBus.start();
                EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution();
                RabbitMQEventBusTest.this.eventBus.register((MailboxListener)listener, (Group)EventBusTestFixture.GROUP_A);
                try (ConcurrentTestRunner closeable = ConcurrentTestRunner.builder().operation((threadNumber, step) -> RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (RegistrationKey)EventBusTestFixture.KEY_1)).threadCount(10).operationCount(100000).run();){
                    TimeUnit.SECONDS.sleep(2L);
                    RabbitMQEventBusTest.this.eventBus.stop();
                    RabbitMQEventBusTest.this.eventBus2.stop();
                    int callsAfterStop = listener.numberOfEventCalls();
                    TimeUnit.SECONDS.sleep(1L);
                    Assertions.assertThat((int)listener.numberOfEventCalls()).isEqualTo(callsAfterStop).isLessThanOrEqualTo(1000000);
                }
            }
        }
    }

    @Nested
    class PublishingTest {
        private static final String MAILBOX_WORK_QUEUE_NAME = "mailboxEvent-workQueue";

        PublishingTest() {
        }

        @BeforeEach
        void setUp() {
            this.createQueue();
        }

        private void createQueue() {
            SenderOptions senderOption = new SenderOptions().connectionMono(Mono.fromSupplier(() -> ((RabbitMQConnectionFactory)RabbitMQEventBusTest.this.connectionFactory).create()));
            Sender sender = RabbitFlux.createSender((SenderOptions)senderOption);
            sender.declareQueue(QueueSpecification.queue((String)MAILBOX_WORK_QUEUE_NAME).durable(true).exclusive(false).autoDelete(false).arguments((Map)Constants.NO_ARGUMENTS)).block();
            sender.bind(BindingSpecification.binding().exchange("mailboxEvent-exchange").queue(MAILBOX_WORK_QUEUE_NAME).routingKey("")).block();
        }

        @Test
        void dispatchShouldPublishSerializedEventToRabbitMQ() {
            RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
            Assertions.assertThat((Object)this.dequeueEvent()).isEqualTo((Object)EventBusTestFixture.EVENT);
        }

        @Test
        void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
            RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS);
            Assertions.assertThat((Object)this.dequeueEvent()).isEqualTo((Object)EventBusTestFixture.EVENT);
        }

        private Event dequeueEvent() {
            RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory();
            Receiver receiver = RabbitFlux.createReceiver((ReceiverOptions)new ReceiverOptions().connectionMono(Mono.just((Object)connectionFactory.create())));
            byte[] eventInBytes = ((Delivery)receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME).blockFirst()).getBody();
            return (Event)RabbitMQEventBusTest.this.eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8)).get();
        }
    }

    @Nested
    class AtLeastOnceTest {
        AtLeastOnceTest() {
        }

        @Test
        void inProcessingEventShouldBeReDispatchedToAnotherEventBusWhenOneIsDown() {
            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution eventBusListener = (EventBusTestFixture.MailboxListenerCountingSuccessfulExecution)Mockito.spy((Object)new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution eventBus2Listener = (EventBusTestFixture.MailboxListenerCountingSuccessfulExecution)Mockito.spy((Object)new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution eventBus3Listener = (EventBusTestFixture.MailboxListenerCountingSuccessfulExecution)Mockito.spy((Object)new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution());
            Answer callEventAndSleepForever = invocation -> {
                invocation.callRealMethod();
                TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
                return null;
            };
            ((EventBusTestFixture.MailboxListenerCountingSuccessfulExecution)Mockito.doAnswer((Answer)callEventAndSleepForever).when((Object)eventBusListener)).event((Event)ArgumentMatchers.any());
            ((EventBusTestFixture.MailboxListenerCountingSuccessfulExecution)Mockito.doAnswer((Answer)callEventAndSleepForever).when((Object)eventBus2Listener)).event((Event)ArgumentMatchers.any());
            RabbitMQEventBusTest.this.eventBus.register((MailboxListener)eventBusListener, (Group)EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.this.eventBus2.register((MailboxListener)eventBus2Listener, (Group)EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.this.eventBus3.register((MailboxListener)eventBus3Listener, (Group)EventBusTestFixture.GROUP_A);
            RabbitMQEventBusTest.this.eventBus.dispatch((Event)EventBusTestFixture.EVENT, (Set)EventBusTestFixture.NO_KEYS).block();
            EventBusTestFixture.WAIT_CONDITION.until(() -> Assertions.assertThat((int)eventBusListener.numberOfEventCalls()).isEqualTo(1));
            RabbitMQEventBusTest.this.eventBus.stop();
            EventBusTestFixture.WAIT_CONDITION.until(() -> Assertions.assertThat((int)eventBus2Listener.numberOfEventCalls()).isEqualTo(1));
            RabbitMQEventBusTest.this.eventBus2.stop();
            EventBusTestFixture.WAIT_CONDITION.until(() -> Assertions.assertThat((int)eventBus3Listener.numberOfEventCalls()).isEqualTo(1));
        }
    }

    @Nested
    class ConcurrentTest
    implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
    EventBusConcurrentTestContract.SingleEventBusConcurrentContract {
        ConcurrentTest() {
        }

        public EventBus eventBus3() {
            return RabbitMQEventBusTest.this.eventBus3;
        }

        public EventBus eventBus2() {
            return RabbitMQEventBusTest.this.eventBus2;
        }

        public EventBus eventBus() {
            return RabbitMQEventBusTest.this.eventBus;
        }
    }
}

