package org.apache.james.backends.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.time.Duration;
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiConsumer;
import javax.annotation.PreDestroy;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ChannelPool;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

/* loaded from: input_file:org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.class */
public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
    // Default upper bound on pooled channels (NOTE(review): presumably the pool size the
    // single-argument constructor is meant to apply - confirm).
    private static final int MAX_CHANNELS_NUMBER = 3;
    // Retry budget for borrowing a channel (NOTE(review): presumably the count given to
    // retryBackoff in tryBorrowFromPool - confirm).
    private static final int MAX_BORROW_RETRIES = 3;
    // Source of the RabbitMQ connection; shared with the channel factory, the sender and receivers.
    private final Mono<Connection> connectionMono;
    // Pool of channels; its factory is the nested ChannelFactory built in the constructor.
    private final GenericObjectPool<Channel> pool;
    // Channels handed out by borrow() and not yet given back through the close handler or tryChannel().
    private final ConcurrentSkipListSet<Channel> borrowedChannels;
    // Assigned by start(); remains null until start() has been called.
    private Sender sender;
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorRabbitMQChannelPool.class);
    // Timeout, in milliseconds, passed to pool.borrowObject(...) in borrowFromPool().
    private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
    // First back-off argument of the retryBackoff call used when borrowing.
    private static final Duration MIN_BORROW_DELAY = Duration.ofMillis(50);
    // Long.MAX_VALUE milliseconds; passed as the maximum back-off argument of retryBackoff.
    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);

    /* loaded from: input_file:org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool$ChannelClosedException.class */
    /**
     * Thrown by {@code borrowFromPool()} when the channel handed out by the pool
     * turns out to be closed already.
     */
    private static final class ChannelClosedException extends IOException {

        /**
         * @param message description of the failure, forwarded to {@link IOException}
         */
        ChannelClosedException(String message) {
            super(message);
        }
    }

    /* loaded from: input_file:org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool$ChannelFactory.class */
    static class ChannelFactory extends BasePooledObjectFactory<Channel> {
        private static final int MAX_RETRIES = 5;
        private final Mono<Connection> connectionMono;
        private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFactory.class);
        private static final Duration RETRY_FIRST_BACK_OFF = Duration.ofMillis(100);

        ChannelFactory(Mono<Connection> mono) {
            this.connectionMono = mono;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public Channel m6create() {
            return (Channel) this.connectionMono.flatMap(this::openChannel).block();
        }

        private Mono<Channel> openChannel(Connection connection) {
            Objects.requireNonNull(connection);
            return Mono.fromCallable(connection::openChannel).map(optional -> {
                return (Channel) optional.orElseThrow(() -> {
                    return new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels");
                });
            }).retryBackoff(5L, RETRY_FIRST_BACK_OFF, ReactorRabbitMQChannelPool.FOREVER, Schedulers.elastic()).doOnError(th -> {
                LOGGER.error("error when creating new channel", th);
            });
        }

        public PooledObject<Channel> wrap(Channel channel) {
            return new DefaultPooledObject(channel);
        }

        public void destroyObject(PooledObject<Channel> pooledObject) throws Exception {
            Channel channel = (Channel) pooledObject.getObject();
            if (channel.isOpen()) {
                channel.close();
            }
        }
    }

    /**
     * Builds a pool on top of the resilient connection exposed by the given
     * connection pool, limited to {@link #MAX_CHANNELS_NUMBER} channels.
     *
     * @param simpleConnectionPool provider of the RabbitMQ connection
     */
    public ReactorRabbitMQChannelPool(SimpleConnectionPool simpleConnectionPool) {
        this(simpleConnectionPool.getResilientConnection(), MAX_CHANNELS_NUMBER);
    }

    /**
     * Builds a pool whose channels are opened on the given connection.
     *
     * @param mono source of the RabbitMQ connection
     * @param i    value applied as the pool's {@code maxTotal}
     */
    public ReactorRabbitMQChannelPool(Mono<Connection> mono, int i) {
        this.connectionMono = mono;

        // Borrowed channels are ordered (and told apart) by their identity hash code.
        this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode));

        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(i);
        this.pool = new GenericObjectPool<>(new ChannelFactory(mono), poolConfig);
    }

    /**
     * Creates the {@link Sender} backed by this pool and stores it for
     * {@link #getSender()}.
     */
    public void start() {
        Sender created = createSender();
        this.sender = created;
    }

    /**
     * @return the sender created by {@link #start()}; {@code null} if {@code start()}
     *         has not been called yet
     */
    public Sender getSender() {
        return sender;
    }

    /**
     * Creates a new receiver bound to this pool's connection.
     *
     * @return a fresh {@link Receiver}; a new instance is built on every call
     */
    public Receiver createReceiver() {
        ReceiverOptions options = new ReceiverOptions().connectionMono(this.connectionMono);
        return RabbitFlux.createReceiver(options);
    }

    /**
     * @return the connection source this pool was constructed with
     */
    public Mono<Connection> getConnectionMono() {
        return connectionMono;
    }

    /**
     * @return a Mono that borrows a channel from the pool each time it is subscribed to
     */
    public Mono<? extends Channel> getChannelMono() {
        return Mono.fromCallable(() -> borrow());
    }

    /**
     * Borrows a channel from the pool and records it as borrowed.
     *
     * @return the borrowed channel
     */
    private Channel borrow() {
        Channel channel = tryBorrowFromPool();
        borrowedChannels.add(channel);
        return channel;
    }

    /**
     * Borrows an open channel, retrying with back-off when a borrow attempt fails,
     * and blocks until a channel is obtained or the retries are used up.
     *
     * @return the borrowed channel
     */
    private Channel tryBorrowFromPool() {
        Mono<Channel> singleAttempt = Mono.fromCallable(this::borrowFromPool)
            .doOnError(th -> LOGGER.warn("Cannot borrow channel", th));

        Mono<Channel> withRetries = singleAttempt
            .retryBackoff(MAX_BORROW_RETRIES, MIN_BORROW_DELAY, FOREVER, Schedulers.elastic())
            // Surface the underlying failure rather than the retry wrapper.
            .onErrorMap(this::propagateException);

        return withRetries
            .subscribeOn(Schedulers.elastic())
            .block();
    }

    /**
     * Unwraps the {@link IllegalStateException} that signals exhausted retries
     * (recognised by a message containing {@code "Retries exhausted"}) so callers see
     * the underlying failure instead.
     *
     * <p>Never returns {@code null}: when the exception has no message or no cause,
     * the original throwable is returned unchanged.
     *
     * @param th error emitted by the borrow pipeline
     * @return the cause of a retries-exhausted wrapper, otherwise {@code th} itself
     */
    private Throwable propagateException(Throwable th) {
        if (th instanceof IllegalStateException) {
            String message = th.getMessage();
            Throwable cause = th.getCause();
            // Guard both: a null message would NPE, and a null return is illegal for onErrorMap.
            if (message != null && cause != null && message.contains("Retries exhausted")) {
                return cause;
            }
        }
        return th;
    }

    /**
     * Performs one borrow attempt against the pool, waiting at most
     * {@link #MAXIMUM_BORROW_TIMEOUT_IN_MS} milliseconds.
     *
     * @return an open channel
     * @throws Exception if the pool cannot provide a channel, or a
     *         {@code ChannelClosedException} when the channel obtained is already closed
     *         (it is invalidated before the exception is thrown)
     */
    private Channel borrowFromPool() throws Exception {
        Channel candidate = this.pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS);
        if (!candidate.isOpen()) {
            invalidateObject(candidate);
            throw new ChannelClosedException("borrowed channel is already closed");
        }
        return candidate;
    }

    /**
     * Builds the callback used to give a channel back once an operation is over.
     *
     * <p>The channel is removed from the borrowed set; it goes back to the pool only
     * when it is still open and the signal is {@link SignalType#ON_COMPLETE},
     * otherwise it is invalidated.
     *
     * @return a new handler instance
     */
    public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
        return (signalType, channel) -> {
            this.borrowedChannels.remove(channel);

            boolean reusable = channel.isOpen() && signalType == SignalType.ON_COMPLETE;
            if (!reusable) {
                invalidateObject(channel);
                return;
            }
            this.pool.returnObject(channel);
        };
    }

    /**
     * Builds a sender that uses this object as its channel pool and a cached,
     * dedicated channel for resource management.
     *
     * @return the newly created sender
     */
    private Sender createSender() {
        Mono<Channel> resourceManagementChannel = this.connectionMono
            .map(Throwing.function(connection -> connection.createChannel()))
            .cache();

        SenderOptions options = new SenderOptions()
            .connectionMono(this.connectionMono)
            .channelPool(this)
            .resourceManagementChannelMono(resourceManagementChannel);

        return RabbitFlux.createSender(options);
    }

    /**
     * Removes the channel from the pool and closes it if it is still open afterwards.
     *
     * @param channel channel to discard
     * @throws RuntimeException wrapping any exception raised while invalidating or closing
     */
    private void invalidateObject(Channel channel) {
        try {
            this.pool.invalidateObject(channel);
            if (!channel.isOpen()) {
                return;
            }
            channel.close();
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Releases everything owned by this pool: the sender (when {@link #start()} was
     * called), every channel still borrowed, and finally the underlying object pool.
     *
     * <p>The object pool is closed even when closing the sender or discarding a
     * borrowed channel fails; that failure is still propagated to the caller.
     */
    @PreDestroy
    public void close() {
        try {
            // start() may never have run, in which case there is no sender to close.
            if (this.sender != null) {
                this.sender.close();
            }
            BiConsumer<SignalType, Channel> closeHandler = getChannelCloseHandler();
            // ON_NEXT is not ON_COMPLETE, so the handler invalidates each channel instead of pooling it.
            this.borrowedChannels.forEach(channel -> closeHandler.accept(SignalType.ON_NEXT, channel));
            this.borrowedChannels.clear();
        } finally {
            this.pool.close();
        }
    }

    /**
     * Checks that a channel can be borrowed from the pool.
     *
     * <p>The borrowed channel is always given back to the pool, exactly once, whether
     * the check succeeds or fails.
     *
     * @return whether the borrowed channel reports itself as open
     * @throws RuntimeException (or any other unchecked error) raised while borrowing
     */
    public boolean tryChannel() {
        Channel channel = null;
        try {
            channel = borrow();
            return channel.isOpen();
        } finally {
            // Single cleanup path: a failure here must not trigger a second returnObject.
            if (channel != null) {
                this.borrowedChannels.remove(channel);
                this.pool.returnObject(channel);
            }
        }
    }
}
