package org.apache.james.backends.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Sender;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQTest.class */
public class RabbitMQTest {
    public static final ImmutableMap<String, Object> NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of();
    public static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);

    @Nested
    /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQTest$ConcurrencyTest.class */
    class ConcurrencyTest {
        private static final String QUEUE_NAME_1 = "TEST1";
        private static final String EXCHANGE_NAME_1 = "EXCHANGE1";

        ConcurrencyTest() {
        }

        @BeforeEach
        void setup() {
            Sender sender = RabbitMQTest.rabbitMQExtension.getSender();
            Flux.concat(new Publisher[]{sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME_1).durable(true).type("direct")), sender.declareQueue(QueueSpecification.queue(QUEUE_NAME_1).durable(true).exclusive(false).autoDelete(false)), sender.bind(BindingSpecification.binding().exchange(EXCHANGE_NAME_1).queue(QUEUE_NAME_1).routingKey(""))}).then().block();
            IntStream.rangeClosed(1, 5).forEach(i -> {
                sender.send(Mono.just(new OutboundMessage(EXCHANGE_NAME_1, "", String.format("Message + %s", UUID.randomUUID()).getBytes(StandardCharsets.UTF_8)))).block();
            });
        }

        @Test
        void consumingShouldSuccessWhenAckConcurrent() throws Exception {
            ReceiverProvider receiverProvider = RabbitMQTest.rabbitMQExtension.getReceiverProvider();
            CountDownLatch countDownLatch = new CountDownLatch(5);
            Objects.requireNonNull(receiverProvider);
            Flux.using(receiverProvider::createReceiver, receiver -> {
                return receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions());
            }, (v0) -> {
                v0.close();
            }).filter(acknowledgableDelivery -> {
                return acknowledgableDelivery.getBody() != null;
            }).concatMap(acknowledgableDelivery2 -> {
                return Mono.fromCallable(() -> {
                    acknowledgableDelivery2.ack(true);
                    countDownLatch.countDown();
                    return acknowledgableDelivery2;
                }).subscribeOn(Schedulers.fromExecutor(RabbitMQTest.EXECUTOR));
            }).subscribe();
            Assertions.assertThat(countDownLatch.await(10L, TimeUnit.SECONDS)).isTrue();
        }

        @Disabled("Now, it fail, Because using Flux.take and concatMap")
        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTake() throws Exception {
            ReceiverProvider receiverProvider = RabbitMQTest.rabbitMQExtension.getReceiverProvider();
            CountDownLatch countDownLatch = new CountDownLatch(5);
            Objects.requireNonNull(receiverProvider);
            Flux.using(receiverProvider::createReceiver, receiver -> {
                return receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions());
            }, (v0) -> {
                v0.close();
            }).filter(acknowledgableDelivery -> {
                return acknowledgableDelivery.getBody() != null;
            }).take(5).concatMap(acknowledgableDelivery2 -> {
                return Mono.fromCallable(() -> {
                    acknowledgableDelivery2.ack(true);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
                    return acknowledgableDelivery2;
                }).subscribeOn(Schedulers.fromExecutor(RabbitMQTest.EXECUTOR));
            }).subscribe();
            Assertions.assertThat(countDownLatch.await(10L, TimeUnit.SECONDS)).isTrue();
        }

        @Disabled("Now, sometimes pass, sometimes fail. Because using Flux.take and flatMap, It can be re-produce it by try 'Repeat until failure' of Intellij")
        @Test
        void consumingShouldSuccessWhenAckConcurrentWithFluxTakeAndFlatMap() throws Exception {
            ReceiverProvider receiverProvider = RabbitMQTest.rabbitMQExtension.getReceiverProvider();
            CountDownLatch countDownLatch = new CountDownLatch(5);
            Objects.requireNonNull(receiverProvider);
            Flux.using(receiverProvider::createReceiver, receiver -> {
                return receiver.consumeManualAck(QUEUE_NAME_1, new ConsumeOptions());
            }, (v0) -> {
                v0.close();
            }).filter(acknowledgableDelivery -> {
                return acknowledgableDelivery.getBody() != null;
            }).take(5).flatMap(acknowledgableDelivery2 -> {
                return Mono.fromCallable(() -> {
                    acknowledgableDelivery2.ack(true);
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + ": " + countDownLatch.getCount());
                    return acknowledgableDelivery2;
                }).subscribeOn(Schedulers.fromExecutor(RabbitMQTest.EXECUTOR));
            }).subscribe();
            Assertions.assertThat(countDownLatch.await(10L, TimeUnit.SECONDS)).isTrue();
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQTest$FourConnections.class */
    class FourConnections {
        private ConnectionFactory connectionFactory1;
        private ConnectionFactory connectionFactory2;
        private ConnectionFactory connectionFactory3;
        private ConnectionFactory connectionFactory4;
        private Connection connection1;
        private Connection connection2;
        private Connection connection3;
        private Connection connection4;
        private Channel channel1;
        private Channel channel2;
        private Channel channel3;
        private Channel channel4;

        @Nested
        /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQTest$FourConnections$BroadCast.class */
        class BroadCast {
            BroadCast() {
            }

            @Test
            void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                String queue = FourConnections.this.channel2.queueDeclare().getQueue();
                FourConnections.this.channel2.queueBind(queue, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                String queue2 = FourConnections.this.channel3.queueDeclare().getQueue();
                FourConnections.this.channel3.queueBind(queue2, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                String queue3 = FourConnections.this.channel4.queueDeclare().getQueue();
                FourConnections.this.channel4.queueBind(queue3, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(FourConnections.this.channel2);
                InMemoryConsumer inMemoryConsumer2 = new InMemoryConsumer(FourConnections.this.channel3);
                InMemoryConsumer inMemoryConsumer3 = new InMemoryConsumer(FourConnections.this.channel4);
                FourConnections.this.channel2.basicConsume(queue, inMemoryConsumer);
                FourConnections.this.channel3.basicConsume(queue2, inMemoryConsumer2);
                FourConnections.this.channel4.basicConsume(queue3, inMemoryConsumer3);
                Stream mapToObj = IntStream.range(0, 10).mapToObj(String::valueOf);
                RabbitMQTest rabbitMQTest = RabbitMQTest.this;
                mapToObj.map(str -> {
                    return rabbitMQTest.asBytes(str);
                }).forEach(Throwing.consumer(bArr -> {
                    FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                }).sneakyThrow());
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(FourConnections.this.countReceivedMessages(inMemoryConsumer, inMemoryConsumer2, inMemoryConsumer3) == 30);
                });
                ImmutableList immutableList = (ImmutableList) IntStream.range(0, 10).boxed().collect(ImmutableList.toImmutableList());
                Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnlyElementsOf(immutableList);
                Assertions.assertThat(inMemoryConsumer2.getConsumedMessages()).containsOnlyElementsOf(immutableList);
                Assertions.assertThat(inMemoryConsumer3.getConsumedMessages()).containsOnlyElementsOf(immutableList);
            }
        }

        @Nested
        /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQTest$FourConnections$Routing.class */
        class Routing {
            Routing() {
            }

            @Test
            void rabbitMQShouldSupportRouting() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                String queue = FourConnections.this.channel1.queueDeclare().getQueue();
                FourConnections.this.channel1.queueBind(queue, RabbitMQFixture.EXCHANGE_NAME, "c1");
                FourConnections.this.channel1.queueBind(queue, RabbitMQFixture.EXCHANGE_NAME, "c2");
                String queue2 = FourConnections.this.channel2.queueDeclare().getQueue();
                FourConnections.this.channel2.queueBind(queue2, RabbitMQFixture.EXCHANGE_NAME, "c2");
                FourConnections.this.channel2.queueBind(queue2, RabbitMQFixture.EXCHANGE_NAME, "c3");
                String queue3 = FourConnections.this.channel3.queueDeclare().getQueue();
                FourConnections.this.channel3.queueBind(queue3, RabbitMQFixture.EXCHANGE_NAME, "c3");
                FourConnections.this.channel3.queueBind(queue3, RabbitMQFixture.EXCHANGE_NAME, "c4");
                String queue4 = FourConnections.this.channel4.queueDeclare().getQueue();
                FourConnections.this.channel4.queueBind(queue4, RabbitMQFixture.EXCHANGE_NAME, "c1");
                FourConnections.this.channel4.queueBind(queue4, RabbitMQFixture.EXCHANGE_NAME, "c4");
                FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, "c1", Constants.NO_PROPERTIES, RabbitMQTest.this.asBytes("1"));
                FourConnections.this.channel2.basicPublish(RabbitMQFixture.EXCHANGE_NAME, "c2", Constants.NO_PROPERTIES, RabbitMQTest.this.asBytes("2"));
                FourConnections.this.channel3.basicPublish(RabbitMQFixture.EXCHANGE_NAME, "c3", Constants.NO_PROPERTIES, RabbitMQTest.this.asBytes("3"));
                FourConnections.this.channel4.basicPublish(RabbitMQFixture.EXCHANGE_NAME, "c4", Constants.NO_PROPERTIES, RabbitMQTest.this.asBytes("4"));
                InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(FourConnections.this.channel1);
                InMemoryConsumer inMemoryConsumer2 = new InMemoryConsumer(FourConnections.this.channel2);
                InMemoryConsumer inMemoryConsumer3 = new InMemoryConsumer(FourConnections.this.channel3);
                InMemoryConsumer inMemoryConsumer4 = new InMemoryConsumer(FourConnections.this.channel4);
                FourConnections.this.channel1.basicConsume(queue, inMemoryConsumer);
                FourConnections.this.channel2.basicConsume(queue2, inMemoryConsumer2);
                FourConnections.this.channel3.basicConsume(queue3, inMemoryConsumer3);
                FourConnections.this.channel4.basicConsume(queue4, inMemoryConsumer4);
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(FourConnections.this.countReceivedMessages(inMemoryConsumer, inMemoryConsumer2, inMemoryConsumer3, inMemoryConsumer4) == 8);
                });
                Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).containsOnly(new Integer[]{1, 2});
                Assertions.assertThat(inMemoryConsumer2.getConsumedMessages()).containsOnly(new Integer[]{2, 3});
                Assertions.assertThat(inMemoryConsumer3.getConsumedMessages()).containsOnly(new Integer[]{3, 4});
                Assertions.assertThat(inMemoryConsumer4.getConsumedMessages()).containsOnly(new Integer[]{1, 4});
            }
        }

        @Nested
        /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQTest$FourConnections$WorkQueue.class */
        class WorkQueue {
            WorkQueue() {
            }

            @Test
            void rabbitMQShouldSupportTheWorkQueueCase() throws Exception {
                int i = 100;
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, true, ImmutableMap.of());
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                Stream mapToObj = IntStream.range(0, 100).mapToObj(String::valueOf);
                RabbitMQTest rabbitMQTest = RabbitMQTest.this;
                mapToObj.map(str -> {
                    return rabbitMQTest.asBytes(str);
                }).forEach(Throwing.consumer(bArr -> {
                    FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                }).sneakyThrow());
                InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(FourConnections.this.channel2);
                InMemoryConsumer inMemoryConsumer2 = new InMemoryConsumer(FourConnections.this.channel3);
                InMemoryConsumer inMemoryConsumer3 = new InMemoryConsumer(FourConnections.this.channel4);
                FourConnections.this.channel2.basicConsume(RabbitMQFixture.WORK_QUEUE, inMemoryConsumer);
                FourConnections.this.channel3.basicConsume(RabbitMQFixture.WORK_QUEUE, inMemoryConsumer2);
                FourConnections.this.channel4.basicConsume(RabbitMQFixture.WORK_QUEUE, inMemoryConsumer3);
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(FourConnections.this.countReceivedMessages(inMemoryConsumer, inMemoryConsumer2, inMemoryConsumer3) == ((long) i));
                });
                Assertions.assertThat(Iterables.concat(inMemoryConsumer.getConsumedMessages(), inMemoryConsumer2.getConsumedMessages(), inMemoryConsumer3.getConsumedMessages())).containsOnlyElementsOf((ImmutableList) IntStream.range(0, 100).boxed().collect(ImmutableList.toImmutableList()));
            }

            @Test
            void rabbitMQShouldRejectSecondConsumerInExclusiveWorkQueueCase() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, false, ImmutableMap.of());
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                Stream mapToObj = IntStream.range(0, 10).mapToObj(String::valueOf);
                RabbitMQTest rabbitMQTest = RabbitMQTest.this;
                mapToObj.map(str -> {
                    return rabbitMQTest.asBytes(str);
                }).forEach(Throwing.consumer(bArr -> {
                    FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                }).sneakyThrow());
                new ConcurrentLinkedQueue();
                ImmutableMap of = ImmutableMap.of();
                FourConnections.this.channel2.basicConsume(RabbitMQFixture.WORK_QUEUE, true, "dyingConsumer", false, true, of, (str2, delivery) -> {
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                }, str3 -> {
                });
                Assertions.assertThatThrownBy(() -> {
                    FourConnections.this.channel3.basicConsume(RabbitMQFixture.WORK_QUEUE, true, "fallbackConsumer", false, true, of, (str4, delivery2) -> {
                    }, str5 -> {
                    });
                }).isInstanceOf(IOException.class).hasStackTraceContaining("ACCESS_REFUSED");
            }

            @Test
            void rabbitMQShouldSupportTheExclusiveWorkQueueCase() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, false, ImmutableMap.of());
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                Stream mapToObj = IntStream.range(0, 10).mapToObj(String::valueOf);
                RabbitMQTest rabbitMQTest = RabbitMQTest.this;
                mapToObj.map(str -> {
                    return rabbitMQTest.asBytes(str);
                }).forEach(Throwing.consumer(bArr -> {
                    FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                }).sneakyThrow());
                ImmutableMap of = ImmutableMap.of();
                ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
                FourConnections.this.channel2.basicConsume(RabbitMQFixture.WORK_QUEUE, false, "dyingConsumer", false, true, of, (str2, delivery) -> {
                    if (concurrentLinkedQueue.size() != 0) {
                        FourConnections.this.channel2.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                    } else {
                        concurrentLinkedQueue.add(Integer.valueOf(new String(delivery.getBody(), StandardCharsets.UTF_8)));
                        FourConnections.this.channel2.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
                }, str3 -> {
                });
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(concurrentLinkedQueue.size() == 1);
                });
                FourConnections.this.channel2.basicCancel("dyingConsumer");
                InMemoryConsumer inMemoryConsumer = new InMemoryConsumer(FourConnections.this.channel3);
                FourConnections.this.channel3.basicConsume(RabbitMQFixture.WORK_QUEUE, true, "fallbackConsumer", false, true, of, inMemoryConsumer);
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(FourConnections.this.countReceivedMessages(inMemoryConsumer) >= 1);
                });
                Assertions.assertThat(concurrentLinkedQueue).containsExactly(new Integer[]{0});
                Assertions.assertThat(inMemoryConsumer.getConsumedMessages()).contains(new Integer[]{1, 2}).doesNotContain(new Integer[]{0});
            }

            @Test
            void rabbitMQShouldDeliverMessageToSingleActiveConsumer() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, false, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                Stream mapToObj = IntStream.range(0, 10).mapToObj(String::valueOf);
                RabbitMQTest rabbitMQTest = RabbitMQTest.this;
                mapToObj.map(str -> {
                    return rabbitMQTest.asBytes(str);
                }).forEach(Throwing.consumer(bArr -> {
                    FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                }).sneakyThrow());
                FourConnections.this.channel2.basicQos(1);
                FourConnections.this.channel3.basicQos(1);
                AtomicInteger atomicInteger = new AtomicInteger(0);
                AtomicInteger atomicInteger2 = new AtomicInteger(0);
                ImmutableMap of = ImmutableMap.of();
                FourConnections.this.channel2.basicConsume(RabbitMQFixture.WORK_QUEUE, false, "firstRegisteredConsumer", false, false, of, (str2, delivery) -> {
                    incrementCountForConsumerAndAckMessage(atomicInteger, delivery, FourConnections.this.channel2);
                }, str3 -> {
                });
                FourConnections.this.channel3.basicConsume(RabbitMQFixture.WORK_QUEUE, false, "starvingConsumer", false, false, of, (str4, delivery2) -> {
                    incrementCountForConsumerAndAckMessage(atomicInteger2, delivery2, FourConnections.this.channel3);
                }, str5 -> {
                });
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(atomicInteger.get() + atomicInteger2.get() == 10);
                });
                Assertions.assertThat(atomicInteger.get()).isEqualTo(10);
                Assertions.assertThat(atomicInteger2.get()).isEqualTo(0);
            }

            private void incrementCountForConsumerAndAckMessage(AtomicInteger atomicInteger, Delivery delivery, Channel channel) throws IOException {
                try {
                    atomicInteger.incrementAndGet();
                    TimeUnit.SECONDS.sleep(1L);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (InterruptedException e) {
                }
            }

            @Test
            void rabbitMQShouldProvideSingleActiveConsumerName() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, false, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, "foo".getBytes(StandardCharsets.UTF_8));
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ImmutableMap of = ImmutableMap.of();
                FourConnections.this.channel2.basicConsume(RabbitMQFixture.WORK_QUEUE, true, "firstRegisteredConsumer", false, false, of, (str, delivery) -> {
                    atomicInteger.incrementAndGet();
                }, str2 -> {
                });
                FourConnections.this.channel3.basicConsume(RabbitMQFixture.WORK_QUEUE, true, "starvingConsumer", false, false, of, (str3, delivery2) -> {
                    atomicInteger.incrementAndGet();
                }, str4 -> {
                });
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(atomicInteger.get() > 0);
                });
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(!RabbitMQTest.rabbitMQExtension.managementAPI().queueDetails("/", RabbitMQFixture.WORK_QUEUE).consumerDetails.isEmpty());
                });
                Assertions.assertThat((List) RabbitMQTest.rabbitMQExtension.managementAPI().queueDetails("/", RabbitMQFixture.WORK_QUEUE).consumerDetails.stream().filter(consumerDetails -> {
                    return consumerDetails.status == RabbitMQManagementAPI.ActivityStatus.SingleActive;
                }).map((v0) -> {
                    return v0.getTag();
                }).collect(Collectors.toList())).hasSize(1).first().isEqualTo("firstRegisteredConsumer");
            }

            @Test
            void bindingSourceShouldMatchBeanContract() {
                EqualsVerifier.forClass(RabbitMQManagementAPI.BindingSource.class).verify();
            }

            @Test
            void listBindingsShouldReturnEmptyWhenNone() throws Exception {
                Assertions.assertThat(RabbitMQTest.rabbitMQExtension.managementAPI().listBindings("/", RabbitMQFixture.EXCHANGE_NAME).stream().map((v0) -> {
                    return v0.getDestination();
                })).isEmpty();
            }

            @Test
            void listBindingsShouldAllowRetrievingDestination() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, false, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                Assertions.assertThat(RabbitMQTest.rabbitMQExtension.managementAPI().listBindings("/", RabbitMQFixture.EXCHANGE_NAME).stream().map((v0) -> {
                    return v0.getDestination();
                })).containsExactly(new String[]{RabbitMQFixture.WORK_QUEUE});
            }

            @Test
            void listBindingsShouldAllowRetrievingDestinations() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, false, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE_2, true, false, false, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE_2, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                Assertions.assertThat(RabbitMQTest.rabbitMQExtension.managementAPI().listBindings("/", RabbitMQFixture.EXCHANGE_NAME).stream().map((v0) -> {
                    return v0.getDestination();
                })).containsExactly(new String[]{RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.WORK_QUEUE_2});
            }

            @Test
            void rabbitMQShouldDeliverMessageToFallbackSingleActiveConsumer() throws Exception {
                FourConnections.this.channel1.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
                FourConnections.this.channel1.queueDeclare(RabbitMQFixture.WORK_QUEUE, true, false, false, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
                FourConnections.this.channel1.queueBind(RabbitMQFixture.WORK_QUEUE, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
                Stream mapToObj = IntStream.range(0, 10).mapToObj(String::valueOf);
                RabbitMQTest rabbitMQTest = RabbitMQTest.this;
                mapToObj.map(str -> {
                    return rabbitMQTest.asBytes(str);
                }).forEach(Throwing.consumer(bArr -> {
                    FourConnections.this.channel1.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, Constants.NO_PROPERTIES, bArr);
                }).sneakyThrow());
                AtomicInteger atomicInteger = new AtomicInteger(0);
                AtomicInteger atomicInteger2 = new AtomicInteger(0);
                ImmutableMap of = ImmutableMap.of();
                FourConnections.this.channel2.basicConsume(RabbitMQFixture.WORK_QUEUE, false, "firstRegisteredConsumer", false, false, of, (str2, delivery) -> {
                    try {
                        if (atomicInteger.get() < 5) {
                            FourConnections.this.channel2.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                            atomicInteger.incrementAndGet();
                        } else {
                            FourConnections.this.channel2.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                        }
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                }, str3 -> {
                });
                FourConnections.this.channel3.basicConsume(RabbitMQFixture.WORK_QUEUE, true, "fallbackConsumer", false, false, of, (str4, delivery2) -> {
                    atomicInteger2.incrementAndGet();
                }, str5 -> {
                });
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(atomicInteger.get() == 5);
                });
                FourConnections.this.channel2.basicCancel("firstRegisteredConsumer");
                RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                    return Boolean.valueOf(atomicInteger.get() + atomicInteger2.get() == 10);
                });
                Assertions.assertThat(atomicInteger.get()).isEqualTo(5);
                Assertions.assertThat(atomicInteger2.get()).isEqualTo(5);
            }
        }

        FourConnections() {
        }

        @BeforeEach
        void setup(DockerRabbitMQ dockerRabbitMQ) throws IOException, TimeoutException {
            this.connectionFactory1 = dockerRabbitMQ.connectionFactory();
            this.connectionFactory2 = dockerRabbitMQ.connectionFactory();
            this.connectionFactory3 = dockerRabbitMQ.connectionFactory();
            this.connectionFactory4 = dockerRabbitMQ.connectionFactory();
            this.connection1 = this.connectionFactory1.newConnection();
            this.connection2 = this.connectionFactory2.newConnection();
            this.connection3 = this.connectionFactory3.newConnection();
            this.connection4 = this.connectionFactory4.newConnection();
            this.channel1 = this.connection1.createChannel();
            this.channel2 = this.connection2.createChannel();
            this.channel3 = this.connection3.createChannel();
            this.channel4 = this.connection4.createChannel();
        }

        @AfterEach
        void tearDown() {
            RabbitMQTest.this.closeQuietly(this.channel1, this.channel2, this.channel3, this.channel4, this.connection1, this.connection2, this.connection3, this.connection4);
        }

        private long countReceivedMessages(InMemoryConsumer... inMemoryConsumerArr) {
            return Arrays.stream(inMemoryConsumerArr).map((v0) -> {
                return v0.getConsumedMessages();
            }).mapToLong((v0) -> {
                return v0.size();
            }).sum();
        }
    }

    @Nested
    /* loaded from: input_file:org/apache/james/backends/rabbitmq/RabbitMQTest$SingleConsumerTest.class */
    class SingleConsumerTest {
        private ConnectionFactory connectionFactory;
        private Connection connection;
        private Channel channel;

        SingleConsumerTest() {
        }

        @BeforeEach
        void setup(DockerRabbitMQ dockerRabbitMQ) throws IOException, TimeoutException {
            this.connectionFactory = dockerRabbitMQ.connectionFactory();
            this.connectionFactory.setNetworkRecoveryInterval(1000);
            this.connection = this.connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
        }

        @AfterEach
        void tearDown(DockerRabbitMQ dockerRabbitMQ) throws Exception {
            RabbitMQTest.this.closeQuietly(this.connection, this.channel);
            dockerRabbitMQ.reset();
        }

        @Test
        void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception {
            String createQueue = createQueue(this.channel);
            publishAMessage(this.channel);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return messageReceived(this.channel, createQueue);
            });
        }

        @Test
        void getQueueLengthShouldReturnEmptyWhenEmptyQueue() throws Exception {
            String createQueue = createQueue(this.channel);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(RabbitMQTest.rabbitMQExtension.managementAPI().queueDetails("/", createQueue).getQueueLength() == 0);
            });
        }

        @Test
        void getQueueLengthShouldReturnExactlyNumberOfMessagesInQueue() throws Exception {
            String createQueue = createQueue(this.channel);
            publishAMessage(this.channel);
            publishAMessage(this.channel);
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return Boolean.valueOf(RabbitMQTest.rabbitMQExtension.managementAPI().queueDetails("/", createQueue).getQueueLength() == 2);
            });
        }

        @Test
        void demonstrateDurability(DockerRabbitMQ dockerRabbitMQ) throws Exception {
            String createQueue = createQueue(this.channel);
            publishAMessage(this.channel);
            Thread.sleep(200L);
            dockerRabbitMQ.restart();
            RabbitMQFixture.awaitAtMostOneMinute.until(() -> {
                return containerIsRestarted(dockerRabbitMQ);
            });
            Thread.sleep(this.connectionFactory.getNetworkRecoveryInterval());
            Assertions.assertThat(this.channel.basicGet(createQueue, false)).isNotNull();
        }

        private Boolean containerIsRestarted(DockerRabbitMQ dockerRabbitMQ) {
            try {
                dockerRabbitMQ.connectionFactory().newConnection();
                return true;
            } catch (Exception e) {
                return false;
            }
        }

        private String createQueue(Channel channel) throws IOException {
            channel.exchangeDeclare(RabbitMQFixture.EXCHANGE_NAME, "direct", true);
            String uuid = UUID.randomUUID().toString();
            channel.queueDeclare(uuid, true, false, true, RabbitMQTest.NO_QUEUE_DECLARE_ARGUMENTS).getQueue();
            channel.queueBind(uuid, RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY);
            return uuid;
        }

        private void publishAMessage(Channel channel) throws IOException {
            channel.basicPublish(RabbitMQFixture.EXCHANGE_NAME, RabbitMQFixture.ROUTING_KEY, new AMQP.BasicProperties.Builder().deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()).priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()).contentType(MessageProperties.PERSISTENT_TEXT_PLAIN.getContentType()).build(), RabbitMQTest.this.asBytes("Hello, world!"));
        }

        private Boolean messageReceived(Channel channel, String str) {
            try {
                return Boolean.valueOf(channel.basicGet(str, false) != null);
            } catch (Exception e) {
                return false;
            }
        }
    }

    RabbitMQTest() {
    }

    private void closeQuietly(AutoCloseable... autoCloseableArr) {
        Arrays.stream(autoCloseableArr).forEach(this::closeQuietly);
    }

    private void closeQuietly(AutoCloseable autoCloseable) {
        try {
            autoCloseable.close();
        } catch (Exception e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public byte[] asBytes(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }
}
