package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.ObservationCollector;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.Resource;
import com.rabbitmq.client.amqp.impl.DefaultAddressBuilder;
import com.rabbitmq.client.amqp.impl.Utils;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
import com.rabbitmq.qpid.protonj2.client.DeliveryMode;
import com.rabbitmq.qpid.protonj2.client.DeliveryState;
import com.rabbitmq.qpid.protonj2.client.Sender;
import com.rabbitmq.qpid.protonj2.client.SenderOptions;
import com.rabbitmq.qpid.protonj2.client.Session;
import com.rabbitmq.qpid.protonj2.client.Tracker;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/client/amqp/impl/AmqpPublisher.class */
public final class AmqpPublisher extends ResourceBase implements Publisher {
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPublisher.class);
    private final Long id;
    private volatile Sender sender;
    private final ExecutorService executorService;
    private final String address;
    private final AmqpConnection connection;
    private final AtomicBoolean closed;
    private final MetricsCollector metricsCollector;
    private final ObservationCollector observationCollector;
    private final Function<Message, Tracker> publishCall;
    private final DefaultAddressBuilder.DestinationSpec destinationSpec;
    private final Duration publishTimeout;
    private final SessionHandler sessionHandler;
    private volatile ObservationCollector.ConnectionInfo connectionInfo;

    /* loaded from: input_file:com/rabbitmq/client/amqp/impl/AmqpPublisher$DefaultContext.class */
    private static class DefaultContext implements Publisher.Context {
        private final Message message;
        private final Publisher.Status status;

        private DefaultContext(Message message, Publisher.Status status) {
            this.message = message;
            this.status = status;
        }

        @Override // com.rabbitmq.client.amqp.Publisher.Context
        public Message message() {
            return this.message;
        }

        @Override // com.rabbitmq.client.amqp.Publisher.Context
        public Publisher.Status status() {
            return this.status;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AmqpPublisher(AmqpPublisherBuilder amqpPublisherBuilder) {
        super(amqpPublisherBuilder.listeners());
        this.closed = new AtomicBoolean(false);
        this.id = Long.valueOf(ID_SEQUENCE.getAndIncrement());
        this.executorService = amqpPublisherBuilder.connection().environment().publisherExecutorService();
        this.address = amqpPublisherBuilder.address();
        this.destinationSpec = amqpPublisherBuilder.destination();
        this.connection = amqpPublisherBuilder.connection();
        this.publishTimeout = amqpPublisherBuilder.publishTimeout();
        this.sessionHandler = this.connection.createSessionHandler();
        this.sender = createSender(this.sessionHandler.session(), this.address, this.publishTimeout);
        this.metricsCollector = this.connection.metricsCollector();
        this.observationCollector = this.connection.observationCollector();
        state(Resource.State.OPEN);
        this.metricsCollector.openPublisher();
        this.publishCall = message -> {
            try {
                return this.sender.send(((AmqpMessage) message).nativeMessage().durable(true));
            } catch (ClientIllegalStateException e) {
                LOGGER.debug("Error while publishing: '{}'. Closing publisher.", e.getMessage());
                close(ExceptionUtils.convert(e));
                throw ExceptionUtils.convert(e);
            } catch (ClientException e2) {
                LOGGER.debug("Error while publishing: '{}'.", e2.getMessage());
                throw ExceptionUtils.convert(e2);
            }
        };
        this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
    }

    @Override // com.rabbitmq.client.amqp.Publisher
    public Message message() {
        return new AmqpMessage();
    }

    @Override // com.rabbitmq.client.amqp.Publisher
    public Message message(byte[] bArr) {
        return new AmqpMessage(bArr);
    }

    @Override // com.rabbitmq.client.amqp.Publisher
    public void publish(Message message, Publisher.Callback callback) {
        checkOpen();
        Tracker tracker = (Tracker) this.observationCollector.publish(this.destinationSpec.exchange(), this.destinationSpec.routingKey(), message, this.connectionInfo, this.publishCall);
        this.executorService.submit(() -> {
            Publisher.Status status;
            try {
                tracker.settlementFuture().get();
                status = mapDeliveryState(tracker.remoteState());
            } catch (InterruptedException | ExecutionException e) {
                status = Publisher.Status.REJECTED;
            }
            DefaultContext defaultContext = new DefaultContext(message, status);
            this.metricsCollector.publishDisposition(mapToPublishDisposition(status));
            callback.handle(defaultContext);
        });
        this.metricsCollector.publish();
    }

    private Publisher.Status mapDeliveryState(DeliveryState deliveryState) {
        if (deliveryState.isAccepted()) {
            return Publisher.Status.ACCEPTED;
        }
        if (deliveryState.getType() == DeliveryState.Type.REJECTED) {
            return Publisher.Status.REJECTED;
        }
        if (deliveryState.getType() == DeliveryState.Type.RELEASED) {
            return Publisher.Status.RELEASED;
        }
        LOGGER.warn("Delivery state not supported: " + deliveryState.getType());
        throw new IllegalStateException("This delivery state is not supported: " + deliveryState.getType());
    }

    private static MetricsCollector.PublishDisposition mapToPublishDisposition(Publisher.Status status) {
        if (status == Publisher.Status.ACCEPTED) {
            return MetricsCollector.PublishDisposition.ACCEPTED;
        }
        if (status == Publisher.Status.REJECTED) {
            return MetricsCollector.PublishDisposition.REJECTED;
        }
        if (status == Publisher.Status.RELEASED) {
            return MetricsCollector.PublishDisposition.RELEASED;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recoverAfterConnectionFailure() {
        this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
        this.sender = createSender(this.sessionHandler.sessionNoCheck(), this.address, this.publishTimeout);
    }

    @Override // com.rabbitmq.client.amqp.Publisher, java.lang.AutoCloseable
    public void close() {
        close(null);
    }

    private Sender createSender(Session session, String str, Duration duration) {
        try {
            return (Sender) ExceptionUtils.wrapGet((str == null ? session.openAnonymousSender() : session.openSender(str, new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE).sendTimeout(duration.isNegative() ? -1L : duration.toMillis()))).openFuture());
        } catch (ClientException e) {
            throw ExceptionUtils.convert(e, "Error while creating publisher to %s", str);
        }
    }

    private void close(Throwable th) {
        if (this.closed.compareAndSet(false, true)) {
            state(Resource.State.CLOSING, th);
            this.connection.removePublisher(this);
            try {
                this.sender.close();
                this.sessionHandler.close();
            } catch (Exception e) {
                LOGGER.warn("Error while closing sender", e);
            }
            state(Resource.State.CLOSED, th);
            this.metricsCollector.closePublisher();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Long id() {
        return this.id;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String address() {
        return this.address;
    }
}
