package com.rabbitmq.qpid.protonj2.client.impl;

import com.rabbitmq.qpid.protonj2.buffer.ProtonBuffer;
import com.rabbitmq.qpid.protonj2.client.AdvancedMessage;
import com.rabbitmq.qpid.protonj2.client.Message;
import com.rabbitmq.qpid.protonj2.client.StreamSender;
import com.rabbitmq.qpid.protonj2.client.StreamSenderMessage;
import com.rabbitmq.qpid.protonj2.client.StreamSenderOptions;
import com.rabbitmq.qpid.protonj2.client.StreamTracker;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
import com.rabbitmq.qpid.protonj2.client.futures.ClientFuture;
import com.rabbitmq.qpid.protonj2.client.futures.ClientSynchronization;
import com.rabbitmq.qpid.protonj2.client.impl.ClientTransactionContext;
import com.rabbitmq.qpid.protonj2.engine.OutgoingDelivery;
import com.rabbitmq.qpid.protonj2.engine.Sender;
import com.rabbitmq.qpid.protonj2.engine.util.StringUtils;
import com.rabbitmq.qpid.protonj2.types.messaging.DeliveryAnnotations;
import com.rabbitmq.qpid.protonj2.types.transport.DeliveryState;
import com.rabbitmq.qpid.protonj2.types.transport.SenderSettleMode;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/rabbitmq/qpid/protonj2/client/impl/ClientStreamSender.class */
public final class ClientStreamSender extends ClientSenderLinkType<StreamSender> implements StreamSender {
    private static final Logger LOG = LoggerFactory.getLogger(ClientStreamSender.class);
    private final StreamSenderOptions options;
    private final Deque<ClientOutgoingEnvelope> blocked;

    /* loaded from: input_file:com/rabbitmq/qpid/protonj2/client/impl/ClientStreamSender$ClientOutgoingEnvelope.class */
    public static final class ClientOutgoingEnvelope implements ClientTransactionContext.Sendable {
        private final ProtonBuffer payload;
        private final ClientFuture<StreamTracker> request;
        private final ClientStreamSender sender;
        private final boolean complete;
        private final int messageFormat;
        private boolean aborted;
        private Future<?> sendTimeout;
        private OutgoingDelivery delivery;

        public ClientOutgoingEnvelope(ClientStreamSender clientStreamSender, OutgoingDelivery outgoingDelivery, int i, ProtonBuffer protonBuffer, boolean z, ClientFuture<StreamTracker> clientFuture) {
            this.payload = protonBuffer;
            this.request = clientFuture;
            this.sender = clientStreamSender;
            this.complete = z;
            this.messageFormat = i;
            this.delivery = outgoingDelivery;
        }

        public Future<?> sendTimeout() {
            return this.sendTimeout;
        }

        public void sendTimeout(Future<?> future) {
            this.sendTimeout = future;
        }

        public ProtonBuffer payload() {
            return this.payload;
        }

        public OutgoingDelivery delivery() {
            return this.delivery;
        }

        public ClientOutgoingEnvelope abort() {
            this.aborted = true;
            return this;
        }

        public boolean aborted() {
            return this.aborted;
        }

        @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientTransactionContext.Sendable
        public void discard() {
            if (this.sendTimeout != null) {
                this.sendTimeout.cancel(true);
                this.sendTimeout = null;
            }
            if (this.payload != null) {
                this.payload.close();
            }
            if (this.delivery == null) {
                this.request.complete(this.sender.createNoOpTracker());
                return;
            }
            ClientTracker clientTracker = (ClientTracker) this.delivery.getLinkedResource();
            if (clientTracker != null) {
                clientTracker.settlementFuture().complete(clientTracker);
            }
            this.request.complete((StreamTracker) this.delivery.getLinkedResource());
        }

        public ClientOutgoingEnvelope succeeded() {
            if (this.sendTimeout != null) {
                this.sendTimeout.cancel(true);
            }
            if (this.payload != null) {
                this.payload.close();
            }
            this.request.complete((StreamTracker) this.delivery.getLinkedResource());
            return this;
        }

        public ClientOutgoingEnvelope failed(ClientException clientException) {
            if (this.sendTimeout != null) {
                this.sendTimeout.cancel(true);
            }
            if (this.delivery != null) {
                try {
                    this.delivery.abort();
                } catch (Exception e) {
                }
            }
            if (this.payload != null) {
                this.payload.close();
            }
            this.request.failed(clientException);
            return this;
        }

        @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientTransactionContext.Sendable
        public void send(DeliveryState deliveryState, boolean z) {
            if (this.delivery == null) {
                this.delivery = this.sender.protonLink().next();
                this.delivery.setLinkedResource(this.sender.createTracker(this.delivery));
            }
            if (this.delivery.getTransferCount() == 0) {
                this.delivery.setMessageFormat(this.messageFormat);
                this.delivery.disposition(deliveryState, z);
            }
            if (aborted()) {
                this.delivery.abort();
                succeeded();
                return;
            }
            boolean autoFlushOff = this.sender.connection().autoFlushOff();
            try {
                this.delivery.streamBytes(this.payload, this.complete);
                if (this.payload == null || !this.payload.isReadable()) {
                    succeeded();
                } else {
                    this.sender.addToHeadOfBlockedQueue(this);
                }
            } finally {
                if (autoFlushOff) {
                    this.sender.connection().flush();
                    this.sender.connection().autoFlushOn();
                }
            }
        }

        public ClientException createSendTimedOutException() {
            return new ClientSendTimedOutException("Timed out waiting for credit to send");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a stream sender bound to the given session and engine sender.
     *
     * @param clientSession       the session that owns this sender
     * @param streamSenderOptions the caller's options; a new {@link StreamSenderOptions} built
     *                            from them is kept by this sender
     * @param str                 passed to the superclass as its second argument
     *                            (presumably the sender id — confirm against the superclass)
     * @param sender              the engine sender to wrap
     */
    public ClientStreamSender(ClientSession clientSession, StreamSenderOptions streamSenderOptions, String str, Sender sender) {
        super(clientSession, str, streamSenderOptions, sender);
        // Diamond instead of the raw type so the deque stays type-checked.
        this.blocked = new ArrayDeque<>();
        this.options = new StreamSenderOptions(streamSenderOptions);
    }

    /**
     * Sends the given message without delivery annotations, queueing it on the blocked
     * queue when it cannot be written immediately.
     *
     * @param message the message to send
     * @return the tracker for the send
     * @throws ClientException if the sender is closed or failed, or the send fails
     */
    @Override
    public StreamTracker send(Message<?> message) throws ClientException {
        checkClosedOrFailed();
        final AdvancedMessage<?> converted = ClientMessageSupport.convertMessage(message);
        final Map<String, Object> noDeliveryAnnotations = null;
        return sendMessage(converted, noDeliveryAnnotations, true);
    }

    /**
     * Sends the given message with the supplied delivery annotations, queueing it on the
     * blocked queue when it cannot be written immediately.
     *
     * @param message the message to send
     * @param map     delivery annotations to encode with the message, may be {@code null}
     * @return the tracker for the send
     * @throws ClientException if the sender is closed or failed, or the send fails
     */
    @Override
    public StreamTracker send(Message<?> message, Map<String, Object> map) throws ClientException {
        checkClosedOrFailed();
        // Forward the caller's annotations; previously null was passed and they were dropped.
        return sendMessage(ClientMessageSupport.convertMessage(message), map, true);
    }

    /**
     * Attempts to send the given message without delivery annotations. When the message
     * cannot be written immediately the send is not queued and the result is {@code null}.
     *
     * @param message the message to send
     * @return the tracker for the send, or {@code null} when nothing was sent
     * @throws ClientException if the sender is closed or failed, or the send fails
     */
    @Override
    public StreamTracker trySend(Message<?> message) throws ClientException {
        checkClosedOrFailed();
        final AdvancedMessage<?> converted = ClientMessageSupport.convertMessage(message);
        final Map<String, Object> noDeliveryAnnotations = null;
        return sendMessage(converted, noDeliveryAnnotations, false);
    }

    /**
     * Attempts to send the given message with the supplied delivery annotations. When the
     * message cannot be written immediately the send is not queued and the result is
     * {@code null}.
     *
     * @param message the message to send
     * @param map     delivery annotations to encode with the message, may be {@code null}
     * @return the tracker for the send, or {@code null} when nothing was sent
     * @throws ClientException if the sender is closed or failed, or the send fails
     */
    @Override
    public StreamTracker trySend(Message<?> message, Map<String, Object> map) throws ClientException {
        checkClosedOrFailed();
        // Forward the caller's annotations; previously null was passed and they were dropped.
        return sendMessage(ClientMessageSupport.convertMessage(message), map, false);
    }

    /**
     * Starts a new streamed message that carries no delivery annotations.
     *
     * @return the streamed message to write into
     * @throws ClientException if the sender is closed or failed, or a streamed send is
     *                         already in progress
     */
    @Override
    public ClientStreamSenderMessage beginMessage() throws ClientException {
        final Map<String, Object> noDeliveryAnnotations = null;
        return beginMessage(noDeliveryAnnotations);
    }

    /**
     * Starts a new streamed message, optionally carrying delivery annotations.
     * <p>
     * On the executor, a new delivery and tracker are created and wrapped in a
     * {@link ClientStreamSenderMessage}. The request fails when the engine sender still has
     * a current delivery.
     *
     * @param map delivery annotations for the message, or {@code null} for none
     * @return the streamed message to write into
     * @throws ClientException if the sender is closed or failed, or a streamed send is
     *                         already in progress
     */
    @Override
    public ClientStreamSenderMessage beginMessage(Map<String, Object> map) throws ClientException {
        checkClosedOrFailed();

        final ClientFuture pendingMessage = this.session.getFutureFactory().createFuture();
        final DeliveryAnnotations annotations;
        if (map == null) {
            annotations = null;
        } else {
            annotations = new DeliveryAnnotations(StringUtils.toSymbolKeyedMap(map));
        }

        this.executor.execute(() -> {
            if (this.protonSender.current() == null) {
                final OutgoingDelivery freshDelivery = this.protonSender.next();
                final ClientStreamTracker tracker = createTracker(freshDelivery);
                freshDelivery.setLinkedResource(tracker);
                pendingMessage.complete(new ClientStreamSenderMessage(this, tracker, annotations));
            } else {
                pendingMessage.failed(new ClientIllegalStateException("Cannot initiate a new streaming send until the previous one is complete"));
            }
        });

        return (ClientStreamSenderMessage) this.session.request(this, pendingMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the options instance this sender built from the caller's options in its
     *         constructor
     */
    public StreamSenderOptions options() {
        return this.options;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return this sender, typed as the public {@link StreamSender} interface
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    public StreamSender self() {
        return this;
    }

    /**
     * Parks the envelope at the end of the blocked queue and arms its send timeout when one
     * is configured and not already set.
     *
     * @param clientOutgoingEnvelope the envelope to queue
     */
    private void addToTailOfBlockedQueue(ClientOutgoingEnvelope clientOutgoingEnvelope) {
        this.blocked.addLast(clientOutgoingEnvelope);
        scheduleSendTimeoutIfNeeded(clientOutgoingEnvelope);
    }

    /**
     * Parks the envelope at the front of the blocked queue and arms its send timeout when one
     * is configured and not already set.
     *
     * @param clientOutgoingEnvelope the envelope to queue
     */
    private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope clientOutgoingEnvelope) {
        this.blocked.addFirst(clientOutgoingEnvelope);
        scheduleSendTimeoutIfNeeded(clientOutgoingEnvelope);
    }

    /**
     * Schedules the send timeout task for an envelope, shared by both queueing paths.
     * Nothing is scheduled when the configured timeout is not positive or the envelope
     * already carries a timeout task.
     *
     * @param envelope the queued envelope
     */
    private void scheduleSendTimeoutIfNeeded(ClientOutgoingEnvelope envelope) {
        if (this.options.sendTimeout() <= 0 || envelope.sendTimeout() != null) {
            return;
        }
        envelope.sendTimeout(this.executor.schedule(() -> {
            // On expiry the envelope leaves the queue and its request fails with a timeout.
            this.blocked.remove(envelope);
            envelope.failed(envelope.createSendTimedOutException());
        }, this.options.sendTimeout(), TimeUnit.MILLISECONDS));
    }

    /**
     * Encodes a whole message and sends it as a single completed delivery.
     * <p>
     * The message is encoded on the calling thread. On the executor it is written at once
     * when the engine sender is sendable and has no current delivery. Otherwise it is queued
     * at the tail of the blocked queue when {@code z} is {@code true}, or the request
     * completes with {@code null} when {@code z} is {@code false}.
     *
     * @param advancedMessage the message to encode and send
     * @param map             delivery annotations handed to the encoder, may be {@code null}
     * @param z               whether to wait on the blocked queue when the send cannot
     *                        happen immediately
     * @return the tracker for the send, or {@code null} when nothing was sent
     * @throws ClientException if encoding or the send request fails
     */
    private StreamTracker sendMessage(AdvancedMessage<?> advancedMessage, Map<String, Object> map, boolean z) throws ClientException {
        final ClientFuture pendingSend = this.session.getFutureFactory().createFuture();
        final ProtonBuffer encodedMessage = advancedMessage.encode(map);

        this.executor.execute(() -> {
            if (!notClosedOrFailed(pendingSend)) {
                return;
            }
            try {
                final ClientOutgoingEnvelope envelope =
                        new ClientOutgoingEnvelope(this, null, advancedMessage.messageFormat(), encodedMessage, true, pendingSend);
                final boolean canSendNow = this.protonSender.isSendable() && this.protonSender.current() == null;
                if (canSendNow) {
                    this.session.getTransactionContext().send(envelope, null, this.protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED);
                } else if (!z) {
                    // trySend semantics: report "not sent" instead of waiting.
                    pendingSend.complete(null);
                } else {
                    addToTailOfBlockedQueue(envelope);
                }
            } catch (Exception error) {
                pendingSend.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
            }
        });

        return (StreamTracker) this.session.request(this, pendingSend);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Writes one chunk of a streamed message into that message's delivery.
     * <p>
     * On the executor the chunk is written when the engine sender is sendable; otherwise it
     * is queued at the head of the blocked queue.
     *
     * @param clientStreamSenderMessage the streamed message the chunk belongs to
     * @param protonBuffer              the bytes to write
     * @param i                         the message format for the delivery
     * @return the tracker for the delivery
     * @throws ClientException if the sender is closed or failed, or the write fails
     */
    public StreamTracker sendMessage(ClientStreamSenderMessage clientStreamSenderMessage, ProtonBuffer protonBuffer, int i) throws ClientException {
        checkClosedOrFailed();

        final ClientFuture pendingWrite = this.session.getFutureFactory().createFuture();
        final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(
                this, clientStreamSenderMessage.getProtonDelivery(), i, protonBuffer, clientStreamSenderMessage.completed(), pendingWrite);

        this.executor.execute(() -> {
            if (!notClosedOrFailed(pendingWrite, clientStreamSenderMessage.getProtonDelivery().getLink())) {
                return;
            }
            try {
                if (!this.protonSender.isSendable()) {
                    addToHeadOfBlockedQueue(envelope);
                } else {
                    this.session.getTransactionContext().send(envelope, null, isSendingSettled());
                }
            } catch (Exception error) {
                pendingWrite.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
            }
        });

        return (StreamTracker) this.session.request(this, pendingWrite);
    }

    /**
     * Builds the tracker that this sender attaches to a delivery as its linked resource.
     *
     * @param outgoingDelivery the delivery to track
     * @return a new tracker bound to this sender and the delivery
     */
    private ClientStreamTracker createTracker(OutgoingDelivery outgoingDelivery) {
        return new ClientStreamTracker(this, outgoingDelivery);
    }

    /**
     * Builds the placeholder tracker used when an envelope is discarded before any
     * delivery was assigned to it.
     *
     * @return a new no-op tracker bound to this sender
     */
    private ClientNoOpStreamTracker createNoOpTracker() {
        return new ClientNoOpStreamTracker(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Applies a delivery state and settled flag to the given delivery on the executor.
     *
     * @param outgoingDelivery the delivery to update
     * @param deliveryState    the state to apply
     * @param z                the settled flag to apply
     * @throws ClientException if the sender is closed or failed
     */
    @Override
    public void disposition(OutgoingDelivery outgoingDelivery, DeliveryState deliveryState, boolean z) throws ClientException {
        checkClosedOrFailed();
        this.executor.execute(() -> outgoingDelivery.disposition(deliveryState, z));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Aborts a streamed delivery and waits for the request to finish.
     * <p>
     * A delivery with no transfers yet is aborted directly. Otherwise an abort envelope is
     * sent at once when the engine sender is sendable and the delivery is current (or there
     * is no current delivery). If it cannot be sent, it is queued at the head of the blocked
     * queue when the delivery is current, and at the tail otherwise. Whatever the outcome,
     * the blocked queue is re-examined once the request finishes.
     *
     * @param outgoingDelivery    the delivery to abort
     * @param clientStreamTracker the tracker reported when the abort needs no transfer
     * @throws ClientException if the sender is closed or failed, or the abort fails
     */
    public void abort(OutgoingDelivery outgoingDelivery, ClientStreamTracker clientStreamTracker) throws ClientException {
        checkClosedOrFailed();

        final ClientFuture abortRequest = session().getFutureFactory().createFuture(new ClientSynchronization<StreamTracker>() {
            @Override
            public void onPendingSuccess(StreamTracker streamTracker) {
                handleCreditStateUpdated(protonLink());
            }

            @Override
            public void onPendingFailure(Throwable th) {
                handleCreditStateUpdated(protonLink());
            }
        });

        this.executor.execute(() -> {
            if (outgoingDelivery.getTransferCount() == 0) {
                // Nothing has been transferred yet, so no abort frame has to go through the queue.
                outgoingDelivery.abort();
                abortRequest.complete(clientStreamTracker);
            } else {
                final ClientOutgoingEnvelope abortEnvelope =
                        new ClientOutgoingEnvelope(this, outgoingDelivery, outgoingDelivery.getMessageFormat(), null, false, abortRequest).abort();
                try {
                    if (this.protonSender.isSendable() && (this.protonSender.current() == null || this.protonSender.current() == outgoingDelivery)) {
                        abortEnvelope.send(outgoingDelivery.getState(), outgoingDelivery.isSettled());
                    } else if (this.protonSender.current() != outgoingDelivery) {
                        addToTailOfBlockedQueue(abortEnvelope);
                    } else {
                        addToHeadOfBlockedQueue(abortEnvelope);
                    }
                } catch (Exception error) {
                    abortRequest.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
                }
            }
        });

        this.session.request(this, abortRequest);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Completes a streamed delivery and waits for the request to finish.
     * <p>
     * A payload-less completing envelope is sent at once when the engine sender is sendable
     * and the delivery is current (or there is no current delivery). If it cannot be sent,
     * it is queued at the head of the blocked queue when the delivery is current, and at the
     * tail otherwise. Whatever the outcome, the blocked queue is re-examined once the
     * request finishes.
     *
     * @param outgoingDelivery    the delivery to complete
     * @param clientStreamTracker the tracker associated with the delivery (not read here)
     * @throws ClientException if the sender is closed or failed, or the completion fails
     */
    public void complete(OutgoingDelivery outgoingDelivery, ClientStreamTracker clientStreamTracker) throws ClientException {
        checkClosedOrFailed();

        final ClientFuture completionRequest = session().getFutureFactory().createFuture(new ClientSynchronization<StreamTracker>() {
            @Override
            public void onPendingSuccess(StreamTracker streamTracker) {
                handleCreditStateUpdated(protonLink());
            }

            @Override
            public void onPendingFailure(Throwable th) {
                handleCreditStateUpdated(protonLink());
            }
        });

        this.executor.execute(() -> {
            final ClientOutgoingEnvelope finalEnvelope =
                    new ClientOutgoingEnvelope(this, outgoingDelivery, outgoingDelivery.getMessageFormat(), null, true, completionRequest);
            try {
                if (this.protonSender.isSendable() && (this.protonSender.current() == null || this.protonSender.current() == outgoingDelivery)) {
                    finalEnvelope.send(outgoingDelivery.getState(), outgoingDelivery.isSettled());
                } else if (this.protonSender.current() != outgoingDelivery) {
                    addToTailOfBlockedQueue(finalEnvelope);
                } else {
                    addToHeadOfBlockedQueue(finalEnvelope);
                }
            } catch (Exception error) {
                completionRequest.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
            }
        });

        this.session.request(this, completionRequest);
    }

    /**
     * Drains the blocked queue while the link can send, then answers a pending drain request.
     * <p>
     * Only the head envelope is considered, and only when its delivery matches the engine
     * sender's current delivery; otherwise dispatching stops. After each attempt one head
     * entry is removed. An envelope whose payload was only partly written re-adds itself at
     * the head during the send, so that removal leaves it queued once.
     *
     * @param sender the link whose credit state changed
     */
    private void handleCreditStateUpdated(Sender sender) {
        if (!this.blocked.isEmpty()) {
            while (sender.isSendable() && !this.blocked.isEmpty()) {
                final ClientOutgoingEnvelope head = this.blocked.peek();
                if (head.delivery() != this.protonSender.current()) {
                    // Another delivery is in flight; the head has to wait its turn.
                    break;
                }
                LOG.trace("Dispatching previously held send");
                try {
                    this.session.getTransactionContext().send(head, null, isSendingSettled());
                } catch (Exception error) {
                    head.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
                } finally {
                    this.blocked.poll();
                }
            }
        }

        final boolean nothingLeftToSend = sender.isDraining() && sender.current() == null && this.blocked.isEmpty();
        if (nothingLeftToSend) {
            sender.drained();
        }
    }

    /**
     * Registers this sender's credit-state handler on the engine sender.
     */
    @Override
    protected void linkSpecificLocalOpenHandler() {
        this.protonSender.creditStateUpdateHandler(link -> handleCreditStateUpdated(link));
    }

    /**
     * Replaces the engine sender with a freshly built one: the old sender has its local
     * close and detach handlers cleared and is closed, pending work is failed when it still
     * has unsettled deliveries, and the new sender is linked back to this object.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    protected void recreateLinkForReconnect() {
        // Clear the local handlers before closing the old link.
        this.protonSender.localCloseHandler(null);
        this.protonSender.localDetachHandler(null);
        // NOTE(review): "close2" looks like a decompiler rename of the engine's close() — confirm.
        this.protonSender.close2();
        if (this.protonSender.hasUnsettled()) {
            // Unsettled deliveries have an unknown outcome; fail them and any blocked sends.
            failPendingUnsettledAndBlockedSends(new ClientConnectionRemotelyClosedException("Connection failed and send result is unknown"));
        }
        this.protonSender = ClientSenderBuilder.recreateSender(this.session, this.protonSender, this.options);
        this.protonSender.setLinkedResource(this);
    }

    /**
     * Cleans up after the link has gone away: asynchronously closes the session when it is
     * a {@code ClientStreamSession}, then fails all unsettled and blocked sends.
     *
     * @param clientException the failure cause, or {@code null} to report a generic
     *                        "sender link has closed" error
     */
    @Override
    protected void linkSpecificCleanupHandler(ClientException clientException) {
        if (this.session instanceof ClientStreamSession) {
            this.session.closeAsync();
        }

        final ClientException cause = clientException != null
                ? clientException
                : new ClientResourceRemotelyClosedException("The sender link has closed");
        failPendingUnsettledAndBlockedSends(cause);
    }

    /**
     * Fails the settlement future of every unsettled delivery and fails and removes every
     * envelope waiting on the blocked queue.
     *
     * @param clientException the error reported to each waiting caller
     */
    private void failPendingUnsettledAndBlockedSends(ClientException clientException) {
        this.protonSender.unsettled().forEach(unsettledDelivery -> {
            try {
                final ClientTrackable tracker = (ClientTrackable) unsettledDelivery.getLinkedResource(ClientTrackable.class);
                tracker.settlementFuture().failed(clientException);
            } catch (Exception ignored) {
                // Best effort: one delivery without a usable tracker must not stop the rest.
            }
        });

        this.blocked.removeIf(waitingEnvelope -> {
            waitingEnvelope.failed(clientException);
            return true;
        });
    }

    /**
     * Local-close hook; intentionally empty for this sender.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    protected void linkSpecificLocalCloseHandler() {
    }

    /**
     * Remote-open hook; intentionally empty for this sender.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    protected void linkSpecificRemoteOpenHandler() {
    }

    /**
     * Remote-close hook; intentionally empty for this sender.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.impl.ClientLinkType
    protected void linkSpecificRemoteCloseHandler() {
    }

    /**
     * Bridge for the interface-typed return value; it only forwards to the
     * {@code beginMessage(Map<String, Object>)} overload.
     * NOTE(review): the bridge/synthetic markers suggest this method was generated by the
     * compiler and surfaced by the decompiler — confirm whether it belongs in source form.
     */
    @Override // com.rabbitmq.qpid.protonj2.client.StreamSender
    public /* bridge */ /* synthetic */ StreamSenderMessage beginMessage(Map map) throws ClientException {
        return beginMessage((Map<String, Object>) map);
    }
}
