/*
 * Decompiled with CFR 0.152.
 */
package de.iip_ecosphere.platform.connectors.mqttv3;

import de.iip_ecosphere.platform.connectors.AbstractChannelConnector;
import de.iip_ecosphere.platform.connectors.ChannelAdapterSelector;
import de.iip_ecosphere.platform.connectors.ConnectorDescriptor;
import de.iip_ecosphere.platform.connectors.ConnectorParameter;
import de.iip_ecosphere.platform.connectors.MachineConnector;
import de.iip_ecosphere.platform.connectors.types.ChannelProtocolAdapter;
import de.iip_ecosphere.platform.support.net.SslUtils;
import de.iip_ecosphere.platform.transport.connectors.basics.MqttQoS;
import de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector;
import java.io.File;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.SocketFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.LoggerFactory;

/**
 * Channel connector that exchanges raw {@code byte[]} payloads with an MQTT v3 broker through the
 * Eclipse Paho asynchronous client.
 *
 * <p>Incoming messages are not polled: they arrive through the Paho callback and are handed to
 * {@code received(byte[])}; {@link #read()} therefore always returns {@code null}. Outgoing
 * messages are published with QoS "at least once".
 *
 * <p>NOTE(review): the connection is opened with a clean session and automatic reconnect. Paho
 * documents that subscriptions have to be re-made after a clean-session reconnect; this class
 * does not do so. Confirm whether re-subscribing (e.g. via {@code MqttCallbackExtended}) is needed.
 *
 * @param <CO> connector output type, passed through to {@link AbstractChannelConnector}
 * @param <CI> connector input type, passed through to {@link AbstractChannelConnector}
 */
@MachineConnector(hasModel=false, supportsEvents=true, supportsHierarchicalQNames=false, supportsModelCalls=false, supportsModelProperties=false, supportsModelStructs=false)
public class PahoMqttv3Connector<CO, CI> extends AbstractChannelConnector<byte[], byte[], CO, CI> {
    /** Descriptive name of this connector, also reported by {@link Descriptor#getName()}. */
    public static final String NAME = "MQTT v3";
    private static final Logger LOGGER = Logger.getLogger(PahoMqttv3Connector.class.getName());
    /** Upper bound of unacknowledged in-flight messages configured on the Paho client. */
    private static final int MAX_INFLIGHT = 1000;
    /** Name of the encryption scheme reported by this connector. */
    private static final String ENCRYPTION_TLS = "TLS";

    /** The Paho client; {@code null} until {@link #connectImpl(ConnectorParameter)} ran. */
    private MqttAsyncClient client;
    /** Whether the current/last connection attempt used an {@code ssl://} broker URI. */
    private boolean tlsEnabled = false;

    /**
     * Creates a connector without an explicit adapter selector.
     *
     * @param adapter the protocol adapter(s) forwarded to the superclass
     */
    @SafeVarargs
    public PahoMqttv3Connector(ChannelProtocolAdapter<byte[], byte[], CO, CI> ... adapter) {
        this((ChannelAdapterSelector<byte[], byte[], CO, CI>)null, adapter);
    }

    /**
     * Creates a connector with an adapter selector.
     *
     * @param selector the adapter selector forwarded to the superclass (may be {@code null})
     * @param adapter the protocol adapter(s) forwarded to the superclass
     */
    @SafeVarargs
    public PahoMqttv3Connector(ChannelAdapterSelector<byte[], byte[], CO, CI> selector, ChannelProtocolAdapter<byte[], byte[], CO, CI> ... adapter) {
        super(selector, adapter);
    }

    /**
     * Creates the Paho client, connects to the broker given by {@code params} and subscribes to
     * all output channels with QoS "at least once".
     *
     * <p>If a keystore is configured, the broker is addressed via {@code ssl://}; otherwise via
     * {@code tcp://}. When the configured keystore cannot be loaded, the connection is still
     * attempted via {@code ssl://}, but without a custom socket factory, i.e., with the TLS
     * defaults of the Paho client.
     *
     * @param params the connection parameters
     * @throws IOException if connecting or subscribing fails or times out
     */
    protected void connectImpl(ConnectorParameter params) throws IOException {
        // Reset first so that a later connect without keystore does not report a stale "TLS".
        this.tlsEnabled = false;
        final boolean secure = null != params.getKeystore();
        final String broker = (secure ? "ssl://" : "tcp://") + params.getHost() + ":" + params.getPort();
        final String appId = AbstractTransportConnector.getApplicationId(params.getApplicationId(), "conn", params.getAutoApplicationId());
        try {
            this.client = new MqttAsyncClient(broker, appId, new MemoryPersistence());
            this.client.setCallback(new Callback());

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(params.getKeepAlive());
            connOpts.setAutomaticReconnect(true);
            connOpts.setMaxInflight(MAX_INFLIGHT);
            if (secure) {
                try {
                    connOpts.setSocketFactory(SslUtils.createTlsContext(params.getKeystore(), params.getKeystorePassword(), params.getKeyAlias()).getSocketFactory());
                    connOpts.setHttpsHostnameVerificationEnabled(params.getHostnameVerification());
                } catch (IOException e) {
                    // The broker URI stays ssl://, so the client still negotiates TLS, just without
                    // the keystore-based socket factory. Say so instead of claiming "no TLS".
                    LoggerFactory.getLogger(getClass()).error("MQTT: Loading keystore failed: " + e.getMessage() + ". Continuing with default TLS settings.", e);
                }
                // Follows the URI scheme, hence also set if the keystore could not be loaded.
                this.tlsEnabled = true;
            }

            waitForCompletion(this.client.connect(connOpts));
            for (String out : getOutputChannels()) {
                waitForCompletion(this.client.subscribe(out, MqttQoS.AT_LEAST_ONCE.value()));
            }
        } catch (MqttException e) {
            throw new IOException(e);
        }
    }

    /**
     * Blocks until the given Paho operation completed or the request timeout of the connector
     * parameter elapsed.
     *
     * @param token the token of the pending operation
     * @throws MqttException if the operation failed or did not complete in time
     */
    private void waitForCompletion(IMqttToken token) throws MqttException {
        token.waitForCompletion(getConnectorParameter().getRequestTimeout());
    }

    /**
     * Unsubscribes from all output channels (best effort), disconnects and closes the client.
     * A client that exists but is no longer connected is closed as well so that its resources
     * are released; a failure to do so is only logged.
     *
     * @throws IOException if disconnecting or closing a connected client fails
     */
    protected void disconnectImpl() throws IOException {
        final MqttAsyncClient current = this.client;
        if (null == current) {
            return; // never connected, nothing to release
        }
        if (current.isConnected()) {
            try {
                for (String out : getOutputChannels()) {
                    try {
                        waitForCompletion(current.unsubscribe(out));
                    } catch (MqttException e) {
                        // Best effort: the session ends anyway, but leave a trace for diagnosis.
                        LOGGER.log(Level.FINE, "MQTT: Unsubscribing from '" + out + "' failed", e);
                    }
                }
                waitForCompletion(current.disconnect());
                current.close();
            } catch (MqttException e) {
                // Keep the client reference so that a later disconnect attempt can retry.
                throw new IOException(e);
            }
        } else {
            // Connection was lost before: there is nothing to disconnect, but the client must
            // still be closed. Do not fail the caller's disconnect for that.
            try {
                current.close();
            } catch (MqttException e) {
                LOGGER.log(Level.WARNING, "MQTT: Closing disconnected client failed", e);
                return;
            }
        }
        this.client = null;
    }

    /**
     * Disposes this connector. Currently a no-op.
     */
    public void dispose() {
    }

    /**
     * Returns the descriptive name of this connector.
     *
     * @return {@link #NAME}
     */
    public String getName() {
        return NAME;
    }

    /**
     * Publishes {@code data} on {@code channel} with QoS "at least once" and waits for completion.
     *
     * @param data the payload to publish
     * @param channel the topic to publish to
     * @throws IOException if the connector is not connected or publishing fails or times out
     */
    protected void writeImpl(byte[] data, String channel) throws IOException {
        final MqttAsyncClient current = this.client;
        if (null == current) {
            // Report a checked, descriptive failure instead of a NullPointerException.
            throw new IOException("MQTT client is not connected");
        }
        MqttMessage message = new MqttMessage(data);
        message.setQos(MqttQoS.AT_LEAST_ONCE.value());
        try {
            waitForCompletion(current.publish(channel, message));
        } catch (MqttException e) {
            throw new IOException(e);
        }
    }

    /**
     * Not used for data reception; messages are delivered through the Paho callback.
     *
     * @return always {@code null}
     * @throws IOException never thrown by this implementation
     */
    protected byte[] read() throws IOException {
        return null;
    }

    /**
     * Logs an error on severe level.
     *
     * @param message the message to log
     * @param th the cause (may be {@code null})
     */
    protected void error(String message, Throwable th) {
        LOGGER.log(Level.SEVERE, message, th);
    }

    /**
     * Returns the encryption scheme(s) this connector supports.
     *
     * @return {@code "TLS"}
     */
    public String supportedEncryption() {
        return ENCRYPTION_TLS;
    }

    /**
     * Returns the encryption scheme used by the current/last connection attempt.
     *
     * @return {@code "TLS"} if the broker was addressed via {@code ssl://}, {@code null} otherwise
     */
    public String enabledEncryption() {
        return this.tlsEnabled ? ENCRYPTION_TLS : null;
    }

    /**
     * Paho callback forwarding received payloads to the enclosing connector.
     */
    private class Callback implements MqttCallback {
        private Callback() {
        }

        /**
         * Logs the loss of the broker connection.
         *
         * @param cause the reason reported by Paho
         */
        @Override
        public void connectionLost(Throwable cause) {
            LOGGER.log(Level.WARNING, "MQTT: Connection lost", cause);
        }

        /**
         * Hands the payload of an arrived message to the connector.
         *
         * @param topic the topic the message arrived on (not evaluated)
         * @param message the arrived message
         * @throws Exception if processing the payload fails
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            PahoMqttv3Connector.this.received(message.getPayload());
        }

        /**
         * Ignored; publishing already waits on the delivery token.
         *
         * @param token the token of the completed delivery
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    }

    /**
     * Descriptor announcing this connector by name and type.
     */
    public static class Descriptor implements ConnectorDescriptor {
        /**
         * Returns the name of the described connector.
         *
         * @return {@link PahoMqttv3Connector#NAME}
         */
        public String getName() {
            return PahoMqttv3Connector.NAME;
        }

        /**
         * Returns the type of the described connector.
         *
         * @return {@code PahoMqttv3Connector.class}
         */
        public Class<?> getType() {
            return PahoMqttv3Connector.class;
        }
    }
}

