package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageProperty;
import com.microsoft.azure.sdk.iot.device.MessageType;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.TransportUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.reactor.FlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsReceiverLinkHandler.class */
public abstract class AmqpsReceiverLinkHandler extends BaseHandler {
    private static final Logger log = LoggerFactory.getLogger(AmqpsReceiverLinkHandler.class);
    static final String VERSION_IDENTIFIER_KEY = "com.microsoft:client-version";
    private static final String API_VERSION_KEY = "com.microsoft:api-version";
    private static final String TO_KEY = "to";
    private static final String USER_ID_KEY = "userId";
    private static final String AMQPS_APP_PROPERTY_PREFIX = "iothub-app-";
    private final Map<Message, AmqpsMessage> receivedMessagesMap = new ConcurrentHashMap();
    Map<Symbol, Object> amqpProperties = new HashMap();
    String receiverLinkTag;
    String linkCorrelationId;
    String receiverLinkAddress;
    Receiver receiverLink;
    private AmqpsLinkStateCallback amqpsLinkStateCallback;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AmqpsReceiverLinkHandler(Receiver receiver, AmqpsLinkStateCallback amqpsLinkStateCallback, String str) {
        this.amqpProperties.put(Symbol.getSymbol(API_VERSION_KEY), TransportUtils.IOTHUB_API_VERSION);
        this.receiverLink = receiver;
        this.linkCorrelationId = str;
        this.amqpsLinkStateCallback = amqpsLinkStateCallback;
        BaseHandler.setHandler(receiver, this);
        add(new FlowController());
    }

    public void onLinkRemoteOpen(Event event) {
        log.debug("{} receiver link with link correlation id {} was successfully opened", getLinkInstanceType(), this.linkCorrelationId);
        this.amqpsLinkStateCallback.onLinkOpened(this);
    }

    public void onLinkLocalOpen(Event event) {
        log.trace("{} receiver link with link correlation id {} opened locally", getLinkInstanceType(), this.linkCorrelationId);
    }

    public void onDelivery(Event event) {
        AmqpsMessage messageFromReceiverLink = getMessageFromReceiverLink((Receiver) event.getLink());
        IotHubTransportMessage protonMessageToIoTHubMessage = protonMessageToIoTHubMessage(messageFromReceiverLink);
        this.receivedMessagesMap.put(protonMessageToIoTHubMessage, messageFromReceiverLink);
        this.amqpsLinkStateCallback.onMessageReceived(protonMessageToIoTHubMessage);
    }

    public void onLinkInit(Event event) {
        Link link = event.getLink();
        Source source = new Source();
        source.setAddress(this.receiverLinkAddress);
        link.setSource(source);
        link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        link.setProperties(this.amqpProperties);
        link.open();
        log.trace("Opening {} receiver link with correlation id {}", getLinkInstanceType(), this.linkCorrelationId);
    }

    public void onLinkRemoteClose(Event event) {
        Link link = event.getLink();
        if (link.getLocalState() != EndpointState.ACTIVE) {
            log.trace("Closing amqp session now that this {} receiver link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.linkCorrelationId);
            event.getSession().close();
        } else {
            log.debug("{} receiver link with link correlation id {} was closed remotely unexpectedly", getLinkInstanceType(), this.linkCorrelationId);
            link.close();
            this.amqpsLinkStateCallback.onLinkClosedUnexpectedly(link.getRemoteCondition());
        }
    }

    public void onLinkLocalClose(Event event) {
        if (event.getLink().getRemoteState() != EndpointState.CLOSED) {
            log.trace("{} receiver link with correlation id {} was closed locally", getLinkInstanceType(), this.linkCorrelationId);
        } else {
            log.trace("Closing amqp session now that this {} receiver link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.linkCorrelationId);
            event.getSession().close();
        }
    }

    public boolean acknowledgeReceivedMessage(IotHubTransportMessage iotHubTransportMessage, DeliveryState deliveryState) {
        if (!this.receivedMessagesMap.containsKey(iotHubTransportMessage)) {
            return false;
        }
        this.receivedMessagesMap.remove(iotHubTransportMessage).acknowledge(deliveryState);
        return true;
    }

    abstract String getLinkInstanceType();

    /* JADX INFO: Access modifiers changed from: package-private */
    public AmqpsMessage getMessageFromReceiverLink(Receiver receiver) {
        Delivery current = receiver.current();
        if (current == null || !current.isReadable() || current.isPartial()) {
            return null;
        }
        int pending = current.pending();
        byte[] bArr = new byte[pending];
        int recv = receiver.recv(bArr, 0, bArr.length);
        log.trace("read {} bytes from receiver link {}", Integer.valueOf(recv), receiver.getName());
        if (!receiver.advance()) {
            log.warn("{} receiver link with link correlation id {} did not advance after bytes were read from it", getLinkInstanceType(), this.linkCorrelationId);
        }
        if (pending != recv) {
            log.warn("Amqp read from {} receiver link with link correlation id {} did not read the expected amount of bytes. Read {} but expected {}", new Object[]{getLinkInstanceType(), this.linkCorrelationId, Integer.valueOf(recv), Integer.valueOf(pending)});
        }
        AmqpsMessage amqpsMessage = new AmqpsMessage();
        amqpsMessage.setDelivery(current);
        amqpsMessage.decode(bArr, 0, recv);
        return amqpsMessage;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IotHubTransportMessage protonMessageToIoTHubMessage(AmqpsMessage amqpsMessage) {
        byte[] bArr;
        log.trace("Converting proton message to iot hub message for {} receiver link with link correlation id {}. Proton message correlation id {}", new Object[]{getLinkInstanceType(), this.linkCorrelationId, amqpsMessage.getCorrelationId()});
        Data body = amqpsMessage.getBody();
        if (body != null) {
            Binary value = body.getValue();
            bArr = new byte[value.getLength()];
            value.asByteBuffer().get(bArr);
        } else {
            bArr = new byte[0];
        }
        IotHubTransportMessage iotHubTransportMessage = new IotHubTransportMessage(bArr, MessageType.UNKNOWN);
        Properties properties = amqpsMessage.getProperties();
        if (properties != null) {
            if (properties.getCorrelationId() != null) {
                iotHubTransportMessage.setCorrelationId(properties.getCorrelationId().toString());
            }
            if (properties.getMessageId() != null) {
                iotHubTransportMessage.setMessageId(properties.getMessageId().toString());
            }
            if (properties.getTo() != null) {
                iotHubTransportMessage.setProperty("iothub-app-to", properties.getTo());
            }
            if (properties.getUserId() != null) {
                iotHubTransportMessage.setProperty("iothub-app-userId", properties.getUserId().toString());
            }
            if (properties.getContentEncoding() != null) {
                iotHubTransportMessage.setContentEncoding(properties.getContentEncoding().toString());
            }
            if (properties.getContentType() != null) {
                iotHubTransportMessage.setContentType(properties.getContentType().toString());
            }
        }
        if (amqpsMessage.getApplicationProperties() != null) {
            for (Map.Entry entry : amqpsMessage.getApplicationProperties().getValue().entrySet()) {
                String str = (String) entry.getKey();
                if (str.equalsIgnoreCase(MessageProperty.CONNECTION_DEVICE_ID)) {
                    iotHubTransportMessage.setConnectionDeviceId(entry.getValue().toString());
                } else if (str.equalsIgnoreCase(MessageProperty.CONNECTION_MODULE_ID)) {
                    iotHubTransportMessage.setConnectionModuleId(entry.getValue().toString());
                } else if (!MessageProperty.RESERVED_PROPERTY_NAMES.contains(str)) {
                    iotHubTransportMessage.setProperty((String) entry.getKey(), entry.getValue().toString());
                }
            }
        }
        return iotHubTransportMessage;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        if (this.receiverLink.getLocalState() != EndpointState.CLOSED) {
            log.debug("Closing {} receiver link with link correlation id {}", getLinkInstanceType(), this.linkCorrelationId);
            this.receiverLink.close();
        }
    }
}
