package com.microsoft.azure.sdk.iot.service.transport.amqps;

import com.microsoft.azure.sdk.iot.service.exceptions.IotHubException;
import com.microsoft.azure.sdk.iot.service.messaging.Message;
import com.microsoft.azure.sdk.iot.service.messaging.SendResult;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/sdk/iot/service/transport/amqps/CloudToDeviceMessageSenderLinkHandler.class */
public class CloudToDeviceMessageSenderLinkHandler extends SenderLinkHandler {
    private static final Logger log = LoggerFactory.getLogger(CloudToDeviceMessageSenderLinkHandler.class);
    private final Queue<CloudToDeviceMessage> outgoingMessageQueue;
    private final Map<Integer, CloudToDeviceMessage> unacknowledgedMessages;

    public CloudToDeviceMessageSenderLinkHandler(Sender sender, String str, LinkStateCallback linkStateCallback) {
        super(sender, str, linkStateCallback);
        this.outgoingMessageQueue = new ConcurrentLinkedQueue();
        this.unacknowledgedMessages = new ConcurrentHashMap();
    }

    public void sendAsync(String str, String str2, Message message, Consumer<SendResult> consumer, Object obj) {
        if (str2 == null) {
            log.trace("Queueing cloud to device message with correlation id {}", message.getCorrelationId());
        } else {
            log.trace("Queueing cloud to module message with correlation id {}", message.getCorrelationId());
        }
        this.outgoingMessageQueue.add(new CloudToDeviceMessage(str, str2, message, consumer, obj));
    }

    public void onLinkFlow(Event event) {
        event.getReactor().schedule(200, this);
    }

    public void onTimerTask(Event event) {
        sendQueuedMessages();
        event.getReactor().schedule(200, this);
    }

    private void sendQueuedMessages() {
        CloudToDeviceMessage poll = this.outgoingMessageQueue.poll();
        while (true) {
            CloudToDeviceMessage cloudToDeviceMessage = poll;
            if (cloudToDeviceMessage == null) {
                return;
            }
            this.unacknowledgedMessages.put(Integer.valueOf(sendMessageAndGetDeliveryTag(cloudToDeviceMessage.getProtonMessage())), cloudToDeviceMessage);
            poll = this.outgoingMessageQueue.poll();
        }
    }

    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();
        Rejected remoteState = delivery.getRemoteState();
        CloudToDeviceMessage remove = this.unacknowledgedMessages.remove(Integer.valueOf(Integer.parseInt(new String(delivery.getTag(), StandardCharsets.UTF_8))));
        if (remove != null) {
            String correlationId = remove.getCorrelationId();
            log.trace("Acknowledgement arrived for sent cloud to device message with correlation id {}", correlationId);
            IotHubException iotHubException = null;
            if (remoteState instanceof Rejected) {
                iotHubException = new ProtonJExceptionParser(remoteState.getError().getCondition().toString(), remoteState.getError().getDescription()).getIotHubException();
            }
            Consumer<SendResult> onMessageSentCallback = remove.getOnMessageSentCallback();
            if (onMessageSentCallback != null) {
                onMessageSentCallback.accept(new SendResult(iotHubException == null, correlationId, remove.getOnMessageSentCallbackContext(), iotHubException));
            }
        } else {
            log.debug("Received an acknowledgement for a cloud to device message that this client did not send");
        }
        delivery.settle();
    }

    public void onConnectionRemoteClose(Event event) {
        super.onConnectionRemoteClose(event);
        event.getTransport().close_tail();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler
    public void close() {
        super.close();
        for (CloudToDeviceMessage cloudToDeviceMessage : this.outgoingMessageQueue) {
            IotHubException iotHubException = new IotHubException("Message failed to send because the client was closed while it was still queued.");
            Consumer<SendResult> onMessageSentCallback = cloudToDeviceMessage.getOnMessageSentCallback();
            if (onMessageSentCallback != null) {
                onMessageSentCallback.accept(new SendResult(false, cloudToDeviceMessage.getCorrelationId(), cloudToDeviceMessage.getOnMessageSentCallbackContext(), iotHubException));
            }
        }
        for (CloudToDeviceMessage cloudToDeviceMessage2 : this.unacknowledgedMessages.values()) {
            IotHubException iotHubException2 = new IotHubException("Message failed to send because the client was closed after it was sent, but before it was acknowledged by the service.");
            Consumer<SendResult> onMessageSentCallback2 = cloudToDeviceMessage2.getOnMessageSentCallback();
            if (onMessageSentCallback2 != null) {
                onMessageSentCallback2.accept(new SendResult(false, cloudToDeviceMessage2.getCorrelationId(), cloudToDeviceMessage2.getOnMessageSentCallbackContext(), iotHubException2));
            }
        }
        this.outgoingMessageQueue.clear();
        this.unacknowledgedMessages.clear();
    }

    @Override // com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler
    protected String getLinkInstanceType() {
        return "cloudToDeviceSender";
    }

    @Override // com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler
    public /* bridge */ /* synthetic */ void onLinkLocalClose(Event event) {
        super.onLinkLocalClose(event);
    }

    @Override // com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler
    public /* bridge */ /* synthetic */ void onLinkRemoteClose(Event event) {
        super.onLinkRemoteClose(event);
    }

    @Override // com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler
    public /* bridge */ /* synthetic */ void onLinkInit(Event event) {
        super.onLinkInit(event);
    }

    @Override // com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler
    public /* bridge */ /* synthetic */ void onLinkLocalOpen(Event event) {
        super.onLinkLocalOpen(event);
    }

    @Override // com.microsoft.azure.sdk.iot.service.transport.amqps.SenderLinkHandler
    public /* bridge */ /* synthetic */ void onLinkRemoteOpen(Event event) {
        super.onLinkRemoteOpen(event);
    }
}
