package com.microsoft.azure.sdk.iot.service.transport.amqps;

import com.microsoft.azure.sdk.iot.service.transport.TransportUtils;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.apache.qpid.proton.reactor.FlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/sdk/iot/service/transport/amqps/SenderLinkHandler.class */
/**
 * Base proton handler for a single AMQP sender link.
 *
 * <p>On construction the handler registers itself as the handler of the supplied {@link Sender}
 * and adds a {@link FlowController} child handler. It reacts to the link lifecycle events
 * delivered by proton ({@code onLinkInit}, {@code onLinkLocalOpen}, {@code onLinkRemoteOpen},
 * {@code onLinkLocalClose}, {@code onLinkRemoteClose}) and offers
 * {@link #sendMessageAndGetDeliveryTag(MessageImpl)} to push an encoded message over the link.
 *
 * <p>Subclasses supply a human readable link type through {@link #getLinkInstanceType()} (used in
 * log messages only) and are expected to assign {@link #senderLinkAddress} before the link is
 * initialised, because {@link #onLinkInit(Event)} uses it as the target address.
 */
public abstract class SenderLinkHandler extends BaseHandler {
    private static final Logger log = LoggerFactory.getLogger(SenderLinkHandler.class);
    private static final String API_VERSION_KEY = "com.microsoft:api-version";

    /** Initial size, in bytes, of the buffer a message is encoded into; doubled on every overflow. */
    private static final int INITIAL_ENCODE_BUFFER_SIZE = 1024;

    /** Last delivery tag handed out before the counter wraps back to 0 (tags are returned as {@code int}). */
    private static final long MAX_DELIVERY_TAG = Integer.MAX_VALUE;

    /** Value returned by {@link #sendMessageAndGetDeliveryTag(MessageImpl)} when the send failed. */
    private static final int SEND_FAILED = -1;

    String senderLinkTag;
    final String linkCorrelationId;
    String senderLinkAddress;
    final Sender senderLink;
    protected final LinkStateCallback linkStateCallback;
    private long nextTag = 0;
    final Map<Symbol, Object> amqpProperties = new HashMap<>();

    /**
     * Returns a short description of the concrete link type; it is only used to build log messages.
     *
     * @return the link type name
     */
    protected abstract String getLinkInstanceType();

    /**
     * Creates the handler and attaches it to the given sender link.
     *
     * @param sender the proton sender link this handler manages
     * @param str the correlation id used to identify this link in log messages
     * @param linkStateCallback the callback notified when the link is opened remotely
     */
    public SenderLinkHandler(Sender sender, String str, LinkStateCallback linkStateCallback) {
        this.amqpProperties.put(Symbol.getSymbol(API_VERSION_KEY), TransportUtils.IOTHUB_API_VERSION);
        this.linkCorrelationId = str;
        this.senderLink = sender;
        this.linkStateCallback = linkStateCallback;
        BaseHandler.setHandler(sender, this);
        add(new FlowController());
    }

    /**
     * Logs that the link was opened by the remote peer and notifies the link state callback.
     *
     * @param event the proton event (unused)
     */
    public void onLinkRemoteOpen(Event event) {
        log.debug("{} sender link with link correlation id {} was successfully opened", getLinkInstanceType(), this.linkCorrelationId);
        this.linkStateCallback.onSenderLinkRemoteOpen();
    }

    /**
     * Logs that the link was opened locally.
     *
     * @param event the proton event (unused)
     */
    public void onLinkLocalOpen(Event event) {
        log.trace("{} sender link with link correlation id {} opened locally", getLinkInstanceType(), this.linkCorrelationId);
    }

    /**
     * Configures the link carried by the event (target address, unsettled sender settle mode and
     * the AMQP link properties) and opens it.
     *
     * @param event the proton event carrying the link being initialised
     */
    public void onLinkInit(Event event) {
        Link link = event.getLink();
        Target target = new Target();
        target.setAddress(this.senderLinkAddress);
        link.setTarget(target);
        link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        link.setProperties(this.amqpProperties);
        link.open();
        log.trace("Opening {} sender link with correlation id {}", getLinkInstanceType(), this.linkCorrelationId);
    }

    /**
     * Handles the remote peer closing the link. If the link is still locally active it is closed
     * locally; otherwise both ends are closed and the owning session is closed as well.
     *
     * @param event the proton event carrying the link and its session
     */
    public void onLinkRemoteClose(Event event) {
        Link link = event.getLink();
        if (link.getLocalState() == EndpointState.ACTIVE) {
            log.debug("{} sender link with link correlation id {} was closed remotely unexpectedly", getLinkInstanceType(), this.linkCorrelationId);
            link.close();
        } else {
            log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.linkCorrelationId);
            event.getSession().close();
        }
    }

    /**
     * Handles the link being closed locally. The owning session is closed only once the remote
     * end has closed too; until then the local close is merely logged.
     *
     * @param event the proton event carrying the link and its session
     */
    public void onLinkLocalClose(Event event) {
        if (event.getLink().getRemoteState() != EndpointState.CLOSED) {
            log.trace("{} sender link with correlation id {} was closed locally", getLinkInstanceType(), this.linkCorrelationId);
        } else {
            log.trace("Closing amqp session now that this {} sender link with link correlation id {} has closed remotely and locally", getLinkInstanceType(), this.linkCorrelationId);
            event.getSession().close();
        }
    }

    /** Closes the sender link unless it is already closed locally. */
    public void close() {
        if (this.senderLink.getLocalState() != EndpointState.CLOSED) {
            log.debug("Closing {} sender link with link correlation id {}", getLinkInstanceType(), this.linkCorrelationId);
            this.senderLink.close();
        }
    }

    /**
     * Encodes the given message and sends it over the sender link under a fresh delivery tag.
     *
     * <p>Delivery tags are consecutive non-negative integers that wrap back to 0 after
     * {@link Integer#MAX_VALUE}. The tag is put on the wire as its decimal representation.
     *
     * @param messageImpl the proton message to encode and send
     * @return the delivery tag used for the message, or {@code -1} if the message could not be
     *     sent (the failure is logged and the delivery is freed)
     */
    public int sendMessageAndGetDeliveryTag(MessageImpl messageImpl) {
        if (this.nextTag == MAX_DELIVERY_TAG || this.nextTag < 0) {
            this.nextTag = 0L;
        } else {
            this.nextTag++;
        }
        // nextTag is always within [0, Integer.MAX_VALUE] at this point, so the narrowing is lossless.
        final int deliveryTag = (int) this.nextTag;

        // The encoded size is not known up front: start small and double the buffer until it fits.
        byte[] buffer = new byte[INITIAL_ENCODE_BUFFER_SIZE];
        int encodedLength;
        while (true) {
            try {
                encodedLength = messageImpl.encode(buffer, 0, buffer.length);
                break;
            } catch (BufferOverflowException ignored) {
                // Expected signal that the buffer was too small; retry with twice the capacity.
                buffer = new byte[buffer.length * 2];
            }
        }

        // Pin the charset so the tag bytes on the wire never depend on the platform default.
        byte[] tagBytes = String.valueOf(deliveryTag).getBytes(StandardCharsets.UTF_8);
        Delivery delivery = this.senderLink.delivery(tagBytes);
        try {
            log.trace("Sending {} bytes over the amqp {} sender link with link correlation id {}", encodedLength, getLinkInstanceType(), this.linkCorrelationId);
            int sentLength = this.senderLink.send(buffer, 0, encodedLength);
            log.trace("{} bytes sent over the amqp {} sender link with link correlation id {}", sentLength, getLinkInstanceType(), this.linkCorrelationId);
            if (sentLength != encodedLength) {
                throw new IOException(String.format("Amqp send operation did not send all of the expected bytes for %s sender link with link correlation id %s, retrying to send the message", getLinkInstanceType(), this.linkCorrelationId));
            }
            if (!this.senderLink.advance()) {
                throw new IOException(String.format("Failed to advance the senderLink after sending a message on %s sender link with link correlation id %s, retrying to send the message", getLinkInstanceType(), this.linkCorrelationId));
            }
            log.trace("Message was sent over {} sender link with delivery tag {} and hash {}", getLinkInstanceType(), deliveryTag, delivery.hashCode());
            return deliveryTag;
        } catch (Exception e) {
            // The trailing throwable argument is logged by SLF4J as the exception, not as a placeholder value.
            log.warn("Encountered a problem while sending a message on {} sender link with link correlation id {}", getLinkInstanceType(), this.linkCorrelationId, e);
            // Move past the failed delivery and release it so it does not linger on the link.
            this.senderLink.advance();
            delivery.free();
            return SEND_FAILED;
        }
    }
}
