package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.RpcServer;
import com.rabbitmq.client.amqp.impl.RpcSupport;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/client/amqp/impl/AmqpRpcServer.class */
/**
 * {@link RpcServer} implementation that consumes requests from a queue, hands them to the
 * application {@link RpcServer.Handler} and publishes the (post-processed) reply.
 *
 * <p>The server owns one {@link Publisher} and one {@link Consumer}, both created from the
 * connection supplied by the builder and both closed by {@link #close()}.
 */
public class AmqpRpcServer implements RpcServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcServer.class);

    /** Publish callback that deliberately ignores the outcome of the publish operation. */
    private static final Publisher.Callback NO_OP_CALLBACK = context -> {
    };

    /**
     * Selects the reply-publishing failures handed to the retry helper: invalid-state errors,
     * except when the resource is reported as closed.
     */
    private static final Predicate<Exception> RESPONSE_SENDING_EXCEPTION_PREDICATE = exc ->
            (exc instanceof AmqpException.AmqpResourceInvalidStateException)
                    && !(exc instanceof AmqpException.AmqpResourceClosedException);

    /** Wait times passed to {@code RetryUtils.callAndMaybeRetry} when publishing a reply. */
    private static final List<Duration> RESPONSE_SENDING_RETRY_WAIT_TIMES = List.of(
            Duration.ofSeconds(1), Duration.ofSeconds(3), Duration.ofSeconds(5), Duration.ofSeconds(10));

    private final AmqpConnection connection;
    private final Publisher publisher;
    private final Consumer consumer;
    private final Function<Message, Object> correlationIdExtractor;
    private final BiFunction<Message, Object, Message> replyPostProcessor;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates the server: builds the reply publisher, resolves the correlation ID extractor and
     * reply post-processor (falling back to defaults when the builder provides none), then starts
     * consuming from the request queue.
     *
     * @param amqpRpcServerBuilder builder carrying the connection, handler, request queue and
     *     optional customizations
     */
    public AmqpRpcServer(RpcSupport.AmqpRpcServerBuilder amqpRpcServerBuilder) {
        this.connection = amqpRpcServerBuilder.connection();
        RpcServer.Handler handler = amqpRpcServerBuilder.handler();
        this.publisher = this.connection.publisherBuilder().build();

        // Context given to the handler: reply messages are created through the server's publisher.
        RpcServer.Context handlerContext = new RpcServer.Context() {
            @Override
            public Message message() {
                return AmqpRpcServer.this.publisher.message();
            }

            @Override
            public Message message(byte[] bArr) {
                return AmqpRpcServer.this.publisher.message(bArr);
            }
        };

        // Default: the request's message ID is used as the correlation ID of the reply.
        Function<Message, Object> configuredExtractor = amqpRpcServerBuilder.correlationIdExtractor();
        if (configuredExtractor == null) {
            this.correlationIdExtractor = request -> request.messageId();
        } else {
            this.correlationIdExtractor = configuredExtractor;
        }

        // Default: set the correlation ID on the reply. The handler is allowed to return null
        // (meaning "no reply"), so the default must pass null through instead of dereferencing it.
        BiFunction<Message, Object, Message> configuredPostProcessor = amqpRpcServerBuilder.replyPostProcessor();
        if (configuredPostProcessor == null) {
            this.replyPostProcessor = (reply, correlationId) ->
                    reply == null ? null : reply.correlationId(correlationId);
        } else {
            this.replyPostProcessor = configuredPostProcessor;
        }

        Consumer requestConsumer;
        try {
            requestConsumer = this.connection.consumerBuilder()
                    .queue(amqpRpcServerBuilder.requestQueue())
                    .messageHandler((deliveryContext, request) -> {
                        // The request is accepted before the handler runs.
                        deliveryContext.accept();
                        Message reply = handler.handle(handlerContext, request);
                        if (reply != null && request.replyTo() != null) {
                            reply.to(request.replyTo());
                        }
                        Message processedReply = this.replyPostProcessor.apply(
                                reply, this.correlationIdExtractor.apply(request));
                        if (processedReply != null) {
                            sendReply(processedReply);
                        }
                    })
                    .build();
        } catch (RuntimeException e) {
            // The publisher was already created; do not leave it open if the consumer cannot start.
            try {
                this.publisher.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.consumer = requestConsumer;
    }

    /**
     * Closes the server. Only the first call has an effect: the server is removed from its
     * connection, then the consumer and the publisher are closed. Failures while closing either
     * resource are logged and do not prevent the other one from being closed.
     */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.connection.removeRpcServer(this);
            try {
                this.consumer.close();
            } catch (Exception e) {
                LOGGER.warn("Error while closing RPC server consumer: {}", e.getMessage());
            }
            try {
                this.publisher.close();
            } catch (Exception e2) {
                LOGGER.warn("Error while closing RPC server publisher: {}", e2.getMessage());
            }
        }
    }

    /**
     * Publishes a reply through {@code RetryUtils.callAndMaybeRetry}, using the class-level retry
     * predicate and wait times. Any exception that escapes is logged and swallowed so that it does
     * not propagate to the consumer's message handler.
     *
     * @param message the reply to publish
     */
    private void sendReply(Message message) {
        try {
            RetryUtils.callAndMaybeRetry(() -> {
                this.publisher.publish(message, NO_OP_CALLBACK);
                return null;
            }, RESPONSE_SENDING_EXCEPTION_PREDICATE, RESPONSE_SENDING_RETRY_WAIT_TIMES, "RPC Server Response", new Object[0]);
        } catch (Exception e) {
            LOGGER.info("Error while processing RPC request: {}", e.getMessage());
        }
    }
}
