package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.amqp.AmqpConstants;
import com.microsoft.azure.servicebus.amqp.IAmqpSender;
import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

/* loaded from: input_file:com/microsoft/azure/servicebus/MessageSender.class */
public class MessageSender extends ClientEntity implements IAmqpSender, IErrorContextProvider {
    private static final Logger TRACE_LOGGER;
    private static final String SEND_TIMED_OUT = "Send operation timed out.";
    private final MessagingFactory underlyingFactory;
    private final String sendPath;
    private final Duration operationTimeout;
    private final RetryPolicy retryPolicy;
    private final Runnable operationTimer;
    private final Duration timerTimeout;
    private ConcurrentHashMap<byte[], ReplayableWorkItem<Void>> pendingSendWaiters;
    private ConcurrentLinkedQueue<byte[]> pendingSendsWaitingForCredit;
    private Sender sendLink;
    private CompletableFuture<MessageSender> linkFirstOpen;
    private AtomicLong nextTag;
    private AtomicInteger linkCredit;
    private TimeoutTracker openLinkTracker;
    private boolean linkCreateScheduled;
    private Object linkCreateLock;
    private Exception lastKnownLinkError;
    private Object sendCall;
    static final /* synthetic */ boolean $assertionsDisabled;

    public static CompletableFuture<MessageSender> create(MessagingFactory messagingFactory, String str, String str2) {
        MessageSender messageSender = new MessageSender(messagingFactory, str, str2);
        messageSender.openLinkTracker = TimeoutTracker.create(messagingFactory.getOperationTimeout());
        messageSender.initializeLinkOpen(messageSender.openLinkTracker);
        messageSender.linkCreateScheduled = true;
        Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.MessageSender.1
            @Override // java.lang.Runnable
            public void run() {
                MessageSender.this.sendLink = MessageSender.this.createSendLink();
            }
        }, Duration.ofSeconds(0L), TimerType.OneTimeRun);
        return messageSender.linkFirstOpen;
    }

    private MessageSender(MessagingFactory messagingFactory, String str, String str2) {
        super(str);
        this.sendPath = str2;
        this.underlyingFactory = messagingFactory;
        this.operationTimeout = messagingFactory.getOperationTimeout();
        this.timerTimeout = this.operationTimeout.getSeconds() > 9 ? this.operationTimeout.dividedBy(3L) : Duration.ofSeconds(5L);
        this.lastKnownLinkError = null;
        this.retryPolicy = messagingFactory.getRetryPolicy();
        this.pendingSendWaiters = new ConcurrentHashMap<>();
        this.pendingSendsWaitingForCredit = new ConcurrentLinkedQueue<>();
        this.nextTag = new AtomicLong(0L);
        this.linkCredit = new AtomicInteger(0);
        this.linkCreateLock = new Object();
        this.sendCall = new Object();
        this.operationTimer = new Runnable() { // from class: com.microsoft.azure.servicebus.MessageSender.2
            @Override // java.lang.Runnable
            public void run() {
                Map.Entry entry;
                if (MessageSender.this.pendingSendWaiters != null) {
                    Iterator it = MessageSender.this.pendingSendWaiters.entrySet().iterator();
                    while (it.hasNext() && (entry = (Map.Entry) it.next()) != null) {
                        ReplayableWorkItem replayableWorkItem = (ReplayableWorkItem) entry.getValue();
                        if (replayableWorkItem.getTimeoutTracker().remaining().compareTo(ClientConstants.TIMER_TOLERANCE) < 0) {
                            it.remove();
                            MessageSender.this.throwSenderTimeout(replayableWorkItem.getWork(), replayableWorkItem.getLastKnownException());
                        }
                    }
                }
            }
        };
    }

    public String getSendPath() {
        return this.sendPath;
    }

    private CompletableFuture<Void> send(byte[] bArr, int i, int i2) {
        return send(bArr, i, i2, null, null);
    }

    private byte[] getNextDeliveryTag() {
        long incrementAndGet = this.nextTag.incrementAndGet();
        byte[] bArr = new byte[8];
        for (int i = 0; i < 8; i++) {
            bArr[i] = (byte) (incrementAndGet >> (8 * ((8 - i) - 1)));
        }
        return bArr;
    }

    private CompletableFuture<Void> send(byte[] bArr, int i, int i2, CompletableFuture<Void> completableFuture, TimeoutTracker timeoutTracker, byte[] bArr2) {
        if (timeoutTracker != null && completableFuture != null && (timeoutTracker.remaining().isNegative() || timeoutTracker.remaining().isZero())) {
            if (bArr2 != null) {
                this.pendingSendWaiters.remove(bArr2);
            }
            throwSenderTimeout(completableFuture, null);
            return completableFuture;
        }
        byte[] nextDeliveryTag = bArr2 == null ? getNextDeliveryTag() : bArr2;
        boolean z = false;
        if (this.sendLink.getLocalState() == EndpointState.CLOSED) {
            scheduleRecreate(Duration.ofMillis(1L));
        } else if (this.linkCredit.get() > 0 && (this.pendingSendsWaitingForCredit.isEmpty() || this.pendingSendsWaitingForCredit.peek() == bArr2)) {
            synchronized (this.sendCall) {
                if (this.linkCredit.get() > 0 && (this.pendingSendsWaitingForCredit.isEmpty() || this.pendingSendsWaitingForCredit.peek() == bArr2)) {
                    this.linkCredit.decrementAndGet();
                    this.sendLink.delivery(nextDeliveryTag).setMessageFormat(i2);
                    int send = this.sendLink.send(bArr, 0, i);
                    if (!$assertionsDisabled && send != i) {
                        throw new AssertionError("Contract of the ProtonJ library for Sender.Send API changed");
                    }
                    this.sendLink.advance();
                    z = true;
                }
            }
        }
        if (!z) {
            this.pendingSendsWaitingForCredit.offer(nextDeliveryTag);
        }
        CompletableFuture<Void> completableFuture2 = completableFuture == null ? new CompletableFuture<>() : completableFuture;
        this.pendingSendWaiters.put(nextDeliveryTag, timeoutTracker == null ? new ReplayableWorkItem<>(bArr, i, i2, completableFuture2, this.operationTimeout) : new ReplayableWorkItem<>(bArr, i, i2, completableFuture2, timeoutTracker));
        return completableFuture2;
    }

    private CompletableFuture<Void> send(byte[] bArr, int i, int i2, CompletableFuture<Void> completableFuture, TimeoutTracker timeoutTracker) {
        return send(bArr, i, i2, completableFuture, timeoutTracker, null);
    }

    private int getPayloadSize(Message message) {
        Data body;
        Binary value;
        if (message == null || message.getBody() == null || (body = message.getBody()) == null || (value = body.getValue()) == null) {
            return 0;
        }
        return value.getLength();
    }

    private int getDataSerializedSize(Message message) {
        if (message == null) {
            return 0;
        }
        int payloadSize = getPayloadSize(message);
        MessageAnnotations messageAnnotations = message.getMessageAnnotations();
        if (messageAnnotations == null) {
            return payloadSize;
        }
        int i = 0;
        Iterator it = messageAnnotations.getValue().values().iterator();
        while (it.hasNext()) {
            i += it.next().toString().length();
        }
        return i + payloadSize;
    }

    public CompletableFuture<Void> send(Iterable<Message> iterable) {
        if (iterable == null || IteratorUtil.sizeEquals(iterable, 0)) {
            throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
        }
        Message next = iterable.iterator().next();
        if (IteratorUtil.sizeEquals(iterable, 1)) {
            return send(next);
        }
        Message message = Proton.message();
        message.setMessageAnnotations(next.getMessageAnnotations());
        byte[] bArr = new byte[ClientConstants.MAX_MESSAGE_LENGTH_BYTES];
        int encode = message.encode(bArr, 0, ClientConstants.MAX_MESSAGE_LENGTH_BYTES);
        for (Message message2 : iterable) {
            Message message3 = Proton.message();
            int min = Math.min(getDataSerializedSize(message2) + ClientConstants.MAX_EVENTHUB_AMQP_HEADER_SIZE_BYTES, ClientConstants.MAX_MESSAGE_LENGTH_BYTES);
            byte[] bArr2 = new byte[min];
            message3.setBody(new Data(new Binary(bArr2, 0, message2.encode(bArr2, 0, min))));
            try {
                encode += message3.encode(bArr, encode, (ClientConstants.MAX_MESSAGE_LENGTH_BYTES - encode) - 1);
            } catch (BufferOverflowException e) {
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", 256), e));
                return completableFuture;
            }
        }
        return send(bArr, encode, AmqpConstants.AMQP_BATCH_MESSAGE_FORMAT);
    }

    public CompletableFuture<Void> send(Message message) {
        int min = Math.min(getDataSerializedSize(message) + ClientConstants.MAX_EVENTHUB_AMQP_HEADER_SIZE_BYTES, ClientConstants.MAX_MESSAGE_LENGTH_BYTES);
        byte[] bArr = new byte[min];
        try {
            return send(bArr, message.encode(bArr, 0, min), 0);
        } catch (BufferOverflowException e) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", 256), e));
            return completableFuture;
        }
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onOpenComplete(Exception exc) {
        if (exc == null) {
            this.openLinkTracker = null;
            this.retryPolicy.resetRetryCount(getClientId());
            this.lastKnownLinkError = null;
            if (!this.linkFirstOpen.isDone()) {
                this.linkFirstOpen.complete(this);
                Timer.schedule(this.operationTimer, this.timerTimeout, TimerType.RepeatRun);
            } else if (!this.pendingSendWaiters.isEmpty()) {
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                concurrentHashMap.putAll(this.pendingSendWaiters);
                if (concurrentHashMap.size() > 0) {
                    concurrentHashMap.forEachEntry(1L, new Consumer<Map.Entry<byte[], ReplayableWorkItem<Void>>>() { // from class: com.microsoft.azure.servicebus.MessageSender.3
                        @Override // java.util.function.Consumer
                        public void accept(Map.Entry<byte[], ReplayableWorkItem<Void>> entry) {
                            byte[] key = entry.getKey();
                            MessageSender.this.pendingSendsWaitingForCredit.remove(key);
                            MessageSender.this.reSend(key, false);
                        }
                    });
                }
                concurrentHashMap.clear();
            }
        } else {
            ExceptionUtil.completeExceptionally(this.linkFirstOpen, exc, this);
        }
        synchronized (this.linkCreateLock) {
            this.linkCreateScheduled = false;
        }
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onClose(ErrorCondition errorCondition) {
        onError(errorCondition != null ? ExceptionUtil.toException(errorCondition) : new ServiceBusException(true, "The entity has been close due to transient failures (underlying link closed), please retry the operation."));
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onError(Exception exc) {
        Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(getClientId(), exc, this.openLinkTracker == null ? this.operationTimeout : this.openLinkTracker.elapsed().compareTo(this.operationTimeout) > 0 ? Duration.ofSeconds(0L) : this.operationTimeout.minus(this.openLinkTracker.elapsed()));
        if (exc != null) {
            this.lastKnownLinkError = exc;
        }
        if (nextRetryInterval != null) {
            scheduleRecreate(nextRetryInterval);
        } else {
            onOpenComplete(exc);
        }
    }

    private void scheduleRecreate(Duration duration) {
        synchronized (this.linkCreateLock) {
            if (this.linkCreateScheduled) {
                return;
            }
            this.linkCreateScheduled = true;
            Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.MessageSender.4
                @Override // java.lang.Runnable
                public void run() {
                    if (MessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED) {
                        return;
                    }
                    Sender createSendLink = MessageSender.this.createSendLink();
                    if (createSendLink != null) {
                        MessageSender.this.underlyingFactory.deregisterForConnectionError(MessageSender.this.sendLink);
                        MessageSender.this.sendLink = createSendLink;
                    } else {
                        synchronized (MessageSender.this.linkCreateLock) {
                            MessageSender.this.linkCreateScheduled = false;
                        }
                    }
                    MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId());
                }
            }, duration, TimerType.OneTimeRun);
        }
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpSender
    public void onSendComplete(final byte[] bArr, DeliveryState deliveryState) {
        if (TRACE_LOGGER.isLoggable(Level.FINEST)) {
            TRACE_LOGGER.log(Level.FINEST, String.format("linkName[%s]", this.sendLink.getName()));
        }
        ReplayableWorkItem<Void> replayableWorkItem = this.pendingSendWaiters.get(bArr);
        if (replayableWorkItem != null) {
            CompletableFuture<Void> work = replayableWorkItem.getWork();
            if (deliveryState instanceof Accepted) {
                this.retryPolicy.resetRetryCount(getClientId());
                this.pendingSendWaiters.remove(bArr);
                work.complete(null);
            } else {
                if (!(deliveryState instanceof Rejected)) {
                    this.pendingSendWaiters.remove(bArr);
                    ExceptionUtil.completeExceptionally(work, new ServiceBusException(false, deliveryState.toString()), this);
                    return;
                }
                Exception exception = ExceptionUtil.toException(((Rejected) deliveryState).getError());
                Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(getClientId(), exception, replayableWorkItem.getTimeoutTracker().remaining());
                if (nextRetryInterval == null) {
                    this.pendingSendWaiters.remove(bArr);
                    ExceptionUtil.completeExceptionally(work, exception, this);
                } else {
                    replayableWorkItem.setLastKnownException(exception);
                    Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.MessageSender.5
                        @Override // java.lang.Runnable
                        public void run() {
                            MessageSender.this.reSend(bArr, false);
                        }
                    }, nextRetryInterval, TimerType.OneTimeRun);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reSend(byte[] bArr, boolean z) {
        ReplayableWorkItem<Void> remove = this.pendingSendWaiters.remove(bArr);
        if (remove != null) {
            send(remove.getMessage(), remove.getEncodedMessageSize(), remove.getMessageFormat(), remove.getWork(), remove.getTimeoutTracker(), z ? bArr : null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Sender createSendLink() {
        try {
            Connection connection = this.underlyingFactory.getConnection().get(this.operationTimeout.getSeconds(), TimeUnit.SECONDS);
            if (connection == null || connection.getLocalState() == EndpointState.CLOSED) {
                return null;
            }
            Session session = connection.session();
            session.setOutgoingWindow(2147483647L);
            session.open();
            BaseHandler.setHandler(session, new SessionHandler(this.sendPath));
            Link sender = session.sender(StringUtil.getRandomString().concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()));
            Target target = new Target();
            target.setAddress(this.sendPath);
            sender.setTarget(target);
            sender.setSource(new Source());
            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
            BaseHandler.setHandler(sender, new SendLinkHandler(this));
            this.underlyingFactory.registerForConnectionError(sender);
            sender.open();
            return sender;
        } catch (InterruptedException | ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause != null && (cause instanceof Exception)) {
                onError((Exception) cause);
            }
            if (!(e instanceof InterruptedException)) {
                return null;
            }
            Thread.currentThread().interrupt();
            return null;
        } catch (TimeoutException e2) {
            onError(new ServiceBusException(false, "Connection creation timed out.", e2));
            return null;
        }
    }

    private void initializeLinkOpen(TimeoutTracker timeoutTracker) {
        this.linkFirstOpen = new CompletableFuture<>();
        Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.MessageSender.6
            @Override // java.lang.Runnable
            public void run() {
                if (MessageSender.this.linkFirstOpen.isDone()) {
                    return;
                }
                ServiceBusException serviceBusException = new ServiceBusException(true, String.format(Locale.US, "SendLink(%s).open() on Entity(%s) timed out", MessageSender.this.sendLink.getName(), MessageSender.this.getSendPath()));
                if (MessageSender.TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    MessageSender.TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "message Sender(linkName: %s, path: %s) open call timedout", MessageSender.this.getClientId(), MessageSender.this.sendPath), (Throwable) serviceBusException);
                }
                ExceptionUtil.completeExceptionally(MessageSender.this.linkFirstOpen, serviceBusException, MessageSender.this);
            }
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    @Override // com.microsoft.azure.servicebus.ClientEntity
    public CompletableFuture<Void> close() {
        if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) {
            this.sendLink.close();
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override // com.microsoft.azure.servicebus.IErrorContextProvider
    public ErrorContext getContext() {
        return new SenderContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.sendPath, (this.sendLink == null || this.sendLink.getRemoteProperties() == null || !this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)) ? this.sendLink != null ? this.sendLink.getName() : null : this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString(), this.linkFirstOpen != null && this.linkFirstOpen.isDone() ? Integer.valueOf(this.sendLink.getCredit()) : null);
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpSender
    public void onFlow() {
        int remoteCredit;
        synchronized (this.sendCall) {
            remoteCredit = this.sendLink.getRemoteCredit();
        }
        if (remoteCredit <= 0) {
            return;
        }
        if (TRACE_LOGGER.isLoggable(Level.FINE)) {
            TRACE_LOGGER.log(Level.FINE, String.format(Locale.US, "linkName[%s], path[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]", getClientId(), this.sendPath, Integer.valueOf(remoteCredit), Integer.valueOf(this.pendingSendsWaitingForCredit.size()), Integer.valueOf(this.pendingSendWaiters.size())));
        }
        this.linkCredit.addAndGet(remoteCredit);
        while (!this.pendingSendsWaitingForCredit.isEmpty() && this.linkCredit.get() > 0) {
            byte[] peek = this.pendingSendsWaitingForCredit.peek();
            if (peek != null) {
                reSend(peek, true);
                this.pendingSendsWaitingForCredit.poll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void throwSenderTimeout(CompletableFuture<Void> completableFuture, Exception exc) {
        Exception exc2 = exc == null ? this.lastKnownLinkError : exc;
        ExceptionUtil.completeExceptionally(completableFuture, new ServiceBusException((exc2 == null || !(exc2 instanceof ServiceBusException)) ? true : ((ServiceBusException) exc2).getIsTransient(), SEND_TIMED_OUT, exc2), this);
    }

    static {
        $assertionsDisabled = !MessageSender.class.desiredAssertionStatus();
        TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE);
    }
}
