/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.kapua.gateway.client.mqtt.paho;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.eclipse.kapua.gateway.client.BinaryPayloadCodec;
import org.eclipse.kapua.gateway.client.Credentials;
import org.eclipse.kapua.gateway.client.Module;
import org.eclipse.kapua.gateway.client.mqtt.MqttClient;
import org.eclipse.kapua.gateway.client.mqtt.MqttMessageHandler;
import org.eclipse.kapua.gateway.client.mqtt.MqttNamespace;
import org.eclipse.kapua.gateway.client.mqtt.paho.internal.Listeners;
import org.eclipse.kapua.gateway.client.utils.Buffers;
import org.eclipse.kapua.gateway.client.utils.Strings;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PahoClient
extends MqttClient {
    private static final Logger logger = LoggerFactory.getLogger(PahoClient.class);
    private final MqttConnectOptions connectOptions;
    private MqttAsyncClient client;
    private final Map<String, MqttMessageHandler> subscriptions = new HashMap<String, MqttMessageHandler>();

    private static ScheduledExecutorService createExecutor(String clientId) {
        return Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, clientId));
    }

    private static MqttConnectOptions createConnectOptions(Builder builder) {
        MqttConnectOptions result = new MqttConnectOptions();
        Object credentials = builder.credentials();
        if (credentials instanceof Credentials.UserAndPassword) {
            Credentials.UserAndPassword userAndPassword = (Credentials.UserAndPassword)credentials;
            result.setUserName(userAndPassword.getUsername());
            result.setPassword(userAndPassword.getPassword());
        } else if (credentials != null) {
            throw new IllegalArgumentException(String.format("Unsupported credentials type: %s", credentials.getClass().getName()));
        }
        return result;
    }

    private PahoClient(Set<Module> modules, String clientId, ScheduledExecutorService executor, MqttNamespace namespace, BinaryPayloadCodec codec, MqttAsyncClient client, MqttClientPersistence persistence, MqttConnectOptions connectOptions) {
        super(executor, codec, namespace, clientId, modules);
        this.connectOptions = connectOptions;
        this.client = client;
        this.client.setCallback(new MqttCallback(){

            public void messageArrived(String topic, MqttMessage message) throws Exception {
                PahoClient.this.handleMessageArrived(topic, message);
            }

            public void deliveryComplete(IMqttDeliveryToken token) {
            }

            public void connectionLost(Throwable cause) {
                PahoClient.this.handleDisconnected();
            }
        });
        this.executor.execute(this::connect);
    }

    protected void connect() {
        try {
            this.client.connect(this.connectOptions, null, new IMqttActionListener(){

                public void onSuccess(IMqttToken asyncActionToken) {
                    PahoClient.this.handleConnected();
                }

                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    PahoClient.this.handleDisconnected();
                }
            });
        }
        catch (MqttException e) {
            logger.warn("Failed to call connect", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        MqttAsyncClient client;
        PahoClient pahoClient = this;
        synchronized (pahoClient) {
            client = this.client;
            if (client == null) {
                return;
            }
            this.client = null;
        }
        try {
            try {
                client.disconnect().waitForCompletion();
            }
            catch (MqttException mqttException) {
                // empty catch block
            }
            try {
                client.close();
            }
            catch (MqttException mqttException) {
                // empty catch block
            }
        }
        finally {
            this.executor.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleConnected() {
        PahoClient pahoClient = this;
        synchronized (pahoClient) {
            super.handleConnected();
            this.handleResubscribe();
        }
    }

    private void handleResubscribe() {
        for (Map.Entry<String, MqttMessageHandler> entry : this.subscriptions.entrySet()) {
            try {
                this.internalSubscribe(entry.getKey());
            }
            catch (MqttException e) {
                logger.warn("Failed to re-subscribe to '{}'", (Object)entry.getKey());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleDisconnected() {
        PahoClient pahoClient = this;
        synchronized (pahoClient) {
            try {
                super.handleDisconnected();
            }
            finally {
                this.executor.schedule(this::connect, 1L, TimeUnit.SECONDS);
            }
        }
    }

    public void publishMqtt(String topic, ByteBuffer payload) throws Exception {
        this.publish(topic, payload);
    }

    protected void publish(String topic, ByteBuffer payload) throws MqttException {
        logger.debug("Publishing {} - {}", (Object)topic, (Object)payload);
        this.client.publish(topic, Buffers.toByteArray((ByteBuffer)payload), 1, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletionStage<?> subscribeMqtt(String topic, MqttMessageHandler messageHandler) throws MqttException {
        PahoClient pahoClient = this;
        synchronized (pahoClient) {
            this.subscriptions.put(topic, messageHandler);
            return this.internalSubscribe(topic);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void unsubscribeMqtt(Set<String> mqttTopics) throws MqttException {
        logger.info("Unsubscribe from: {}", mqttTopics);
        ArrayList<String> topics = new ArrayList<String>(mqttTopics.size());
        PahoClient pahoClient = this;
        synchronized (pahoClient) {
            for (String topic : mqttTopics) {
                if (this.subscriptions.remove(topic) == null) continue;
                topics.add(topic);
            }
        }
        this.client.unsubscribe(topics.toArray(new String[topics.size()]));
    }

    protected void handleMessageArrived(String topic, MqttMessage message) throws Exception {
        ByteBuffer buffer = Buffers.wrap((byte[])message.getPayload());
        buffer.flip();
        logger.debug("Received message - mqtt-topic: {}, payload: {}", (Object)topic, (Object)buffer);
        MqttMessageHandler handler = this.subscriptions.get(topic);
        if (handler != null) {
            handler.handleMessage(topic, buffer);
        }
    }

    private CompletionStage<?> internalSubscribe(String topic) throws MqttException {
        CompletableFuture future = new CompletableFuture();
        this.client.subscribe(topic, 1, null, Listeners.toListener(future));
        return future;
    }

    public static class Builder
    extends MqttClient.Builder<Builder> {
        private Supplier<MqttClientPersistence> persistenceProvider = MemoryPersistence::new;

        protected Builder builder() {
            return this;
        }

        public Builder persistentProvider(Supplier<MqttClientPersistence> provider) {
            this.persistenceProvider = provider != null ? provider : MemoryPersistence::new;
            return this.builder();
        }

        public Supplier<MqttClientPersistence> persistentProvider() {
            return this.persistenceProvider;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public PahoClient build() throws Exception {
            URI broker = Objects.requireNonNull(this.broker(), "Broker must be set");
            String clientId = Strings.nonEmptyText((String)this.clientId(), (String)"clientId");
            MqttClientPersistence persistence = Objects.requireNonNull(this.persistenceProvider.get(), "Persistence provider returned 'null' persistence");
            MqttNamespace namespace = Objects.requireNonNull(this.namespace(), "Namespace must be set");
            BinaryPayloadCodec codec = Objects.requireNonNull(this.codec(), "Codec must be set");
            MqttAsyncClient client = new MqttAsyncClient(broker.toString(), clientId, persistence);
            ScheduledExecutorService executor = PahoClient.createExecutor(clientId);
            try {
                PahoClient result = new PahoClient(this.modules(), clientId, executor, namespace, codec, client, persistence, PahoClient.createConnectOptions(this));
                client = null;
                executor = null;
                PahoClient pahoClient = result;
                return pahoClient;
            }
            finally {
                if (executor != null) {
                    executor.shutdown();
                }
                if (client != null) {
                    try {
                        client.disconnectForcibly(0L);
                    }
                    finally {
                        client.close();
                    }
                }
            }
        }
    }
}

