/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.kapua.kura.simulator;

import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.kapua.kura.simulator.GatewayConfiguration;
import org.eclipse.kapua.kura.simulator.Transport;
import org.eclipse.kapua.kura.simulator.payload.Message;
import org.eclipse.kapua.kura.simulator.topic.Topic;
import org.eclipse.kapua.kura.simulator.util.Hex;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MQTT based {@link Transport} of the simulator, backed by a Paho {@link MqttAsyncClient}.
 *
 * <p>The broker URL of the {@link GatewayConfiguration} may carry credentials in its user-info
 * part ({@code user:password@host}). The credentials are stripped from the URL handed to the MQTT
 * client and passed through {@link MqttConnectOptions} instead.
 *
 * <p>Topics are rendered against a fixed context containing the {@code account-name} and
 * {@code client-id} of the configuration.
 *
 * <p>NOTE(review): automatic reconnect is enabled on the connect options, but the handler set via
 * {@link #whenConnected(Runnable)} is only triggered by the success callback of an explicit
 * {@link #connect()} call. Confirm whether reconnects performed by the client should notify it
 * as well.
 */
public class MqttSimulatorTransport implements AutoCloseable, Transport {

    private static final Logger logger = LoggerFactory.getLogger(MqttSimulatorTransport.class);

    /**
     * Reason code reported by the MQTT client when an operation is attempted while the client is
     * not connected (Paho's {@code REASON_CODE_CLIENT_NOT_CONNECTED}). Subscribe/unsubscribe
     * failures with this code are expected while offline and are therefore not logged.
     */
    private static final int REASON_CLIENT_NOT_CONNECTED = 32104;

    private final MqttAsyncClient client;
    private final MqttConnectOptions connectOptions;
    private final Map<String, String> topicContext;

    // Assigned by whoever registers the handlers, but read from inside MQTT client callbacks;
    // volatile makes the most recently registered handler visible to those callbacks.
    private volatile Runnable onConnected;
    private volatile Runnable onDisconnected;

    /**
     * Creates a new transport. No connection is opened until {@link #connect()} is called.
     *
     * @param configuration the gateway configuration providing broker URL, account name and
     *                      client id; must not be {@code null}
     * @throws MqttException if the MQTT client cannot be created
     * @throws RuntimeException if the broker URL is not a valid URI
     */
    public MqttSimulatorTransport(GatewayConfiguration configuration) throws MqttException {
        Objects.requireNonNull(configuration, "configuration");

        Map<String, String> context = new HashMap<>();
        context.put("account-name", configuration.getAccountName());
        context.put("client-id", configuration.getClientId());
        this.topicContext = Collections.unmodifiableMap(context);

        // The client only ever sees the URL without user info; credentials travel separately in
        // the connect options created below.
        String plainBrokerUrl = plainUrl(configuration.getBrokerUrl());
        this.client = new MqttAsyncClient(plainBrokerUrl, configuration.getClientId(), new MemoryPersistence());
        this.client.setCallback(new MqttCallback() {

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Intentionally empty: messages are delivered through the per-subscription
                // listeners registered in subscribe().
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Intentionally empty: delivery confirmations are not tracked.
            }

            @Override
            public void connectionLost(Throwable cause) {
                handleDisconnected();
            }
        });
        this.connectOptions = createConnectOptions(configuration.getBrokerUrl());
    }

    /**
     * Returns the broker URL with its user-info part removed.
     *
     * @param brokerUrl the broker URL, possibly containing credentials
     * @return the same URL without user info
     * @throws RuntimeException if {@code brokerUrl} is not a valid URI
     */
    private static String plainUrl(String brokerUrl) {
        try {
            URIBuilder builder = new URIBuilder(brokerUrl);
            builder.setUserInfo(null);
            return builder.build().toString();
        } catch (URISyntaxException e) {
            throw new RuntimeException("Failed to clean up broker URL", e);
        }
    }

    /**
     * Builds the connect options for the given broker URL.
     *
     * <p>Automatic reconnect is always enabled. User name and password are set only when the
     * user-info part of the URL has the form {@code user:password}; the password is everything
     * after the first colon.
     *
     * <p>NOTE(review): user info without a colon (user name only) is ignored, exactly as before.
     *
     * @param brokerUrl the broker URL, possibly containing credentials
     * @return the connect options to use for {@link #connect()}
     * @throws RuntimeException if {@code brokerUrl} is not a valid URI
     */
    private static MqttConnectOptions createConnectOptions(String brokerUrl) {
        try {
            URIBuilder builder = new URIBuilder(brokerUrl);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);

            String userInfo = builder.getUserInfo();
            if (userInfo != null && !userInfo.isEmpty()) {
                // Limit of 2 keeps any further colons inside the password.
                String[] credentials = userInfo.split(":", 2);
                if (credentials.length == 2) {
                    options.setUserName(credentials[0]);
                    options.setPassword(credentials[1].toCharArray());
                }
            }
            return options;
        } catch (URISyntaxException e) {
            throw new RuntimeException("Failed to create MQTT options", e);
        }
    }

    /**
     * Starts an asynchronous connect. On success the handler registered with
     * {@link #whenConnected(Runnable)} is run; failures are only logged.
     */
    @Override
    public void connect() {
        try {
            this.client.connect(this.connectOptions, null, new IMqttActionListener() {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    handleConnected();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    logger.warn("Failed to connect", exception);
                }
            });
        } catch (MqttException e) {
            logger.warn("Failed to initiate connect", e);
        }
    }

    /**
     * Starts an asynchronous disconnect. On success the handler registered with
     * {@link #whenDisconnected(Runnable)} is run; failures are only logged.
     */
    @Override
    public void disconnect() {
        try {
            this.client.disconnect(null, new IMqttActionListener() {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    handleDisconnected();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    logger.warn("Failed to disconnect", exception);
                }
            });
        } catch (MqttException e) {
            logger.warn("Failed to initiate disconnect", e);
        }
    }

    /**
     * Disconnects without a quiesce period, waits for the disconnect to complete and closes the
     * client. The client is closed even when disconnecting fails.
     *
     * @throws MqttException if disconnecting or closing the client fails
     */
    @Override
    public void close() throws MqttException {
        try {
            this.client.disconnect(0L).waitForCompletion();
        } finally {
            this.client.close();
        }
    }

    /**
     * Subscribes to a topic with QoS 0 and forwards every received message to the consumer.
     *
     * <p>Failures are logged, except for the "client not connected" case which is silently
     * ignored.
     *
     * @param topic    the topic to subscribe to, rendered against this transport's topic context;
     *                 must not be {@code null}
     * @param consumer receives each incoming message; must not be {@code null}
     */
    @Override
    public void subscribe(Topic topic, final Consumer<Message> consumer) {
        Objects.requireNonNull(topic, "topic");
        Objects.requireNonNull(consumer);
        try {
            this.client.subscribe(topic.render(this.topicContext), 0, null, null, new IMqttMessageListener() {

                @Override
                public void messageArrived(String topicName, MqttMessage mqttMessage) throws Exception {
                    logger.debug("Received MQTT message from {}", topicName);
                    consumer.accept(new Message(Topic.fromString(topicName), mqttMessage.getPayload(), MqttSimulatorTransport.this.topicContext));
                }
            });
        } catch (MqttException e) {
            if (e.getReasonCode() != REASON_CLIENT_NOT_CONNECTED) {
                // The trailing exception argument is logged with its stack trace by SLF4J.
                logger.warn("Failed to subscribe to: {}", topic, e);
            }
        }
    }

    /**
     * Unsubscribes from a topic.
     *
     * <p>Failures are logged, except for the "client not connected" case which is silently
     * ignored.
     *
     * @param topic the topic to unsubscribe from, rendered against this transport's topic
     *              context; must not be {@code null}
     */
    @Override
    public void unsubscribe(Topic topic) {
        Objects.requireNonNull(topic, "topic");
        try {
            this.client.unsubscribe(topic.render(this.topicContext));
        } catch (MqttException e) {
            if (e.getReasonCode() != REASON_CLIENT_NOT_CONNECTED) {
                logger.warn("Failed to unsubscribe: {}", topic, e);
            }
        }
    }

    /**
     * Registers the handler to run after a successful {@link #connect()}, replacing any
     * previously registered one.
     *
     * @param runnable the handler, or {@code null} to clear it
     */
    @Override
    public void whenConnected(Runnable runnable) {
        this.onConnected = runnable;
    }

    /**
     * Registers the handler to run after a successful {@link #disconnect()} or when the
     * connection is lost, replacing any previously registered one.
     *
     * @param runnable the handler, or {@code null} to clear it
     */
    @Override
    public void whenDisconnected(Runnable runnable) {
        this.onDisconnected = runnable;
    }

    /**
     * Runs the "connected" handler, if one is registered.
     */
    protected void handleConnected() {
        // Read the field once so a concurrent whenConnected(null) cannot cause an NPE.
        Runnable handler = this.onConnected;
        if (handler != null) {
            handler.run();
        }
    }

    /**
     * Runs the "disconnected" handler, if one is registered.
     */
    protected void handleDisconnected() {
        // Read the field once so a concurrent whenDisconnected(null) cannot cause an NPE.
        Runnable handler = this.onDisconnected;
        if (handler != null) {
            handler.run();
        }
    }

    /**
     * Publishes a message with QoS 0, not retained. Any failure is logged and otherwise ignored.
     *
     * @param topic   the target topic, rendered against this transport's topic context
     * @param payload the raw message payload
     */
    @Override
    public void sendMessage(Topic topic, byte[] payload) {
        // Guarded because building the hex dump is wasted work when debug logging is off.
        if (logger.isDebugEnabled()) {
            logger.debug("Sending message - topic: {}, payload: {}", topic, Hex.toHex(payload, 256));
        }
        try {
            String fullTopic = topic.render(this.topicContext);
            logger.debug("Full topic: {}", fullTopic);
            this.client.publish(fullTopic, payload, 0, false);
        } catch (Exception e) {
            // Best effort by design: a failed publish must not propagate to the caller.
            logger.warn("Failed to send out message", e);
        }
    }
}

