/*
 * Decompiled with CFR 0.152.
 */
package de.fraunhofer.iosb.ilt.frostserver.mqtt;

import de.fraunhofer.iosb.ilt.frostserver.messagebus.MessageListener;
import de.fraunhofer.iosb.ilt.frostserver.model.EntityChangedMessage;
import de.fraunhofer.iosb.ilt.frostserver.model.EntityType;
import de.fraunhofer.iosb.ilt.frostserver.model.ModelRegistry;
import de.fraunhofer.iosb.ilt.frostserver.model.core.Entity;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.MqttServer;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.MqttServerFactory;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.SubscriptionManager;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.create.EntityCreateEvent;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.create.EntityCreateListener;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.Subscription;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionEvent;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionFactory;
import de.fraunhofer.iosb.ilt.frostserver.mqtt.subscription.SubscriptionListener;
import de.fraunhofer.iosb.ilt.frostserver.path.Version;
import de.fraunhofer.iosb.ilt.frostserver.persistence.PersistenceManager;
import de.fraunhofer.iosb.ilt.frostserver.persistence.PersistenceManagerFactory;
import de.fraunhofer.iosb.ilt.frostserver.property.Property;
import de.fraunhofer.iosb.ilt.frostserver.service.Service;
import de.fraunhofer.iosb.ilt.frostserver.service.ServiceRequest;
import de.fraunhofer.iosb.ilt.frostserver.service.ServiceResponseDefault;
import de.fraunhofer.iosb.ilt.frostserver.service.UpdateMode;
import de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings;
import de.fraunhofer.iosb.ilt.frostserver.settings.MqttSettings;
import de.fraunhofer.iosb.ilt.frostserver.settings.UnknownVersionException;
import de.fraunhofer.iosb.ilt.frostserver.util.ChangingStatusLogger;
import de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttManager
implements SubscriptionListener,
MessageListener,
EntityCreateListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttManager.class);
    private static List<SubscriptionListener> testListeners;
    private final Map<EntityType, SubscriptionManager> subscriptions = new HashMap<EntityType, SubscriptionManager>();
    private final CoreSettings settings;
    private final SubscriptionFactory subscriptionFactory;
    private MqttServer server;
    private BlockingQueue<EntityChangedMessage> entityChangedEventQueue;
    private ExecutorService entityChangedExecutorService;
    private final List<ProcessorHelper.Processor<EntityChangedMessage>> entityChangedProcessors = new ArrayList<ProcessorHelper.Processor<EntityChangedMessage>>();
    private BlockingQueue<EntityCreateEvent> entityCreateEventQueue;
    private ExecutorService entityCreateExecutorService;
    private final List<ProcessorHelper.Processor<EntityCreateEvent>> entityCreateProcessors = new ArrayList<ProcessorHelper.Processor<EntityCreateEvent>>();
    private final ChangingStatusLogger statusLogger = new ChangingStatusLogger(LOGGER);
    private final AtomicInteger topicCount = new AtomicInteger();
    private final AtomicInteger entityChangedQueueSize = new AtomicInteger();
    private final AtomicInteger entityCreateQueueSize = new AtomicInteger();
    private final LoggingStatus logStatus = new LoggingStatus(this::checkWorkers);
    private boolean enabledMqtt = false;
    private boolean shutdown = false;

    public MqttManager(CoreSettings settings) {
        if (settings == null) {
            throw new IllegalArgumentException("setting must be non-null");
        }
        this.settings = settings;
        this.subscriptionFactory = new SubscriptionFactory(settings);
        this.init();
    }

    private void init() {
        ModelRegistry modelRegistry = this.settings.getModelRegistry();
        for (EntityType entityType : modelRegistry.getEntityTypes()) {
            this.subscriptions.put(entityType, new SubscriptionManager(entityType, this, this.topicCount));
        }
        MqttSettings mqttSettings = this.settings.getMqttSettings();
        if (mqttSettings.isEnableMqtt()) {
            this.enabledMqtt = true;
            this.shutdown = false;
            this.entityChangedEventQueue = new ArrayBlockingQueue<EntityChangedMessage>(mqttSettings.getSubscribeMessageQueueSize());
            this.entityChangedExecutorService = ProcessorHelper.createProcessors(mqttSettings.getSubscribeThreadPoolSize(), this.entityChangedEventQueue, this::handleEntityChangedEvent, "Mqtt-EntityChangedProcessor", this.entityChangedProcessors);
            this.entityCreateEventQueue = new ArrayBlockingQueue<EntityCreateEvent>(mqttSettings.getCreateMessageQueueSize());
            this.entityCreateExecutorService = ProcessorHelper.createProcessors(mqttSettings.getCreateThreadPoolSize(), this.entityCreateEventQueue, this::handleEntityCreateEvent, "Mqtt-EntityCreateProcessor", this.entityCreateProcessors);
            this.server = MqttServerFactory.get(this.settings);
            this.server.addSubscriptionListener(this);
            this.server.addEntityCreateListener(this);
            this.server.start();
            long queueLoggingInterval = this.settings.getSettings().getInt("queueLoggingInterval", CoreSettings.class);
            if (queueLoggingInterval > 0L) {
                this.statusLogger.setLogIntervalMs(queueLoggingInterval).addLogStatus(this.logStatus).start();
            }
        } else {
            this.enabledMqtt = false;
            this.entityChangedExecutorService = null;
            this.entityChangedEventQueue = new ArrayBlockingQueue<EntityChangedMessage>(1);
            this.entityCreateExecutorService = null;
            this.entityCreateEventQueue = new ArrayBlockingQueue<EntityCreateEvent>(1);
            this.server = null;
        }
    }

    public void shutdown() {
        this.shutdown = true;
        this.statusLogger.stop();
        ProcessorHelper.shutdownProcessors(this.entityChangedExecutorService, this.entityChangedEventQueue, 10L, TimeUnit.SECONDS);
        ProcessorHelper.shutdownProcessors(this.entityCreateExecutorService, this.entityCreateEventQueue, 10L, TimeUnit.SECONDS);
        if (this.server != null) {
            this.server.stop();
        }
    }

    private void handleEntityChangedEvent(EntityChangedMessage message) {
        this.logStatus.setEntityChangedQueueSize(this.entityChangedQueueSize.decrementAndGet());
        EntityChangedMessage.Type eventType = message.getEventType();
        EntityType entityType = message.getEntityType();
        LOGGER.trace("Received a {} message for a {}.", (Object)eventType, (Object)entityType);
        if (eventType == EntityChangedMessage.Type.DELETE) {
            return;
        }
        if (!this.subscriptions.containsKey(entityType)) {
            return;
        }
        Entity entity = message.getEntity();
        Set<Property> fields2 = message.getFields();
        try (PersistenceManager persistenceManager = PersistenceManagerFactory.getInstance(this.settings).create();){
            this.subscriptions.get(entityType).handleEntityChanged(persistenceManager, entity, fields2);
        }
        catch (Exception ex) {
            LOGGER.error("error handling MQTT subscriptions", ex);
        }
    }

    public void notifySubscription(Subscription subscription, Entity entity) {
        String topic = subscription.getTopic();
        try {
            String payload = subscription.formatMessage(entity);
            this.server.publish(topic, payload, this.settings.getMqttSettings().getQosLevel());
        }
        catch (IOException ex) {
            LOGGER.error("publishing to MQTT on topic '{}' failed", (Object)topic, (Object)ex);
        }
    }

    private void handleEntityCreateEvent(EntityCreateEvent e) {
        Version version;
        this.logStatus.setEntityCreateQueueSize(this.entityCreateQueueSize.decrementAndGet());
        String topic = e.getTopic();
        try {
            version = MqttManager.getVersionFromTopic(this.settings, topic);
        }
        catch (UnknownVersionException ex) {
            LOGGER.info("received message on topic '{}' which contains no version info.", (Object)topic);
            return;
        }
        String url = topic.replaceFirst(version.urlPart, "");
        try (Service service = new Service(this.settings);){
            ServiceResponseDefault serviceResponse = new ServiceResponseDefault();
            ServiceRequest serviceRequest = new ServiceRequest().setCoreSettings(this.settings).setVersion(version).setRequestType("create").setUpdateMode(UpdateMode.INSERT_STA_11).setContent(e.getPayload()).setUrlPath(url).setUserPrincipal(e.getPrincipal());
            ServiceRequest.setLocalRequest(serviceRequest);
            service.execute(serviceRequest, serviceResponse);
            ServiceRequest.removeLocalRequest();
            if (!serviceResponse.isSuccessful()) {
                LOGGER.error("Creating entity via MQTT failed (topic: {}, payload: {}, code: {}, message: {})", topic, e.getPayload(), serviceResponse.getCode(), serviceResponse.getMessage());
            }
        }
    }

    private void entityChanged(EntityChangedMessage e) {
        if (this.shutdown || !this.enabledMqtt) {
            return;
        }
        if (this.entityChangedEventQueue.offer(e)) {
            this.logStatus.setEntityChangedQueueSize(this.entityChangedQueueSize.incrementAndGet());
        } else {
            LOGGER.warn("EntityChangedevent discarded because message queue is full {}! Increase mqtt.SubscribeMessageQueueSize and/or mqtt.SubscribeThreadPoolSize.", (Object)this.entityChangedEventQueue.size());
        }
    }

    @Override
    public void onSubscribe(SubscriptionEvent e) {
        Subscription subscription = this.subscriptionFactory.get(e.getTopic());
        if (subscription == null) {
            return;
        }
        this.subscriptions.get(subscription.getEntityType()).addSubscription(subscription);
        this.logStatus.setTopicCount(this.topicCount.get());
        MqttManager.fireTestSubscriptionAdded(e);
    }

    @Override
    public void onUnsubscribe(SubscriptionEvent e) {
        Subscription subscription = this.subscriptionFactory.get(e.getTopic());
        if (subscription == null) {
            return;
        }
        this.subscriptions.get(subscription.getEntityType()).removeSubscription(subscription);
        this.logStatus.setTopicCount(this.topicCount.get());
    }

    @Override
    public void messageReceived(EntityChangedMessage message) {
        this.entityChanged(message);
    }

    @Override
    public void onEntityCreate(EntityCreateEvent e) {
        if (this.shutdown || !this.enabledMqtt) {
            return;
        }
        if (this.entityCreateEventQueue.offer(e)) {
            this.logStatus.setEntityCreateQueueSize(this.entityCreateQueueSize.incrementAndGet());
        } else {
            LOGGER.warn("EntityCreateEvent discarded because message queue is full {}! Increase mqtt.SubscribeMessageQueueSize and/or mqtt.SubscribeThreadPoolSize", (Object)this.entityCreateEventQueue.size());
        }
    }

    private void checkWorkers() {
        Instant threshold = Instant.now().minus(2L, ChronoUnit.SECONDS);
        ProcessorHelper.ProcessorListStatus cngStatus = ProcessorHelper.checkStatus(this.entityChangedProcessors, threshold);
        ProcessorHelper.ProcessorListStatus crtStatus = ProcessorHelper.checkStatus(this.entityCreateProcessors, threshold);
        this.logStatus.setEntityChangedWaiting(cngStatus.countWaiting()).setEntityChangedWorking(cngStatus.countWorking()).setEntityChangedBad(cngStatus.countBroken()).setEntityCreateWaiting(crtStatus.countWaiting()).setEntityCreateWorking(crtStatus.countWorking()).setEntityCreateBad(crtStatus.countBroken());
    }

    public static Version getVersionFromTopic(CoreSettings settings, String topic) throws UnknownVersionException {
        int pos = topic.indexOf(47);
        if (pos == -1) {
            throw new UnknownVersionException("Could not find version in topic " + topic);
        }
        String versionString = topic.substring(0, pos);
        Version version = settings.getPluginManager().getVersion(versionString);
        if (version == null) {
            throw new UnknownVersionException("Could not find version in topic " + topic);
        }
        return version;
    }

    private static void fireTestSubscriptionAdded(SubscriptionEvent s2) {
        if (testListeners == null) {
            return;
        }
        for (SubscriptionListener l : testListeners) {
            l.onSubscribe(s2);
        }
    }

    public static void addTestSubscriptionListener(SubscriptionListener l) {
        if (testListeners == null) {
            testListeners = new ArrayList<SubscriptionListener>();
        }
        testListeners.add(l);
    }

    public static void removeTestSubscriptionListener(SubscriptionListener l) {
        if (testListeners == null) {
            return;
        }
        testListeners.remove(l);
    }

    public static void clearTestSubscriptionListeners() {
        testListeners = null;
    }

    private static class LoggingStatus
    extends ChangingStatusLogger.ChangingStatusDefault {
        public static final String MESSAGE = "entityCreateQueue: {} [{}, {}, {}] entityChangedQueue: {} [{}, {}, {}] topics: {}";
        public final Object[] status = this.getCurrentParams();
        private final Runnable processor;

        public LoggingStatus(Runnable processor) {
            super(MESSAGE, new Object[9]);
            Arrays.setAll(this.status, i -> 0);
            this.processor = processor;
        }

        @Override
        public void process() {
            this.processor.run();
        }

        public LoggingStatus setEntityCreateQueueSize(Integer size) {
            this.status[0] = size;
            return this;
        }

        public LoggingStatus setEntityCreateWaiting(Integer size) {
            this.status[1] = size;
            return this;
        }

        public LoggingStatus setEntityCreateWorking(Integer size) {
            this.status[2] = size;
            return this;
        }

        public LoggingStatus setEntityCreateBad(Integer size) {
            this.status[3] = size;
            return this;
        }

        public LoggingStatus setEntityChangedQueueSize(Integer size) {
            this.status[4] = size;
            return this;
        }

        public LoggingStatus setEntityChangedWaiting(Integer size) {
            this.status[5] = size;
            return this;
        }

        public LoggingStatus setEntityChangedWorking(Integer size) {
            this.status[6] = size;
            return this;
        }

        public LoggingStatus setEntityChangedBad(Integer size) {
            this.status[7] = size;
            return this;
        }

        public LoggingStatus setTopicCount(Integer count) {
            this.status[8] = count;
            return this;
        }
    }
}

