/*
 * Decompiled with CFR 0.152.
 */
package de.fraunhofer.iosb.ilt.frostserver.messagebus;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.fraunhofer.iosb.ilt.frostserver.json.deserialize.JsonReader;
import de.fraunhofer.iosb.ilt.frostserver.json.serialize.JsonWriter;
import de.fraunhofer.iosb.ilt.frostserver.messagebus.MessageBus;
import de.fraunhofer.iosb.ilt.frostserver.messagebus.MessageListener;
import de.fraunhofer.iosb.ilt.frostserver.model.EntityChangedMessage;
import de.fraunhofer.iosb.ilt.frostserver.model.ModelRegistry;
import de.fraunhofer.iosb.ilt.frostserver.settings.BusSettings;
import de.fraunhofer.iosb.ilt.frostserver.settings.ConfigDefaults;
import de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings;
import de.fraunhofer.iosb.ilt.frostserver.settings.Settings;
import de.fraunhofer.iosb.ilt.frostserver.settings.annotation.DefaultValue;
import de.fraunhofer.iosb.ilt.frostserver.settings.annotation.DefaultValueInt;
import de.fraunhofer.iosb.ilt.frostserver.util.ChangingStatusLogger;
import de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper;
import de.fraunhofer.iosb.ilt.frostserver.util.StringHelper;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttMessageBus
implements MessageBus,
MqttCallback,
ConfigDefaults {
    @DefaultValueInt(value=2)
    public static final String TAG_SEND_WORKER_COUNT = "sendWorkerPoolSize";
    @DefaultValueInt(value=2)
    public static final String TAG_RECV_WORKER_COUNT = "recvWorkerPoolSize";
    @DefaultValueInt(value=100)
    public static final String TAG_SEND_QUEUE_SIZE = "sendQueueSize";
    @DefaultValueInt(value=100)
    public static final String TAG_RECV_QUEUE_SIZE = "recvQueueSize";
    @DefaultValue(value="tcp://127.0.0.1:1884")
    public static final String TAG_MQTT_BROKER = "mqttBroker";
    @DefaultValue(value="FROST-Bus")
    public static final String TAG_TOPIC_NAME = "topicName";
    @DefaultValueInt(value=2)
    public static final String TAG_QOS_LEVEL = "qosLevel";
    @DefaultValueInt(value=50)
    public static final String TAG_MAX_IN_FLIGHT = "maxInFlight";
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttMessageBus.class);
    private int sendPoolSize;
    private int sendQueueSize;
    private int recvPoolSize;
    private int recvQueueSize;
    private BlockingQueue<EntityChangedMessage> sendQueue;
    private ExecutorService sendService;
    private List<ProcessorHelper.Processor<EntityChangedMessage>> sendProcessors = new ArrayList<ProcessorHelper.Processor<EntityChangedMessage>>();
    private BlockingQueue<EntityChangedMessage> recvQueue;
    private ExecutorService recvService;
    private List<ProcessorHelper.Processor<EntityChangedMessage>> recvProcessors = new ArrayList<ProcessorHelper.Processor<EntityChangedMessage>>();
    private ScheduledExecutorService maintenanceTimer;
    private final List<MessageListener> listeners = new CopyOnWriteArrayList<MessageListener>();
    private final ChangingStatusLogger statusLogger = new ChangingStatusLogger(LOGGER);
    private final AtomicInteger sendQueueCount = new AtomicInteger();
    private final LoggingStatus logStatus = new LoggingStatus(this::checkWorkers);
    private String broker;
    private final String clientId = "FROST-MQTT-Bus-" + UUID.randomUUID();
    private MqttClient client;
    private String topicName;
    private int qosLevel;
    private int maxInFlight;
    private boolean listening = false;
    private ObjectMapper formatter;
    private JsonReader parser;

    @Override
    public void init(CoreSettings settings) {
        BusSettings busSettings = settings.getBusSettings();
        Settings customSettings = busSettings.getCustomSettings();
        this.sendPoolSize = customSettings.getInt(TAG_SEND_WORKER_COUNT, this.getClass());
        this.sendQueueSize = customSettings.getInt(TAG_SEND_QUEUE_SIZE, this.getClass());
        this.recvPoolSize = customSettings.getInt(TAG_RECV_WORKER_COUNT, this.getClass());
        this.recvQueueSize = customSettings.getInt(TAG_RECV_QUEUE_SIZE, this.getClass());
        this.sendQueue = new ArrayBlockingQueue<EntityChangedMessage>(this.sendQueueSize);
        this.sendService = ProcessorHelper.createProcessors(this.sendPoolSize, this.sendQueue, this::handleMessageSent, "mqtt-BusS", this.sendProcessors);
        this.recvQueue = new ArrayBlockingQueue<EntityChangedMessage>(this.recvQueueSize);
        this.recvService = ProcessorHelper.createProcessors(this.recvPoolSize, this.recvQueue, this::handleMessageReceived, "mqtt-BusR", this.recvProcessors);
        this.broker = customSettings.get(TAG_MQTT_BROKER, this.getClass());
        this.topicName = customSettings.get(TAG_TOPIC_NAME, this.getClass());
        this.qosLevel = customSettings.getInt(TAG_QOS_LEVEL, this.getClass());
        this.maxInFlight = customSettings.getInt(TAG_MAX_IN_FLIGHT, this.getClass());
        this.connect();
        this.formatter = JsonWriter.getObjectMapper();
        ModelRegistry modelRegistry = settings.getModelRegistry();
        this.parser = new JsonReader(modelRegistry);
        long queueLoggingInterval = settings.getSettings().getInt("queueLoggingInterval", CoreSettings.class);
        if (queueLoggingInterval > 0L) {
            this.statusLogger.setLogIntervalMs(queueLoggingInterval).addLogStatus(this.logStatus).start();
        }
        this.maintenanceTimer = Executors.newSingleThreadScheduledExecutor();
        this.maintenanceTimer.scheduleWithFixedDelay(this::connect, 60L, 20L, TimeUnit.SECONDS);
    }

    private synchronized void connect() {
        if (this.client == null) {
            try {
                LOGGER.info("Creating new paho-client for broker: {} with client-id {}", (Object)this.broker, (Object)this.clientId);
                this.client = new MqttClient(this.broker, this.clientId, new MemoryPersistence());
                this.client.setCallback(this);
            }
            catch (MqttException ex2) {
                LOGGER.error("Failed to create MQTT client to connect to broker: {}", (Object)this.broker);
                LOGGER.error("", ex2);
                return;
            }
        }
        if (!this.client.isConnected()) {
            try {
                LOGGER.info("paho-client connecting to broker: {} with client-id {}", (Object)this.broker, (Object)this.clientId);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                connOpts.setAutomaticReconnect(true);
                connOpts.setCleanSession(false);
                connOpts.setKeepAliveInterval(30);
                connOpts.setConnectionTimeout(30);
                connOpts.setMaxInflight(this.maxInFlight);
                this.client.connect(connOpts);
                LOGGER.info("paho-client connected to broker");
            }
            catch (MqttException ex3) {
                LOGGER.error("Failed to connect to broker: {}", (Object)this.broker);
                LOGGER.error("", ex3);
                return;
            }
            if (!this.listeners.isEmpty()) {
                this.startListening();
            }
        }
    }

    private synchronized void disconnect() {
        this.listening = false;
        if (this.client == null) {
            return;
        }
        if (this.client.isConnected()) {
            try {
                LOGGER.info("paho-client disconnecting from broker: {}", (Object)this.broker);
                this.client.disconnect(1000L);
            }
            catch (MqttException ex2) {
                LOGGER.error("Exception disconnecting client.", ex2);
            }
        }
        try {
            LOGGER.info("paho-client closing");
            this.client.close();
        }
        catch (MqttException ex3) {
            LOGGER.error("Exception closing client.", ex3);
        }
        this.client = null;
    }

    private synchronized void startListening() {
        try {
            LOGGER.info("paho-client subscribing to topic: {}", (Object)this.topicName);
            if (this.client == null || !this.client.isConnected()) {
                this.connect();
            }
            if (!this.listening) {
                this.client.subscribeWithResponse(this.topicName, this.qosLevel).setActionCallback(new IMqttActionListener(){

                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        MqttMessageBus.this.listening = true;
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        MqttMessageBus.this.listening = false;
                    }
                });
            }
        }
        catch (MqttException ex2) {
            LOGGER.error("Failed to start listening, removing client.", ex2);
            MqttClient tempclient = this.client;
            this.client = null;
            try {
                tempclient.close(true);
            }
            catch (RuntimeException | MqttException exception) {
                // empty catch block
            }
        }
    }

    private synchronized void stopListening() {
        if (!this.listening) {
            return;
        }
        try {
            LOGGER.info("paho-client unsubscribing from topic: {}", (Object)this.topicName);
            this.client.unsubscribe(this.topicName);
            this.listening = false;
        }
        catch (MqttException ex2) {
            LOGGER.error("Failed to stop listening.", ex2);
        }
    }

    @Override
    public void stop() {
        LOGGER.info("Message bus shutting down.");
        this.stopListening();
        this.disconnect();
        if (this.maintenanceTimer != null) {
            this.maintenanceTimer.shutdownNow();
        }
        ProcessorHelper.shutdownProcessors(this.sendService, this.sendQueue, 10L, TimeUnit.SECONDS);
        ProcessorHelper.shutdownProcessors(this.recvService, this.recvQueue, 10L, TimeUnit.SECONDS);
        this.statusLogger.stop();
        LOGGER.info("Message bus closed.");
    }

    @Override
    public void sendMessage(EntityChangedMessage message) {
        if (this.sendQueue.offer(message)) {
            this.logStatus.setSendQueueCount(this.sendQueueCount.incrementAndGet());
        } else {
            LOGGER.error("Failed to add message to send-queue. Increase {}{} (currently {}) to allow a bigger buffer, or increase {}{} (currently {}) to empty the buffer quicker.", "bus.", TAG_SEND_QUEUE_SIZE, this.sendQueueSize, "bus.", TAG_SEND_WORKER_COUNT, this.sendPoolSize);
        }
    }

    @Override
    public synchronized void addMessageListener(MessageListener listener) {
        this.listeners.add(listener);
        if (!this.listening) {
            this.startListening();
        }
    }

    @Override
    public synchronized void removeMessageListener(MessageListener listener) {
        this.listeners.remove(listener);
        if (this.listeners.isEmpty()) {
            this.stopListening();
        }
    }

    private void handleMessageSent(EntityChangedMessage message) {
        this.logStatus.setSendQueueCount(this.sendQueueCount.decrementAndGet());
        try {
            String serialisedMessage = this.formatter.writeValueAsString(message);
            byte[] bytes = serialisedMessage.getBytes(StringHelper.UTF8);
            if (!this.client.isConnected()) {
                this.connect();
            }
            this.client.publish(this.topicName, bytes, this.qosLevel, false);
        }
        catch (JsonProcessingException | MqttException ex2) {
            LOGGER.error("Failed to publish message to bus.", ex2);
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        if (this.listening) {
            LOGGER.warn("Connection to message bus lost (Stacktrace in DEBUG): {}.", (Object)cause.getMessage());
            LOGGER.debug("", cause);
            this.listening = false;
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws IOException {
        EntityChangedMessage ecMessage;
        String serialisedEcMessage = new String(mqttMessage.getPayload(), StringHelper.UTF8);
        LOGGER.trace("Received: {}", (Object)serialisedEcMessage);
        try {
            ecMessage = this.parser.parseObject(EntityChangedMessage.class, serialisedEcMessage);
        }
        catch (IllegalArgumentException ex2) {
            LOGGER.error("Failed to decode message from bus. Details in DEBUG.");
            LOGGER.debug("Failed to decode message: {}", (Object)serialisedEcMessage, (Object)ex2);
            return;
        }
        if (!this.recvQueue.offer(ecMessage)) {
            LOGGER.error("Failed to add message to receive-queue. Increase {}{} (currently {}) to allow a bigger buffer, or increase {}{} (currently {}) to empty the buffer quicker.", "bus.", TAG_RECV_QUEUE_SIZE, this.recvQueueSize, "bus.", TAG_RECV_WORKER_COUNT, this.recvPoolSize);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    private void handleMessageReceived(EntityChangedMessage message) {
        for (MessageListener listener : this.listeners) {
            try {
                listener.messageReceived(message);
            }
            catch (Exception ex2) {
                LOGGER.error("Listener threw exception on message reception.", ex2);
            }
        }
    }

    private void checkWorkers() {
        int recvWaiting = 0;
        int recvWorking = 0;
        int recvBroken = 0;
        int sendWaiting = 0;
        int sendWorking = 0;
        int sendBroken = 0;
        Instant threshold = Instant.now().minus(2L, ChronoUnit.SECONDS);
        block8: for (ProcessorHelper.Processor<EntityChangedMessage> processor : this.recvProcessors) {
            switch (processor.getStatus()) {
                case WAITING: {
                    ++recvWaiting;
                    continue block8;
                }
                case WORKING: {
                    if (!processor.isFine(threshold)) {
                        ++recvBroken;
                        continue block8;
                    }
                    ++recvWorking;
                    continue block8;
                }
            }
            LOGGER.trace("Worker not started.");
        }
        block9: for (ProcessorHelper.Processor<EntityChangedMessage> processor : this.sendProcessors) {
            switch (processor.getStatus()) {
                case WAITING: {
                    ++sendWaiting;
                    continue block9;
                }
                case WORKING: {
                    if (!processor.isFine(threshold)) {
                        ++sendBroken;
                        continue block9;
                    }
                    ++sendWorking;
                    continue block9;
                }
            }
            LOGGER.trace("Worker not started.");
        }
        this.logStatus.setRecvWaiting(recvWaiting).setRecvWorking(recvWorking).setRecvBad(recvBroken).setSendWaiting(sendWaiting).setSendWorking(sendWorking).setSendBad(sendBroken);
    }

    private static class LoggingStatus
    extends ChangingStatusLogger.ChangingStatusDefault {
        public static final String MESSAGE = "RecvQueue: {} [{}, {}, {}] SendQueue: {} [{}, {}, {}] ";
        public final Object[] status = this.getCurrentParams();
        private final Runnable processor;

        public LoggingStatus(Runnable processor) {
            super(MESSAGE, new Object[8]);
            Arrays.setAll(this.status, i2 -> 0);
            this.processor = processor;
        }

        @Override
        public void process() {
            this.processor.run();
        }

        public LoggingStatus setRecvQueueCount(Integer count) {
            this.status[0] = count;
            return this;
        }

        public LoggingStatus setRecvWaiting(Integer size) {
            this.status[1] = size;
            return this;
        }

        public LoggingStatus setRecvWorking(Integer size) {
            this.status[2] = size;
            return this;
        }

        public LoggingStatus setRecvBad(Integer size) {
            this.status[3] = size;
            return this;
        }

        public LoggingStatus setSendQueueCount(Integer count) {
            this.status[4] = count;
            return this;
        }

        public LoggingStatus setSendWaiting(Integer size) {
            this.status[5] = size;
            return this;
        }

        public LoggingStatus setSendWorking(Integer size) {
            this.status[6] = size;
            return this;
        }

        public LoggingStatus setSendBad(Integer size) {
            this.status[7] = size;
            return this;
        }
    }
}

