package org.apache.james.backends.rabbitmq;

import com.rabbitmq.client.Channel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.rabbitmq.ChannelPool;

/* loaded from: input_file:org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPoolTest.class */
class ReactorRabbitMQChannelPoolTest implements ChannelPoolContract {

    @RegisterExtension
    static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
    private List<ReactorRabbitMQChannelPool> channelPools;

    ReactorRabbitMQChannelPoolTest() {
    }

    @BeforeEach
    void beforeEach() {
        this.channelPools = new ArrayList();
    }

    @AfterEach
    void afterEach() {
        this.channelPools.forEach((v0) -> {
            v0.close();
        });
    }

    @Override // org.apache.james.backends.rabbitmq.ChannelPoolContract
    public ChannelPool getChannelPool(int i) {
        ReactorRabbitMQChannelPool generateChannelPool = generateChannelPool(i);
        this.channelPools.add(generateChannelPool);
        return generateChannelPool;
    }

    private ReactorRabbitMQChannelPool generateChannelPool(int i) {
        ReactorRabbitMQChannelPool reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(rabbitMQExtension.getConnectionPool().getResilientConnection(), ReactorRabbitMQChannelPool.Configuration.builder().retries(2).minBorrowDelay(Duration.ofMillis(5L)).maxChannel(i));
        reactorRabbitMQChannelPool.start();
        return reactorRabbitMQChannelPool;
    }

    @Test
    void concurrentRequestOnChannelMonoLeadToPoolWaitTimeoutException() {
        Mono channelMono = getChannelPool(99).getChannelMono();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        Assertions.assertThatThrownBy(() -> {
            ConcurrentTestRunner.builder().operation((i, i2) -> {
                concurrentLinkedQueue.add((Channel) channelMono.block());
            }).threadCount(10).operationCount(10).runSuccessfullyWithin(Duration.ofSeconds(30L));
        }).isInstanceOf(ExecutionException.class).hasMessageContaining("java.util.NoSuchElementException: Timeout waiting for idle object");
    }

    @Test
    void usedChannelShouldBeClosedWhenPoolIsClosed() {
        ReactorRabbitMQChannelPool generateChannelPool = generateChannelPool(2);
        Channel channel = (Channel) generateChannelPool.getChannelMono().block();
        Assertions.assertThat(channel.isOpen()).isTrue();
        generateChannelPool.close();
        Assertions.assertThat(channel.isOpen()).isFalse();
    }

    @Test
    void notUsedChannelShouldBeClosedWhenPoolIsClosed() {
        ReactorRabbitMQChannelPool generateChannelPool = generateChannelPool(2);
        Channel channel = (Channel) generateChannelPool.getChannelMono().block();
        Assertions.assertThat(channel.isOpen()).isTrue();
        generateChannelPool.getChannelCloseHandler().accept(SignalType.ON_NEXT, channel);
        generateChannelPool.close();
        Assertions.assertThat(channel.isOpen()).isFalse();
    }

    @Test
    void channelBorrowShouldNotThrowWhenClosedChannel() throws Exception {
        ReactorRabbitMQChannelPool generateChannelPool = generateChannelPool(1);
        Channel channel = (Channel) generateChannelPool.getChannelMono().block();
        returnToThePool(generateChannelPool, channel);
        channel.close();
        Assertions.assertThat(((Channel) generateChannelPool.getChannelMono().block()).isOpen()).isTrue();
    }
}
