/*
 * Decompiled with CFR 0.152.
 */
package de.dentrassi.flow.component.mqtt;

import de.dentrassi.flow.ComponentContext;
import de.dentrassi.flow.spi.component.AnnotatedComponent;
import de.dentrassi.flow.spi.component.DataIn;
import de.dentrassi.flow.spi.component.DataOut;
import de.dentrassi.flow.spi.component.EventContext;
import de.dentrassi.flow.spi.component.TriggerIn;
import de.dentrassi.flow.spi.component.TriggerPortOut;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import java.time.Duration;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flow component that owns an MQTT client and keeps it connected.
 *
 * <p>After the {@code connect} trigger fired, every failed connect attempt and every closed
 * connection schedules a new attempt for as long as the component is started. The delay between
 * attempts starts at zero, continues with 100 ms and doubles up to a maximum of one minute; it
 * is reset once a connection has been established.
 *
 * <p>State changes are published through the {@code connectionEstablished} and
 * {@code connectionLost} trigger ports and the {@code connected} data output.
 */
public class MqttClient extends AnnotatedComponent {
    private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);

    /** Port used when no port, or a {@code null} port, has been supplied. */
    private static final int DEFAULT_PORT = 1883;
    /** First non-zero delay used between two connect attempts. */
    private static final Duration INITIAL_RECONNECT_DELAY = Duration.ofMillis(100L);
    /** Factor by which the delay grows after each attempt. */
    private static final long RECONNECT_DELAY_MULTIPLIER = 2L;
    /** Upper bound for the delay between two connect attempts. */
    private static final Duration MAX_RECONNECT_DELAY = Duration.ofMinutes(1L);

    private final TriggerPortOut connectionEstablished;
    private final TriggerPortOut connectionLost;
    private String host;
    private int port = DEFAULT_PORT;
    private String clientId;
    private String username;
    private String password;
    private ComponentContext.SharedResource<Vertx> vertx;
    /** {@code null} until {@link #connect()} has been triggered for the first time. */
    private de.dentrassi.flow.component.mqtt.internal.io.vertx.mqtt.MqttClient client;
    /** Volatile because it is also read from the reconnect timer callback invoked by Vert.x. */
    private volatile boolean started;
    private boolean connected;
    /** Delay that the next connect attempt will use. */
    private volatile Duration nextSleep = Duration.ZERO;

    /** Creates the component and registers its two outgoing trigger ports. */
    public MqttClient() {
        this.connectionEstablished = this.registerTriggerOut("connectionEstablished");
        this.connectionLost = this.registerTriggerOut("connectionLost");
    }

    /**
     * Starts the component and acquires the shared Vert.x instance.
     *
     * @param initializers passed through to the super class
     * @param context used to create (or join) the shared {@code "vertx"} resource
     * @param event passed through to the super class
     */
    public void start(Map<String, String> initializers, ComponentContext context, EventContext event) {
        super.start(initializers, context, event);
        this.vertx = context.createSharedResource(MqttClient.class, "vertx", Vertx.class, () -> Vertx.vertx(), Vertx::close);
        this.started = true;
    }

    /**
     * Stops the component: cancels further reconnect attempts, disconnects the client and
     * releases the shared Vert.x resource.
     *
     * <p>The Vert.x resource is released and the super class is stopped even when
     * disconnecting fails; the failure is still propagated to the caller.
     */
    public void stop() {
        // Flip the flag first so that close/timer callbacks do not schedule a reconnect.
        this.started = false;
        try {
            this.disconnect();
        } finally {
            try {
                // May still be null when stop() is called without a successful start().
                if (this.vertx != null) {
                    this.vertx.close();
                }
            } finally {
                super.stop();
            }
        }
    }

    /**
     * @return {@code true} while a connection is considered established
     */
    @DataOut
    public boolean isConnected() {
        return this.connected;
    }

    /**
     * Creates a new client from the currently configured client id, username and password and
     * starts connecting to the configured host and port.
     *
     * <p>Without a configured client id the client is asked to generate one.
     */
    @TriggerIn
    public void connect() {
        MqttClientOptions options = new MqttClientOptions();
        if (this.clientId != null) {
            options.setAutoGeneratedClientId(false);
            options.setClientId(this.clientId);
        } else {
            options.setAutoGeneratedClientId(true);
        }
        options.setAutoKeepAlive(true);
        options.setUsername(this.username);
        options.setPassword(this.password);

        // NOTE(review): a client created by an earlier connect() trigger is replaced here
        // without being disconnected - confirm whether repeated triggers are expected.
        this.client = de.dentrassi.flow.component.mqtt.internal.io.vertx.mqtt.MqttClient
                .create(this.vertx.get(), options)
                .closeHandler(v -> this.runOnContext(this::connectionLost));
        this.performConnect();
    }

    /**
     * Runs the next connect attempt, either immediately or after the current back-off delay.
     * Does nothing when the component is not started.
     */
    private void performConnect() {
        logger.info("Trigger connect");
        if (!this.started) {
            logger.info("Cancel connect attempt. We are stopped.");
            return;
        }
        Duration sleep = this.nextSleep();
        logger.info("Reconnect delay: {}", sleep);
        if (sleep.isZero()) {
            this.doConnect();
        } else {
            this.vertx.get().setTimer(sleep.toMillis(), timer -> this.doConnect());
        }
    }

    /**
     * Issues the actual connect call and forwards its outcome through {@code runOnContext}.
     */
    private void doConnect() {
        // The timer armed by performConnect() may fire after stop(); do not connect then.
        if (!this.started) {
            logger.info("Cancel connect attempt. We are stopped.");
            return;
        }
        this.client.connect(this.port, this.host, result -> {
            if (result.succeeded()) {
                this.runOnContext(this::connectionEstablished);
            } else {
                logger.warn("Failed to connect", result.cause());
                this.runOnContext(this::connectionLost);
            }
        });
    }

    /**
     * Returns the delay for the upcoming attempt and advances the stored delay for the one
     * after it (zero, 100 ms, then doubling, capped at one minute).
     *
     * @return the delay to apply before the upcoming connect attempt
     */
    private Duration nextSleep() {
        Duration current = this.nextSleep;
        Duration next = current.isZero()
                ? INITIAL_RECONNECT_DELAY
                : current.multipliedBy(RECONNECT_DELAY_MULTIPLIER);
        this.nextSleep = next.compareTo(MAX_RECONNECT_DELAY) <= 0 ? next : MAX_RECONNECT_DELAY;
        return current;
    }

    /**
     * Disconnects the current client. Does nothing when no client has been created yet, which
     * is the case when {@link #connect()} was never triggered.
     */
    @TriggerIn
    public void disconnect() {
        de.dentrassi.flow.component.mqtt.internal.io.vertx.mqtt.MqttClient current = this.client;
        if (current == null) {
            logger.debug("Ignoring disconnect. No client has been created.");
            return;
        }
        // NOTE(review): if the close handler fires for an explicit disconnect, connectionLost()
        // will reconnect right away while the component is started - confirm this is intended.
        current.disconnect();
    }

    /**
     * Handles a closed connection or a failed connect attempt: updates the state, fires the
     * {@code connectionLost} trigger if the state changed and schedules the next attempt.
     */
    private void connectionLost() {
        logger.info("Connection lost - previousState: {}", this.connected);
        if (this.connected) {
            this.connected = false;
            this.connectionLost.execute();
        } else {
            logger.info("Receive lost event when already disconnected");
        }
        this.performConnect();
    }

    /**
     * Handles a successful connect: resets the back-off delay, updates the state and fires the
     * {@code connectionEstablished} trigger.
     */
    private void connectionEstablished() {
        logger.info("Connection established - previousState: {}", this.connected);
        if (!this.connected) {
            this.nextSleep = Duration.ZERO;
            this.connected = true;
            this.connectionEstablished.execute();
        } else {
            logger.error("Receive established event when already connected");
        }
    }

    /**
     * @param host host name or address handed to the client when connecting
     */
    @DataIn
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * @param port port to connect to; {@code null} selects the default port 1883
     */
    @DataIn
    public void setPort(Integer port) {
        this.port = port == null ? DEFAULT_PORT : port;
    }

    /**
     * @param clientId client id to use; {@code null} requests an auto-generated id
     */
    @DataIn
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * @param username username set on the client options by the next {@link #connect()}
     */
    @DataIn
    public void setUsername(String username) {
        this.username = username;
    }

    /**
     * @param password password set on the client options by the next {@link #connect()}
     */
    @DataIn
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the client created by the last {@link #connect()}, or {@code null} if there is none
     */
    @DataOut
    public de.dentrassi.flow.component.mqtt.internal.io.vertx.mqtt.MqttClient getClient() {
        return this.client;
    }
}

