package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.proton.transport.proxy.ProxyAuthenticationType;
import com.microsoft.azure.proton.transport.proxy.ProxyConfiguration;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl;
import com.microsoft.azure.proton.transport.ws.WebSocketHandler;
import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import com.microsoft.azure.sdk.iot.device.ClientConfiguration;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.ProxySettings;
import com.microsoft.azure.sdk.iot.device.auth.IotHubSSLContext;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.device.transport.IotHubListener;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection;
import com.microsoft.azure.sdk.iot.device.transport.IotHubTransportMessage;
import com.microsoft.azure.sdk.iot.device.transport.MultiplexingDeviceUnauthorizedException;
import com.microsoft.azure.sdk.iot.device.transport.ProtocolException;
import com.microsoft.azure.sdk.iot.device.transport.TransportException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.class */
public final class AmqpsIotHubConnection extends BaseHandler implements IotHubTransportConnection, AmqpsSessionStateCallback, ReactorRunnerStateCallback {
    private static final Logger log = LoggerFactory.getLogger(AmqpsIotHubConnection.class);
    private static final int MAX_WAIT_TO_CLOSE_CONNECTION = 20000;
    private static final String WEB_SOCKET_PATH = "/$iothub/websocket";
    private static final String WEB_SOCKET_SUB_PROTOCOL = "AMQPWSB10";
    private static final String WEB_SOCKET_QUERY = "iothub-no-client-cert=true";
    private static final int MAX_MESSAGE_PAYLOAD_SIZE = 262144;
    private static final int MAX_FRAME_SIZE = 4096;
    private static final int WEB_SOCKET_PORT = 443;
    private static final int AMQP_PORT = 5671;
    private static final int REACTOR_COUNT = 1;
    private static final int CBS_SESSION_COUNT = 1;
    private final int sendInterval;
    private static final int MAX_MESSAGES_TO_SEND_PER_CALLBACK = 1000;
    private final Queue<Message> messagesToSend;
    private String connectionId;
    private IotHubConnectionStatus state;
    private final String hostName;
    private SSLContext sslContext;
    private final boolean isWebsocketConnection;
    private final ClientConfiguration.AuthType authenticationType;
    private final Set<ClientConfiguration> clientConfigurations;
    private IotHubListener listener;
    private TransportException savedException;
    private final Object executorServiceLock;
    private ExecutorService executorService;
    private final ProxySettings proxySettings;
    private final String transportUniqueIdentifier;
    private CountDownLatch authenticationSessionOpenedLatch;
    private Map<String, CountDownLatch> deviceSessionsOpenedLatches;
    private CountDownLatch closeReactorLatch;
    private Connection connection;
    private Reactor reactor;
    private final Map<String, AmqpsSessionHandler> reconnectingDeviceSessionHandlers;
    private final Map<String, AmqpsSessionHandler> sessionHandlers;
    private final Queue<AmqpsSasTokenRenewalHandler> sasTokenRenewalHandlers;
    private AmqpsCbsSessionHandler amqpsCbsSessionHandler;
    private final Set<ClientConfiguration> multiplexingClientsToRegister;
    private final Map<ClientConfiguration, Boolean> multiplexingClientsToUnregister;
    private ClientConfiguration clientConfiguration;
    private final boolean isMultiplexing;
    private final int keepAliveInterval;
    private final Map<IotHubTransportMessage, IotHubMessageResult> queuedAcknowledgements;
    private final String threadNamePrefix;
    private final String threadNameSuffix;
    private final boolean useIdentifiableThreadNames;

    public AmqpsIotHubConnection(ClientConfiguration clientConfiguration, String str) {
        this.messagesToSend = new ConcurrentLinkedQueue();
        this.executorServiceLock = new Object();
        this.reconnectingDeviceSessionHandlers = new ConcurrentHashMap();
        this.sessionHandlers = new ConcurrentHashMap();
        this.sasTokenRenewalHandlers = new ConcurrentLinkedQueue();
        this.clientConfiguration = null;
        this.queuedAcknowledgements = new ConcurrentHashMap();
        this.clientConfigurations = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToUnregister = new ConcurrentHashMap();
        this.clientConfigurations.add(clientConfiguration);
        this.clientConfiguration = clientConfiguration;
        this.transportUniqueIdentifier = str;
        this.isWebsocketConnection = clientConfiguration.isUsingWebsocket();
        this.authenticationType = clientConfiguration.getAuthenticationType();
        this.proxySettings = clientConfiguration.getProxySettings();
        String gatewayHostname = clientConfiguration.getGatewayHostname();
        if (gatewayHostname == null || gatewayHostname.isEmpty()) {
            log.trace("No gateway hostname was present in config, connecting directly to hub");
            this.hostName = clientConfiguration.getIotHubHostname();
        } else {
            log.debug("Gateway hostname was present in config, connecting to gateway rather than directly to hub");
            this.hostName = gatewayHostname;
        }
        add(new Handshaker());
        this.isMultiplexing = false;
        this.keepAliveInterval = clientConfiguration.getKeepAliveInterval();
        this.sendInterval = this.clientConfiguration.getSendInterval();
        this.useIdentifiableThreadNames = this.clientConfiguration.isUsingIdentifiableThreadNames();
        this.threadNamePrefix = this.clientConfiguration.getThreadNamePrefix();
        this.threadNameSuffix = this.clientConfiguration.getThreadNameSuffix();
        this.state = IotHubConnectionStatus.DISCONNECTED;
        log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", Integer.valueOf(this.isWebsocketConnection ? WEB_SOCKET_PORT : AMQP_PORT));
    }

    public AmqpsIotHubConnection(String str, String str2, boolean z, SSLContext sSLContext, ProxySettings proxySettings, int i, int i2, boolean z2, String str3, String str4) {
        this.messagesToSend = new ConcurrentLinkedQueue();
        this.executorServiceLock = new Object();
        this.reconnectingDeviceSessionHandlers = new ConcurrentHashMap();
        this.sessionHandlers = new ConcurrentHashMap();
        this.sasTokenRenewalHandlers = new ConcurrentLinkedQueue();
        this.clientConfiguration = null;
        this.queuedAcknowledgements = new ConcurrentHashMap();
        this.clientConfigurations = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap());
        this.multiplexingClientsToUnregister = new ConcurrentHashMap();
        this.isWebsocketConnection = z;
        this.authenticationType = ClientConfiguration.AuthType.SAS_TOKEN;
        this.hostName = str;
        this.transportUniqueIdentifier = str2;
        this.proxySettings = proxySettings;
        this.sslContext = sSLContext;
        add(new Handshaker());
        this.isMultiplexing = true;
        this.state = IotHubConnectionStatus.DISCONNECTED;
        log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", Integer.valueOf(this.isWebsocketConnection ? WEB_SOCKET_PORT : AMQP_PORT));
        this.keepAliveInterval = i;
        this.sendInterval = i2;
        this.useIdentifiableThreadNames = z2;
        this.threadNamePrefix = str3;
        this.threadNameSuffix = str4;
    }

    public void registerMultiplexedDevice(ClientConfiguration clientConfiguration) {
        if (this.state == IotHubConnectionStatus.CONNECTED) {
            log.trace("Queuing the registration of device {} to an active multiplexed connection", clientConfiguration.getDeviceId());
            this.deviceSessionsOpenedLatches.put(clientConfiguration.getDeviceId(), new CountDownLatch(1));
            this.multiplexingClientsToRegister.add(clientConfiguration);
        }
        this.clientConfigurations.add(clientConfiguration);
    }

    public void unregisterMultiplexedDevice(ClientConfiguration clientConfiguration, boolean z) {
        if (this.state == IotHubConnectionStatus.CONNECTED) {
            if (z) {
                log.trace("Queuing the unregistration of device {} from an active multiplexed connection. The device will be re-registered for reconnection purposes.", clientConfiguration.getDeviceId());
            } else {
                log.trace("Queuing the unregistration of device {} from an active multiplexed connection", clientConfiguration.getDeviceId());
            }
            this.multiplexingClientsToUnregister.put(clientConfiguration, Boolean.valueOf(z));
        }
        this.clientConfigurations.remove(clientConfiguration);
        this.deviceSessionsOpenedLatches.remove(clientConfiguration.getDeviceId());
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection
    public void open() throws TransportException {
        log.debug("Opening amqp layer...");
        this.connectionId = UUID.randomUUID().toString();
        this.savedException = null;
        if (this.state == IotHubConnectionStatus.DISCONNECTED) {
            Iterator<ClientConfiguration> it = this.clientConfigurations.iterator();
            while (it.hasNext()) {
                addSessionHandler(it.next());
            }
            initializeStateLatches();
            try {
                openAsync();
                if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
                    log.trace("Waiting for authentication links to open...");
                }
                Iterator<ClientConfiguration> it2 = this.clientConfigurations.iterator();
                ClientConfiguration next = it2.hasNext() ? it2.next() : null;
                boolean z = !this.authenticationSessionOpenedLatch.await((long) (next != null ? next.getAmqpOpenAuthenticationSessionTimeout() : 20), TimeUnit.SECONDS);
                if (this.savedException != null) {
                    throw this.savedException;
                }
                if (z) {
                    closeConnectionWithException("Timed out waiting for authentication session to open", true);
                }
                log.trace("Waiting for device sessions to open...");
                boolean z2 = false;
                for (ClientConfiguration clientConfiguration : this.clientConfigurations) {
                    z2 = !this.deviceSessionsOpenedLatches.get(clientConfiguration.getDeviceId()).await((long) clientConfiguration.getAmqpOpenDeviceSessionsTimeout(), TimeUnit.SECONDS);
                    if (z2) {
                        break;
                    }
                }
                if (this.savedException != null) {
                    throw this.savedException;
                }
                if (z2) {
                    closeConnectionWithException("Timed out waiting for worker links to open", true);
                }
            } catch (TransportException e) {
                clearLocalState();
                closeNetworkResources();
                throw e;
            } catch (InterruptedException e2) {
                clearLocalState();
                closeNetworkResources();
                TransportException transportException = new TransportException("Interrupted while waiting for links to open for AMQP connection", e2);
                transportException.setRetryable(true);
                throw transportException;
            }
        }
        this.state = IotHubConnectionStatus.CONNECTED;
        this.listener.onConnectionEstablished(this.connectionId);
        log.debug("Amqp connection opened successfully");
    }

    private void clearLocalState() {
        this.sessionHandlers.clear();
        this.sasTokenRenewalHandlers.clear();
    }

    private void closeNetworkResources() {
        try {
            this.reactor.free();
        } catch (IllegalStateException e) {
            log.trace("Failed to free the reactor. Moving forward with cleanup anyways.", e);
        }
        executorServicesCleanup();
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection
    public void close() {
        log.debug("Shutting down amqp layer...");
        try {
            closeAsync();
            try {
                log.trace("Waiting for reactor to close...");
                this.closeReactorLatch.await(20000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.warn("Interrupted while waiting on reactor to close gracefully. Forcefully closing the reactor now.", e);
            }
        } finally {
            clearLocalState();
            closeNetworkResources();
            this.state = IotHubConnectionStatus.DISCONNECTED;
            log.trace("Amqp connection closed successfully");
        }
    }

    public void onReactorInit(Event event) {
        this.reactor = event.getReactor();
        String str = this.hostName;
        int i = AMQP_PORT;
        if (this.isWebsocketConnection) {
            if (this.proxySettings != null) {
                str = this.proxySettings.getHostname();
                i = this.proxySettings.getPort();
            } else {
                i = WEB_SOCKET_PORT;
            }
        }
        this.reactor.connectionToHost(str, i, this);
        this.reactor.schedule(this.sendInterval, this);
    }

    public void onReactorFinal(Event event) {
        log.trace("Amqps reactor finalized");
        releaseLatch(this.authenticationSessionOpenedLatch);
        releaseDeviceSessionLatches();
        releaseLatch(this.closeReactorLatch);
        if (this.savedException != null) {
            this.reconnectingDeviceSessionHandlers.putAll(this.sessionHandlers);
            this.listener.onConnectionLost(this.savedException, this.connectionId);
        }
    }

    public void onConnectionInit(Event event) {
        this.connection = event.getConnection();
        this.connection.setHostname(this.hostName);
        this.connection.open();
    }

    public void onConnectionBound(Event event) {
        Transport transport = event.getTransport();
        transport.setIdleTimeout(this.keepAliveInterval * 1000);
        if (this.isWebsocketConnection) {
            addWebSocketLayer(transport);
        }
        try {
            Iterator<ClientConfiguration> it = this.clientConfigurations.iterator();
            ClientConfiguration next = it.hasNext() ? it.next() : null;
            SSLContext sSLContext = next != null ? next.getAuthenticationProvider().getSSLContext() : this.sslContext != null ? this.sslContext : new IotHubSSLContext().getSSLContext();
            if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
                transport.sasl().setMechanisms(new String[]{"ANONYMOUS"});
            }
            SslDomain sslDomain = Proton.sslDomain();
            sslDomain.setSslContext(sSLContext);
            sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
            sslDomain.init(SslDomain.Mode.CLIENT);
            transport.ssl(sslDomain);
        } catch (IOException e) {
            this.savedException = new TransportException(e);
            log.error("Encountered an exception while setting ssl domain for the amqp connection", this.savedException);
        }
        if (this.proxySettings != null) {
            addProxyLayer(transport, event.getConnection().getHostname() + ":" + WEB_SOCKET_PORT);
        }
    }

    public void onConnectionLocalOpen(Event event) {
        log.trace("Amqp connection opened locally");
        if (this.authenticationType != ClientConfiguration.AuthType.SAS_TOKEN) {
            this.sessionHandlers.values().iterator().next().setSession(this.connection.session());
            return;
        }
        this.amqpsCbsSessionHandler = new AmqpsCbsSessionHandler(this.connection.session(), this);
        Iterator<AmqpsSasTokenRenewalHandler> it = this.sasTokenRenewalHandlers.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.sasTokenRenewalHandlers.clear();
        for (AmqpsSessionHandler amqpsSessionHandler : this.sessionHandlers.values()) {
            amqpsSessionHandler.setSession(this.connection.session());
            this.sasTokenRenewalHandlers.add(new AmqpsSasTokenRenewalHandler(this.amqpsCbsSessionHandler, amqpsSessionHandler));
        }
    }

    public void onConnectionRemoteOpen(Event event) {
        log.trace("Amqp connection opened remotely");
    }

    public void onConnectionLocalClose(Event event) {
        log.debug("Amqp connection closed locally, shutting down all active sessions...");
        Iterator<AmqpsSessionHandler> it = this.sessionHandlers.values().iterator();
        while (it.hasNext()) {
            it.next().closeSession();
        }
        if (this.amqpsCbsSessionHandler != null) {
            log.debug("Shutting down cbs session...");
            this.amqpsCbsSessionHandler.close();
        }
        log.trace("Closing reactor since connection has closed");
        event.getReactor().stop();
    }

    public void onConnectionRemoteClose(Event event) {
        Connection connection = event.getConnection();
        if (connection.getLocalState() != EndpointState.ACTIVE) {
            log.trace("Closing reactor since connection has closed");
            event.getReactor().stop();
        } else {
            this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(connection.getRemoteCondition());
            log.error("Amqp connection was closed remotely", this.savedException);
            this.connection.close();
        }
    }

    public void onTransportError(Event event) {
        super.onTransportError(event);
        this.state = IotHubConnectionStatus.DISCONNECTED;
        ErrorCondition remoteCondition = event.getTransport().getRemoteCondition();
        boolean z = false;
        if (remoteCondition == null || (remoteCondition.getCondition() == null && remoteCondition.getDescription() == null && remoteCondition.getInfo() == null)) {
            remoteCondition = event.getTransport().getCondition();
            z = true;
        }
        this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(remoteCondition);
        this.reconnectingDeviceSessionHandlers.putAll(this.sessionHandlers);
        if (event.getConnection().getLocalState() == EndpointState.CLOSED && z) {
            log.error("Amqp transport error occurred, calling onConnectionLocalClose", this.savedException);
            onConnectionLocalClose(event);
        } else {
            log.error("Amqp transport error occurred, closing the AMQPS connection", this.savedException);
            event.getConnection().close();
        }
    }

    public void onTimerTask(Event event) {
        sendQueuedMessages();
        sendQueuedAcknowledgements();
        checkForNewlyUnregisteredMultiplexedClientsToStop();
        checkForNewlyRegisteredMultiplexedClientsToStart();
        event.getReactor().schedule(this.sendInterval, this);
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection
    public void setListener(IotHubListener iotHubListener) {
        this.listener = iotHubListener;
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection
    public IotHubStatusCode sendMessage(Message message) {
        log.trace("Adding message to amqp message queue to be sent later ({})", message);
        this.messagesToSend.add(message);
        return IotHubStatusCode.OK;
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection
    public boolean sendMessageResult(IotHubTransportMessage iotHubTransportMessage, IotHubMessageResult iotHubMessageResult) {
        this.queuedAcknowledgements.put(iotHubTransportMessage, iotHubMessageResult);
        return true;
    }

    private void sendQueuedAcknowledgements() {
        Accepted released;
        while (!this.queuedAcknowledgements.isEmpty()) {
            IotHubTransportMessage next = this.queuedAcknowledgements.keySet().iterator().next();
            IotHubMessageResult iotHubMessageResult = this.queuedAcknowledgements.get(next);
            this.queuedAcknowledgements.remove(next);
            if (iotHubMessageResult == IotHubMessageResult.ABANDON) {
                released = Released.getInstance();
            } else if (iotHubMessageResult == IotHubMessageResult.REJECT) {
                released = new Rejected();
            } else if (iotHubMessageResult == IotHubMessageResult.COMPLETE) {
                released = Accepted.getInstance();
            } else {
                log.warn("Invalid IoT Hub message result {}", iotHubMessageResult.name());
            }
            AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.get(next.getConnectionDeviceId());
            if (amqpsSessionHandler == null || !amqpsSessionHandler.acknowledgeReceivedMessage(next, released)) {
                log.warn("No sessions could acknowledge the message ({})", next);
            }
        }
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.IotHubTransportConnection
    public String getConnectionId() {
        return this.connectionId;
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onDeviceSessionOpened(String str) {
        if (!this.deviceSessionsOpenedLatches.containsKey(str)) {
            log.warn("Unrecognized deviceId {} reported its device session as opened, ignoring it.", str);
            return;
        }
        log.trace("Device session for device {} opened, counting down the device sessions opening latch", str);
        this.deviceSessionsOpenedLatches.get(str).countDown();
        if (this.isMultiplexing) {
            this.listener.onMultiplexedDeviceSessionEstablished(this.connectionId, str);
        }
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onAuthenticationSessionOpened() {
        log.trace("Authentication session opened, counting down the authentication session opening latch");
        this.authenticationSessionOpenedLatch.countDown();
        if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
            if (!this.isWebsocketConnection) {
                Iterator<AmqpsSasTokenRenewalHandler> it = this.sasTokenRenewalHandlers.iterator();
                while (it.hasNext()) {
                    try {
                        it.next().sendAuthenticationMessage(this.connection.getReactor());
                    } catch (TransportException e) {
                        log.error("Failed to send CBS authentication message", e);
                        this.savedException = e;
                    }
                }
                return;
            }
            ArrayList arrayList = new ArrayList(this.sasTokenRenewalHandlers);
            for (int i = 0; i < arrayList.size() - 30; i++) {
                if (i + 30 < arrayList.size()) {
                    ((AmqpsSasTokenRenewalHandler) arrayList.get(i)).setNextToAuthenticate((AmqpsSasTokenRenewalHandler) arrayList.get(i + 30));
                }
            }
            int min = Math.min(30, arrayList.size());
            for (int i2 = 0; i2 < min; i2++) {
                try {
                    ((AmqpsSasTokenRenewalHandler) arrayList.get(i2)).sendAuthenticationMessage(this.connection.getReactor());
                } catch (TransportException e2) {
                    log.error("Failed to send CBS authentication message", e2);
                    this.savedException = e2;
                }
            }
        }
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onMessageAcknowledged(Message message, DeliveryState deliveryState, String str) {
        if (deliveryState == Accepted.getInstance()) {
            this.listener.onMessageSent(message, str, null);
            return;
        }
        if (deliveryState instanceof Rejected) {
            this.listener.onMessageSent(message, str, AmqpsExceptionTranslator.convertFromAmqpException(((Rejected) deliveryState).getError()));
        } else {
            if (deliveryState != Released.getInstance()) {
                log.warn("Unexpected delivery state for sent message ({})", message);
                return;
            }
            ProtocolException protocolException = new ProtocolException("Message was released by the amqp server");
            protocolException.setRetryable(true);
            this.listener.onMessageSent(message, str, protocolException);
        }
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onMessageReceived(IotHubTransportMessage iotHubTransportMessage) {
        this.listener.onMessageReceived(iotHubTransportMessage, null);
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onAuthenticationFailed(String str, TransportException transportException) {
        if (!this.isMultiplexing) {
            this.savedException = transportException;
        } else if (this.state != IotHubConnectionStatus.CONNECTED) {
            if (this.savedException == null) {
                this.savedException = new MultiplexingDeviceUnauthorizedException("One or more multiplexed devices failed to authenticate");
            }
            if (this.savedException instanceof MultiplexingDeviceUnauthorizedException) {
                ((MultiplexingDeviceUnauthorizedException) this.savedException).addRegistrationException(str, transportException);
            }
        } else {
            log.trace("Not saving the authentication failure locally. Just notifying upper layer directly.");
        }
        this.listener.onMultiplexedDeviceSessionRegistrationFailed(this.connectionId, str, transportException);
        if (this.deviceSessionsOpenedLatches.containsKey(str)) {
            this.deviceSessionsOpenedLatches.get(str).countDown();
        } else {
            log.warn("Unrecognized device Id reported authentication failure, could not map it to a device session latch", transportException);
        }
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onSessionClosedUnexpectedly(ErrorCondition errorCondition, String str) {
        this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(errorCondition);
        if (this.isMultiplexing) {
            this.listener.onMultiplexedDeviceSessionRegistrationFailed(this.connectionId, str, this.savedException);
        }
        CountDownLatch countDownLatch = this.deviceSessionsOpenedLatches.get(str);
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
        if (!this.isMultiplexing) {
            log.error("Amqp session closed unexpectedly. Closing this connection...", this.savedException);
            this.connection.close();
        } else {
            log.error("Amqp session closed unexpectedly. notifying the transport layer to start reconnection logic...", this.savedException);
            this.reconnectingDeviceSessionHandlers.putAll(this.sessionHandlers);
            this.listener.onMultiplexedDeviceSessionLost(this.savedException, this.connectionId, str, this.reconnectingDeviceSessionHandlers.containsKey(str));
        }
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onCBSSessionClosedUnexpectedly(ErrorCondition errorCondition) {
        this.savedException = AmqpsExceptionTranslator.convertFromAmqpException(errorCondition);
        log.error("Amqp CBS session closed unexpectedly. Closing this connection...", this.savedException);
        this.connection.close();
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.AmqpsSessionStateCallback
    public void onSessionClosedAsExpected(String str) {
        if (this.isMultiplexing) {
            log.trace("onSessionClosedAsExpected callback executed, notifying transport layer");
            this.listener.onMultiplexedDeviceSessionLost(this.savedException, this.connectionId, str, this.reconnectingDeviceSessionHandlers.containsKey(str));
        }
    }

    private void addWebSocketLayer(Transport transport) {
        log.debug("Adding websocket layer to amqp transport");
        WebSocketImpl webSocketImpl = new WebSocketImpl(MAX_MESSAGE_PAYLOAD_SIZE);
        webSocketImpl.configure(this.hostName, WEB_SOCKET_PATH, WEB_SOCKET_QUERY, WEB_SOCKET_PORT, WEB_SOCKET_SUB_PROTOCOL, (Map) null, (WebSocketHandler) null);
        ((TransportInternal) transport).addTransportLayer(webSocketImpl);
    }

    private void addProxyLayer(Transport transport, String str) {
        ProxyImpl proxyImpl;
        log.debug("Adding proxy layer to amqp transport");
        if (this.proxySettings.getUsername() == null || this.proxySettings.getPassword() == null) {
            log.trace("No proxy username and password will be used amqp proxy configuration");
            proxyImpl = new ProxyImpl();
        } else {
            log.trace("Adding proxy username and password to amqp proxy configuration");
            proxyImpl = new ProxyImpl(new ProxyConfiguration(ProxyAuthenticationType.BASIC, this.proxySettings.getProxy(), this.proxySettings.getUsername(), new String(this.proxySettings.getPassword())));
        }
        proxyImpl.configure(str, (Map) null, new ProxyHandlerImpl(), transport);
        ((TransportInternal) transport).addTransportLayer(proxyImpl);
    }

    private void sendQueuedMessages() {
        Message message;
        int i = 0;
        Message poll = this.messagesToSend.poll();
        while (true) {
            message = poll;
            if (message == null || i >= 1000) {
                break;
            }
            i++;
            SendResult sendQueuedMessage = sendQueuedMessage(message);
            if (sendQueuedMessage == SendResult.WRONG_DEVICE) {
                if (this.reconnectingDeviceSessionHandlers.get(message.getConnectionDeviceId()) != null) {
                    log.trace("Amqp message failed to send because its AMQP session is currently reconnecting. Adding it back to messages to send queue ({})", message);
                    TransportException transportException = new TransportException("Amqp message failed to send because its AMQP session is currently reconnecting");
                    transportException.setRetryable(true);
                    this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
                } else {
                    TransportException transportException2 = new TransportException("Message failed to send because it belonged to a device that was unregistered from the AMQP connetion");
                    transportException2.setRetryable(false);
                    this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException2);
                }
            } else if (sendQueuedMessage == SendResult.DUPLICATE_SUBSCRIPTION_MESSAGE) {
                log.trace("Attempted to send subscription message while the subscription was already in progress. Discarding the message ({})", message);
            } else if (sendQueuedMessage == SendResult.SUBSCRIPTION_IN_PROGRESS) {
                log.trace("Attempted to send twin/method message while the twin/method subscription was in progress. Adding it back to messages to send queue to try again after the subscription has finished ({})", message);
                TransportException transportException3 = new TransportException("Subscription in progress needs to be completed before this message can be sent");
                transportException3.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException3);
            } else if (sendQueuedMessage == SendResult.LINKS_NOT_OPEN) {
                log.warn("Failed to send a message because its AMQP links were not open yet. Adding it back to messages to send queue ({})", message);
                TransportException transportException4 = new TransportException("Amqp links not open for this message");
                transportException4.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException4);
            } else if (sendQueuedMessage == SendResult.UNKNOWN_FAILURE) {
                log.warn("Unknown failure occurred while attempting to send. Adding it back to messages to send queue ({})", message);
                TransportException transportException5 = new TransportException("Unknown failure");
                transportException5.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException5);
            }
            poll = this.messagesToSend.poll();
        }
        if (message != null) {
            this.messagesToSend.add(message);
        }
    }

    private SendResult sendQueuedMessage(Message message) {
        log.trace("Sending message over amqp ({})", message);
        AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.get(message.getConnectionDeviceId());
        return amqpsSessionHandler == null ? SendResult.WRONG_DEVICE : amqpsSessionHandler.sendMessage(message);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Reactor createReactor() throws TransportException {
        try {
            ReactorOptions reactorOptions = new ReactorOptions();
            reactorOptions.setMaxFrameSize(MAX_FRAME_SIZE);
            if (this.authenticationType == ClientConfiguration.AuthType.X509_CERTIFICATE) {
                reactorOptions.setEnableSaslByDefault(false);
            }
            return Proton.reactor(reactorOptions, new Handler[]{this});
        } catch (IOException e) {
            throw new TransportException("Could not create Proton reactor", e);
        }
    }

    private void releaseLatch(CountDownLatch countDownLatch) {
        for (int i = 0; i < countDownLatch.getCount(); i++) {
            countDownLatch.countDown();
        }
    }

    private void releaseDeviceSessionLatches() {
        Iterator<String> it = this.deviceSessionsOpenedLatches.keySet().iterator();
        while (it.hasNext()) {
            releaseLatch(this.deviceSessionsOpenedLatches.get(it.next()));
        }
    }

    private AmqpsSessionHandler addSessionHandler(ClientConfiguration clientConfiguration) {
        String deviceId = clientConfiguration.getDeviceId();
        AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.get(deviceId);
        if (amqpsSessionHandler != null) {
            return amqpsSessionHandler;
        }
        AmqpsSessionHandler remove = this.reconnectingDeviceSessionHandlers.containsKey(deviceId) ? this.reconnectingDeviceSessionHandlers.remove(deviceId) : new AmqpsSessionHandler(clientConfiguration, this);
        this.sessionHandlers.put(deviceId, remove);
        return remove;
    }

    private void checkForNewlyRegisteredMultiplexedClientsToStart() {
        Iterator<ClientConfiguration> it = this.multiplexingClientsToRegister.iterator();
        HashSet hashSet = new HashSet();
        for (ClientConfiguration next = it.hasNext() ? it.next() : null; next != null; next = it.hasNext() ? it.next() : null) {
            AmqpsSessionHandler addSessionHandler = addSessionHandler(next);
            log.trace("Adding device session for device {} to an active connection", next.getDeviceId());
            addSessionHandler.setSession(this.connection.session());
            AmqpsSasTokenRenewalHandler amqpsSasTokenRenewalHandler = new AmqpsSasTokenRenewalHandler(this.amqpsCbsSessionHandler, addSessionHandler);
            this.sasTokenRenewalHandlers.add(amqpsSasTokenRenewalHandler);
            try {
                amqpsSasTokenRenewalHandler.sendAuthenticationMessage(this.reactor);
                hashSet.add(next);
            } catch (TransportException e) {
                log.warn("Failed to send authentication message for device {}; will try again.", amqpsSasTokenRenewalHandler.amqpsSessionHandler.getDeviceId());
                amqpsSasTokenRenewalHandler.close();
                this.sasTokenRenewalHandlers.remove(amqpsSasTokenRenewalHandler);
                return;
            }
        }
        this.multiplexingClientsToRegister.removeAll(hashSet);
    }

    private void checkForNewlyUnregisteredMultiplexedClientsToStop() {
        Iterator<ClientConfiguration> it = this.multiplexingClientsToUnregister.keySet().iterator();
        HashSet hashSet = new HashSet();
        for (ClientConfiguration next = it.hasNext() ? it.next() : null; next != null; next = it.hasNext() ? it.next() : null) {
            String deviceId = next.getDeviceId();
            AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.get(deviceId);
            if (amqpsSessionHandler == null) {
                log.warn("Attempted to remove device session for device {} from multiplexed connection, but device was not currently registered.", deviceId);
            } else {
                log.trace("Removing session handler for device {}", deviceId);
                this.sessionHandlers.remove(deviceId);
                if (this.multiplexingClientsToUnregister.get(next).booleanValue()) {
                    this.reconnectingDeviceSessionHandlers.put(deviceId, amqpsSessionHandler);
                } else {
                    this.reconnectingDeviceSessionHandlers.remove(deviceId);
                }
                AmqpsSasTokenRenewalHandler amqpsSasTokenRenewalHandler = null;
                Iterator<AmqpsSasTokenRenewalHandler> it2 = this.sasTokenRenewalHandlers.iterator();
                while (true) {
                    if (!it2.hasNext()) {
                        break;
                    }
                    AmqpsSasTokenRenewalHandler next2 = it2.next();
                    if (next2.amqpsSessionHandler.getDeviceId().equals(next.getDeviceId())) {
                        amqpsSasTokenRenewalHandler = next2;
                        log.trace("Closing sas token renewal handler for device {}", next.getDeviceId());
                        amqpsSasTokenRenewalHandler.close();
                        break;
                    }
                }
                if (amqpsSasTokenRenewalHandler != null) {
                    this.sasTokenRenewalHandlers.remove(amqpsSasTokenRenewalHandler);
                }
                log.debug("Closing device session for multiplexed device {}", next.getDeviceId());
                amqpsSessionHandler.closeSession();
            }
            hashSet.add(next);
        }
        Iterator it3 = hashSet.iterator();
        while (it3.hasNext()) {
            this.multiplexingClientsToUnregister.remove((ClientConfiguration) it3.next());
        }
        this.clientConfigurations.removeAll(hashSet);
    }

    private void initializeStateLatches() {
        this.closeReactorLatch = new CountDownLatch(1);
        if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN) {
            log.trace("Initializing authentication link latch count to {}", 1);
            this.authenticationSessionOpenedLatch = new CountDownLatch(1);
        } else {
            log.trace("Initializing authentication link latch count to 0 because x509 connections don't have authentication links");
            this.authenticationSessionOpenedLatch = new CountDownLatch(0);
        }
        this.deviceSessionsOpenedLatches = new ConcurrentHashMap();
        Iterator<AmqpsSessionHandler> it = this.sessionHandlers.values().iterator();
        while (it.hasNext()) {
            String deviceId = it.next().getDeviceId();
            log.trace("Initializing device session latch for device {}", deviceId);
            this.deviceSessionsOpenedLatches.put(deviceId, new CountDownLatch(1));
        }
    }

    private void closeConnectionWithException(String str, boolean z) throws TransportException {
        TransportException transportException = new TransportException(str);
        transportException.setRetryable(z);
        log.error(str, transportException);
        close();
        throw transportException;
    }

    private void openAsync() throws TransportException {
        String str;
        log.trace("OpenAsnyc called for amqp connection");
        synchronized (this.executorServiceLock) {
            if (this.executorService == null) {
                log.trace("Creating new executor service");
                this.executorService = Executors.newFixedThreadPool(1);
            }
        }
        this.reactor = createReactor();
        String str2 = this.hostName + "-" + (this.isMultiplexing ? "Multiplexed-" + this.transportUniqueIdentifier : this.clientConfiguration.getDeviceClientUniqueIdentifier()) + "-Cnx" + this.connectionId;
        String str3 = "";
        if (this.isMultiplexing) {
            if (this.useIdentifiableThreadNames) {
                str = str3 + str2 + "-azure-iot-sdk-ReactorRunner-ConnectionOwner";
            } else {
                if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) {
                    str3 = str3 + this.threadNamePrefix;
                }
                str = str3 + "azure-iot-sdk-ReactorRunner";
                if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) {
                    str = str + this.threadNameSuffix;
                }
            }
        } else if (this.useIdentifiableThreadNames) {
            str = str3 + str2 + "-azure-iot-sdk-ReactorRunner-ConnectionOwner";
        } else {
            if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) {
                str3 = str3 + this.threadNamePrefix;
            }
            str = str3 + "azure-iot-sdk-ReactorRunner";
            if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) {
                str = str + this.threadNameSuffix;
            }
        }
        this.executorService.submit(new ReactorRunner(this.reactor, this.listener, this.connectionId, str, this));
    }

    private void closeAsync() {
        log.trace("CloseAsync called for amqp connection");
        if (this.connection == null && this.reactor == null) {
            releaseLatch(this.authenticationSessionOpenedLatch);
            releaseDeviceSessionLatches();
            releaseLatch(this.closeReactorLatch);
        } else {
            if (this.connection == null) {
                this.reactor.stop();
                return;
            }
            if (this.reactor == null) {
                log.warn("Connection was initialized without a reactor, connection is in an unknown state; closing connection anyways.");
                this.connection.close();
            } else if (this.connection.getLocalState() == EndpointState.CLOSED && this.connection.getRemoteState() == EndpointState.CLOSED) {
                log.trace("Closing amqp reactor since the connection was already closed");
                this.connection.getReactor().stop();
            } else {
                log.trace("Closing amqp connection");
                this.connection.close();
            }
        }
    }

    private void executorServicesCleanup() {
        synchronized (this.executorServiceLock) {
            if (this.executorService != null) {
                log.trace("Shutdown of executor service has started");
                this.executorService.shutdownNow();
                this.executorService = null;
                log.trace("Shutdown of executor service completed");
            }
        }
    }

    @Override // com.microsoft.azure.sdk.iot.device.transport.amqps.ReactorRunnerStateCallback
    public void onReactorClosedUnexpectedly() {
        releaseLatch(this.authenticationSessionOpenedLatch);
        releaseDeviceSessionLatches();
        releaseLatch(this.closeReactorLatch);
    }
}
