package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.RpcClient;
import com.rabbitmq.client.amqp.impl.RpcSupport;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/client/amqp/impl/AmqpRpcClient.class */
/**
 * {@link RpcClient} implementation that sends requests with a {@link Publisher} and matches the
 * replies received by a {@link Consumer} on a reply-to queue to pending requests, using a
 * correlation ID.
 *
 * <p>Pending requests are tracked in a concurrent map. They are completed when a matching reply
 * arrives, failed by a periodic task once they are older than the configured request timeout, and
 * failed when the client is closed.
 */
public class AmqpRpcClient implements RpcClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcClient.class);

    /** Publish callback that ignores the publishing outcome. */
    private static final Publisher.Callback NO_OP_CALLBACK = context -> {
    };

    private final AmqpConnection connection;
    private final Clock clock;
    private final Publisher publisher;
    private final Consumer consumer;

    /** Produces the correlation ID of each request. */
    private final Supplier<Object> correlationIdSupplier;

    /** Prepares a request message before publishing, given the request's correlation ID. */
    private final BiFunction<Message, Object, Message> requestPostProcessor;

    /** Extracts the correlation ID from a reply message. */
    private final Function<Message, Object> correlationIdExtractor;

    private final Duration requestTimeout;

    /** Handle on the periodic timeout task, cancelled on {@link #close()}. */
    private final ScheduledFuture<?> requestTimeoutFuture;

    /** Requests waiting for a reply, keyed by correlation ID. */
    private final Map<Object, OutstandingRequest> outstandingRequests = new ConcurrentHashMap<>();

    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** A request waiting for its reply: the future handed to the caller and its creation time. */
    public static class OutstandingRequest {
        private final CompletableFuture<Message> future;

        /** Reading of the connection clock taken when the request was published. */
        private final long time;

        private OutstandingRequest(CompletableFuture<Message> future, long time) {
            this.future = future;
            this.time = time;
        }
    }

    /**
     * Creates the RPC client: builds the request publisher and the reply consumer, sets up the
     * correlation ID strategy and schedules the periodic request timeout task.
     *
     * @param amqpRpcClientBuilder the builder holding the client configuration
     */
    public AmqpRpcClient(RpcSupport.AmqpRpcClientBuilder amqpRpcClientBuilder) {
        this.connection = amqpRpcClientBuilder.connection();
        this.clock = this.connection.clock();

        // Requests are published to the address configured on the RPC client builder.
        AmqpPublisherBuilder amqpPublisherBuilder = (AmqpPublisherBuilder) this.connection.publisherBuilder();
        ((DefaultAddressBuilder) amqpRpcClientBuilder.requestAddress()).copyTo(amqpPublisherBuilder.addressBuilder());
        this.publisher = amqpPublisherBuilder.build();

        // Without an explicit reply-to queue, declare an exclusive, auto-delete one.
        String configuredReplyTo = amqpRpcClientBuilder.replyToQueue();
        String replyToQueue = configuredReplyTo == null
                ? this.connection.management().queue().exclusive(true).autoDelete(true).declare().name()
                : configuredReplyTo;

        Function<Message, Object> configuredExtractor = amqpRpcClientBuilder.correlationIdExtractor();
        if (configuredExtractor == null) {
            this.correlationIdExtractor = reply -> reply.correlationId();
        } else {
            this.correlationIdExtractor = configuredExtractor;
        }

        this.consumer = this.connection.consumerBuilder().queue(replyToQueue).messageHandler((context, reply) -> {
            context.accept();
            completeRequest(reply);
        }).build();

        Supplier<Object> configuredSupplier = amqpRpcClientBuilder.correlationIdSupplier();
        if (configuredSupplier == null) {
            // Default correlation IDs: a per-client random prefix followed by a sequence number.
            String prefix = UUID.randomUUID().toString();
            AtomicLong sequence = new AtomicLong();
            this.correlationIdSupplier = () -> prefix + "-" + sequence.getAndIncrement();
        } else {
            this.correlationIdSupplier = configuredSupplier;
        }

        BiFunction<Message, Object, Message> configuredPostProcessor = amqpRpcClientBuilder.requestPostProcessor();
        if (configuredPostProcessor == null) {
            // Default: set the reply-to address and use the correlation ID as the message ID.
            DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
            addressBuilder.queue(replyToQueue);
            String replyToAddress = addressBuilder.address();
            this.requestPostProcessor = (request, correlationId) ->
                    request.replyTo(replyToAddress).messageId(correlationId);
        } else {
            this.requestPostProcessor = configuredPostProcessor;
        }

        this.requestTimeout = amqpRpcClientBuilder.requestTimeout();
        Runnable timeoutTask = requestTimeoutTask();
        long periodInMillis = this.requestTimeout.toMillis();
        this.requestTimeoutFuture = this.connection.scheduledExecutorService().scheduleAtFixedRate(() -> {
            // Exceptions are caught and logged so that they never propagate to the scheduler.
            try {
                timeoutTask.run();
            } catch (Exception e) {
                LOGGER.info("Error during request timeout task: {}", e.getMessage());
            }
        }, periodInMillis, periodInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates an empty message, delegating to the underlying publisher.
     *
     * @return a new message
     */
    @Override // com.rabbitmq.client.amqp.RpcClient
    public Message message() {
        return this.publisher.message();
    }

    /**
     * Creates a message with the given body, delegating to the underlying publisher.
     *
     * @param body the message body
     * @return a new message
     */
    @Override // com.rabbitmq.client.amqp.RpcClient
    public Message message(byte[] body) {
        return this.publisher.message(body);
    }

    /**
     * Publishes a request and returns a future for its reply.
     *
     * <p>The future completes with the reply message, or exceptionally with an
     * {@link AmqpException} if the request times out or the client is closed.
     *
     * @param message the request message
     * @return the future reply
     * @throws AmqpException if the client is closed
     */
    @Override // com.rabbitmq.client.amqp.RpcClient
    public CompletableFuture<Message> publish(Message message) {
        checkOpen();
        Object correlationId = this.correlationIdSupplier.get();
        Message request = this.requestPostProcessor.apply(message, correlationId);
        long now = this.clock.time();
        CompletableFuture<Message> reply = new CompletableFuture<>();
        // Register before publishing so that a fast reply always finds its pending request.
        this.outstandingRequests.put(correlationId, new OutstandingRequest(reply, now));
        try {
            this.publisher.publish(request, NO_OP_CALLBACK);
        } catch (RuntimeException e) {
            // The caller never gets the future in this case, so do not keep the entry around.
            this.outstandingRequests.remove(correlationId);
            throw e;
        }
        return reply;
    }

    /**
     * Closes the client: unregisters it from the connection, cancels the timeout task, closes the
     * publisher and the consumer, and fails all pending requests. Only the first call has an
     * effect.
     */
    @Override // com.rabbitmq.client.amqp.RpcClient, java.lang.AutoCloseable
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.connection.removeRpcClient(this);
        this.requestTimeoutFuture.cancel(true);
        try {
            this.publisher.close();
        } catch (Exception e) {
            LOGGER.warn("Error while closing RPC client publisher: {}", e.getMessage());
        }
        try {
            this.consumer.close();
        } catch (Exception e) {
            LOGGER.warn("Error while closing RPC client consumer: {}", e.getMessage());
        }
        // Fail the pending requests and drop them so the closed client does not retain them.
        Iterator<OutstandingRequest> pending = this.outstandingRequests.values().iterator();
        while (pending.hasNext()) {
            OutstandingRequest request = pending.next();
            pending.remove();
            request.future.completeExceptionally(new AmqpException("RPC client is closed", new Object[0]));
        }
    }

    /**
     * Creates the task that prunes timed out requests: each pending request whose creation time
     * is older than the current clock time minus the request timeout is removed and its future
     * is failed with an {@link AmqpException}. The task stops early if its thread is interrupted.
     *
     * @return the timeout task
     */
    Runnable requestTimeoutTask() {
        return () -> {
            long limit = this.clock.time() - this.requestTimeout.toNanos();
            Iterator<OutstandingRequest> pending = this.outstandingRequests.values().iterator();
            while (pending.hasNext() && !Thread.currentThread().isInterrupted()) {
                OutstandingRequest request = pending.next();
                if (request.time < limit) {
                    try {
                        pending.remove();
                    } catch (Exception e) {
                        LOGGER.warn("Error while pruning timed out request: {}", e.getMessage());
                    }
                    // No effect if the future has already been completed by a reply.
                    request.future.completeExceptionally(new AmqpException("RPC request timed out", new Object[0]));
                }
            }
        };
    }

    /**
     * Completes the pending request matching the reply, if any.
     *
     * <p>Replies without a correlation ID are ignored: the pending requests map does not accept
     * a {@code null} key, so looking one up would throw.
     */
    private void completeRequest(Message reply) {
        Object correlationId = this.correlationIdExtractor.apply(reply);
        if (correlationId == null) {
            LOGGER.debug("Ignoring RPC reply without a correlation ID");
            return;
        }
        OutstandingRequest request = this.outstandingRequests.remove(correlationId);
        if (request != null) {
            request.future.complete(reply);
        }
    }

    /** Throws an {@link AmqpException} if the client has been closed. */
    private void checkOpen() {
        if (this.closed.get()) {
            throw new AmqpException("RPC client is closed", new Object[0]);
        }
    }
}
