package org.apache.james.backend.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.time.Duration;
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiConsumer;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ChannelPool;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

/* loaded from: input_file:org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool.class */
public class ReactorRabbitMQChannelPool implements ChannelPool {
    private static final long MAXIMUM_BORROW_TIMEOUT_IN_MS = Duration.ofSeconds(5).toMillis();
    private final Mono<Connection> connectionMono;
    private final GenericObjectPool<Channel> pool;
    private final ConcurrentSkipListSet<Channel> borrowedChannels;

    /* loaded from: input_file:org/apache/james/backend/rabbitmq/ReactorRabbitMQChannelPool$ChannelFactory.class */
    static class ChannelFactory extends BasePooledObjectFactory<Channel> {
        private static final int MAX_RETRIES = 5;
        private final Mono<Connection> connectionMono;
        private static final Logger LOGGER = LoggerFactory.getLogger(ChannelFactory.class);
        private static final Duration RETRY_FIRST_BACK_OFF = Duration.ofMillis(100);
        private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);

        ChannelFactory(Mono<Connection> mono) {
            this.connectionMono = mono;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public Channel m5create() throws Exception {
            return (Channel) this.connectionMono.flatMap(this::openChannel).block();
        }

        private Mono<Channel> openChannel(Connection connection) {
            Objects.requireNonNull(connection);
            return Mono.fromCallable(connection::openChannel).map(optional -> {
                return (Channel) optional.orElseThrow(() -> {
                    return new RuntimeException("RabbitMQ reached to maximum opened channels, cannot get more channels");
                });
            }).retryBackoff(5L, RETRY_FIRST_BACK_OFF, FOREVER, Schedulers.elastic()).doOnError(th -> {
                LOGGER.error("error when creating new channel", th);
            });
        }

        public PooledObject<Channel> wrap(Channel channel) {
            return new DefaultPooledObject(channel);
        }

        public void destroyObject(PooledObject<Channel> pooledObject) throws Exception {
            Channel channel = (Channel) pooledObject.getObject();
            if (channel.isOpen()) {
                channel.close();
            }
        }
    }

    public ReactorRabbitMQChannelPool(Mono<Connection> mono, int i) {
        this.connectionMono = mono;
        ChannelFactory channelFactory = new ChannelFactory(mono);
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxTotal(i);
        this.pool = new GenericObjectPool<>(channelFactory, genericObjectPoolConfig);
        this.borrowedChannels = new ConcurrentSkipListSet<>(Comparator.comparingInt((v0) -> {
            return System.identityHashCode(v0);
        }));
    }

    public Mono<? extends Channel> getChannelMono() {
        return Mono.fromCallable(() -> {
            Channel channel = (Channel) this.pool.borrowObject(MAXIMUM_BORROW_TIMEOUT_IN_MS);
            this.borrowedChannels.add(channel);
            return channel;
        });
    }

    public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
        return (signalType, channel) -> {
            this.borrowedChannels.remove(channel);
            if (channel.isOpen() && signalType == SignalType.ON_COMPLETE) {
                this.pool.returnObject(channel);
            } else {
                invalidateObject(channel);
            }
        };
    }

    public Sender createSender() {
        return RabbitFlux.createSender(new SenderOptions().connectionMono(this.connectionMono).channelPool(this).resourceManagementChannelMono(this.connectionMono.map(Throwing.function((v0) -> {
            return v0.createChannel();
        })).cache()));
    }

    private void invalidateObject(Channel channel) {
        try {
            this.pool.invalidateObject(channel);
            if (channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        this.borrowedChannels.forEach(channel -> {
            getChannelCloseHandler().accept(SignalType.ON_NEXT, channel);
        });
        this.borrowedChannels.clear();
        this.pool.close();
    }
}
