package org.apache.edgent.connectors.mqtt.runtime;

import java.util.concurrent.TimeUnit;
import org.apache.edgent.connectors.mqtt.MqttConfig;
import org.apache.edgent.connectors.runtime.Connector;
import org.apache.edgent.function.Supplier;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/edgent/connectors/mqtt/runtime/MqttConnector.class */
public class MqttConnector extends Connector<MqttClient> {
    private static final long serialVersionUID = 1;
    private static final Logger logger = LoggerFactory.getLogger(MqttConnector.class);
    private String id;
    private final String clientId;
    private final Supplier<MqttConfig> configFn;
    private volatile MqttSubscriber<?> subscriber;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/edgent/connectors/mqtt/runtime/MqttConnector$Callback.class */
    public class Callback implements MqttCallback {
        private Callback() {
        }

        public void connectionLost(Throwable th) {
            MqttConnector.this.connectionLost(th);
        }

        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }

        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            try {
                MqttConnector.logger.trace("{} received topic:{} qos:{} isRetained:{} {} bytes", new Object[]{MqttConnector.this.id(), str, Integer.valueOf(mqttMessage.getQos()), Boolean.valueOf(mqttMessage.isRetained()), Integer.valueOf(mqttMessage.getPayload().length)});
                MqttConnector.this.notIdle();
                MqttConnector.this.subscriber.messageArrived(str, mqttMessage);
            } catch (Exception e) {
                MqttConnector.logger.error("{} messageArrived handling failed", MqttConnector.this.id(), e);
            }
        }
    }

    public MqttConnector(Supplier<MqttConfig> supplier) {
        this.configFn = supplier;
        String clientId = ((MqttConfig) this.configFn.get()).getClientId();
        this.clientId = clientId == null ? MqttClient.generateClientId() : clientId;
    }

    public Logger getLogger() {
        return logger;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setSubscriber(MqttSubscriber<?> mqttSubscriber) {
        this.subscriber = mqttSubscriber;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized MqttClient doConnect(MqttClient mqttClient) throws MqttException {
        MqttConfig mqttConfig = (MqttConfig) this.configFn.get();
        if (mqttClient == null) {
            mqttClient = newClient(mqttConfig);
        }
        if (mqttClient.isConnected()) {
            return mqttClient;
        }
        MqttConnectOptions mqttConnectOptions = (MqttConnectOptions) mqttConfig.options();
        Logger logger2 = logger;
        Object[] objArr = new Object[11];
        objArr[0] = id();
        objArr[1] = Boolean.valueOf(mqttConnectOptions.isCleanSession());
        objArr[2] = mqttConnectOptions.getUserName();
        objArr[3] = mqttConnectOptions.getPassword() == null ? null : "*****";
        objArr[4] = Integer.valueOf(mqttConfig.getIdleTimeout());
        objArr[5] = Integer.valueOf(mqttConfig.getSubscriberIdleReconnectInterval());
        objArr[6] = Integer.valueOf(mqttConnectOptions.getConnectionTimeout());
        objArr[7] = Integer.valueOf(mqttConnectOptions.getKeepAliveInterval());
        objArr[8] = mqttConnectOptions.getServerURIs();
        objArr[9] = mqttConnectOptions.getWillDestination();
        objArr[10] = mqttConnectOptions.getWillMessage();
        logger2.info("{} cleanSession:{} userName:{} password:{} idleTimeout:{} idleReconnectTimeout:{} cnTimeout:{} keepalive:{} serverURIs:{} willDst:{} willMsg:{}", objArr);
        mqttClient.connect(mqttConnectOptions);
        setIdleTimeout(mqttConfig.getIdleTimeout(), TimeUnit.SECONDS);
        MqttSubscriber<?> mqttSubscriber = this.subscriber;
        if (mqttSubscriber != null) {
            setIdleReconnectInterval(mqttConfig.getSubscriberIdleReconnectInterval());
            mqttSubscriber.connected(mqttClient);
        }
        return mqttClient;
    }

    private MqttClient newClient(MqttConfig mqttConfig) throws MqttException {
        String str = mqttConfig.getServerURLs()[0];
        MemoryPersistence persistence = mqttConfig.getPersistence();
        if (persistence == null) {
            persistence = new MemoryPersistence();
        }
        long actionTimeToWaitMillis = mqttConfig.getActionTimeToWaitMillis();
        logger.info("{} server:{} clientId:{} actionTimeToWait:{} persistence:{}", new Object[]{id(), str, this.clientId, Long.valueOf(actionTimeToWaitMillis), persistence});
        MqttClient mqttClient = new MqttClient(str, this.clientId, persistence);
        mqttClient.setTimeToWait(actionTimeToWaitMillis);
        mqttClient.setCallback(new Callback());
        return mqttClient;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void doDisconnect(MqttClient mqttClient) throws Exception {
        if (mqttClient.isConnected()) {
            mqttClient.disconnect();
        }
    }

    public void doClose(MqttClient mqttClient) throws Exception {
        try {
            doDisconnect(mqttClient);
        } finally {
            mqttClient.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String id() {
        if (this.id == null) {
            this.id = "MQTT " + toString().substring(toString().indexOf(64) + 1) + " " + this.clientId;
        }
        return this.id;
    }
}
