package org.apache.james.backend.rabbitmq;

import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.james.backend.rabbitmq.RabbitMQChannelPool;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

/* loaded from: input_file:org/apache/james/backend/rabbitmq/SimpleChannelPool.class */
public class SimpleChannelPool implements RabbitMQChannelPool {
    private final AtomicReference<Channel> channelReference = new AtomicReference<>();
    private final Receiver rabbitFlux;
    private final SimpleConnectionPool connectionPool;

    @Inject
    @VisibleForTesting
    SimpleChannelPool(SimpleConnectionPool simpleConnectionPool) {
        this.connectionPool = simpleConnectionPool;
        this.rabbitFlux = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(simpleConnectionPool.getResilientConnection()));
    }

    @Override // org.apache.james.backend.rabbitmq.RabbitMQChannelPool
    public Flux<AcknowledgableDelivery> receive(String str) {
        return this.rabbitFlux.consumeManualAck(str);
    }

    @Override // org.apache.james.backend.rabbitmq.RabbitMQChannelPool
    public <T, E extends Throwable> T execute(RabbitMQChannelPool.RabbitFunction<T, E> rabbitFunction) throws Throwable, RabbitMQChannelPool.ConnectionFailedException {
        return rabbitFunction.execute((Channel) getResilientChannel().block());
    }

    @Override // org.apache.james.backend.rabbitmq.RabbitMQChannelPool
    public <E extends Throwable> void execute(RabbitMQChannelPool.RabbitConsumer<E> rabbitConsumer) throws Throwable, RabbitMQChannelPool.ConnectionFailedException {
        rabbitConsumer.execute((Channel) getResilientChannel().block());
    }

    @Override // org.apache.james.backend.rabbitmq.RabbitMQChannelPool
    @PreDestroy
    public void close() {
        Optional.ofNullable(this.channelReference.get()).filter((v0) -> {
            return v0.isOpen();
        }).ifPresent(Throwing.consumer((v0) -> {
            v0.close();
        }).orDoNothing());
        try {
            this.rabbitFlux.close();
        } catch (Throwable th) {
        }
    }

    private Mono<Channel> getResilientChannel() {
        return Mono.defer(this::getOpenChannel).retryBackoff(100, Duration.ofMillis(100L), Duration.ofMillis(Long.MAX_VALUE), Schedulers.elastic());
    }

    private Mono<Channel> getOpenChannel() {
        Channel channel = this.channelReference.get();
        return Mono.justOrEmpty(channel).publishOn(Schedulers.elastic()).filter((v0) -> {
            return v0.isOpen();
        }).switchIfEmpty(this.connectionPool.getResilientConnection().flatMap(connection -> {
            Objects.requireNonNull(connection);
            return Mono.fromCallable(connection::createChannel);
        })).flatMap(channel2 -> {
            return replaceCurrentChannel(channel, channel2);
        }).onErrorMap(th -> {
            return new RuntimeException("unable to create and register a new Channel", th);
        });
    }

    private Mono<Channel> replaceCurrentChannel(Channel channel, Channel channel2) {
        if (this.channelReference.compareAndSet(channel, channel2)) {
            return Mono.just(channel2);
        }
        try {
            channel2.close();
        } catch (IOException | TimeoutException e) {
        }
        return Mono.error(new RuntimeException("unable to create and register a new Channel"));
    }

    @Override // org.apache.james.backend.rabbitmq.RabbitMQChannelPool
    public boolean tryConnection() {
        try {
            if (this.connectionPool.tryConnection()) {
                if (getOpenChannel().blockOptional().isPresent()) {
                    return true;
                }
            }
            return false;
        } catch (Throwable th) {
            return false;
        }
    }
}
