/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.mqtt;

import io.netty.buffer.ByteBuf;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ExpressionNode;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceService;
import org.openremote.container.security.keycloak.KeycloakIdentityProvider;
import org.openremote.manager.asset.AssetProcessingService;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.gateway.GatewayService;
import org.openremote.manager.mqtt.MQTTBrokerService;
import org.openremote.manager.mqtt.MQTTHandler;
import org.openremote.manager.mqtt.Topic;
import org.openremote.manager.security.ManagerIdentityProvider;
import org.openremote.model.Container;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.Asset;
import org.openremote.model.attribute.Attribute;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.attribute.AttributeMap;
import org.openremote.model.attribute.AttributeRef;
import org.openremote.model.query.AssetQuery;
import org.openremote.model.query.filter.AttributePredicate;
import org.openremote.model.query.filter.NameValuePredicate;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.Pair;
import org.openremote.model.value.MetaItemType;
import org.openremote.model.value.NameHolder;
import org.openremote.model.value.ValueHolder;
import org.openremote.model.value.ValueType;

/**
 * MQTT handler that mirrors the broker connection state of users onto boolean asset attributes.
 *
 * <p>Boolean attributes carrying the {@link MetaItemType#USER_CONNECTED} meta item are registered
 * against the user ID resolved from the username stored in that meta item. When connections of
 * that user come and go, an {@link AttributeEvent} with {@code true}/{@code false} is sent for
 * each registered attribute.
 *
 * <p>The handler does not take part in topic handling: it matches no topics and refuses every
 * subscribe/publish request.
 *
 * <p>The registry ({@link #userIDAttributeRefs}) is a concurrent map holding concurrent sets; it is
 * touched from the persistence route, from the start-up task and from the connection callbacks.
 */
public class ConnectionMonitorHandler extends MQTTHandler {

    protected static final Logger LOG = SyslogCategory.getLogger(SyslogCategory.API, ConnectionMonitorHandler.class);

    /** Prefix applied to the meta item username before it is resolved to a user ID. */
    protected static final String SERVICE_ACCOUNT_PREFIX = "service-account-";

    protected MQTTBrokerService mqttBrokerService;
    protected ExecutorService executorService;
    protected AssetStorageService assetStorageService;
    protected AssetProcessingService assetProcessingService;
    protected GatewayService gatewayService;
    protected PersistenceService persistenceService;

    /** Registry of monitored attributes, keyed by user ID (not by username). */
    protected ConcurrentMap<String, Set<AttributeRef>> userIDAttributeRefs = new ConcurrentHashMap<>();

    /**
     * Looks up the required services and installs a route on the persistence topic that keeps the
     * registry in sync with asset attribute changes.
     *
     * @param container the container used to resolve services and the executor
     * @param serverConfiguration the broker configuration, passed through to the super class
     * @throws Exception if the super class initialisation or the route registration fails
     */
    @Override
    public void init(Container container, Configuration serverConfiguration) throws Exception {
        super.init(container, serverConfiguration);
        this.executorService = container.getExecutor();
        this.mqttBrokerService = container.getService(MQTTBrokerService.class);
        this.assetStorageService = container.getService(AssetStorageService.class);
        this.assetProcessingService = container.getService(AssetProcessingService.class);
        this.gatewayService = container.getService(GatewayService.class);
        this.persistenceService = container.getService(PersistenceService.class);

        MessageBrokerService messageBrokerService = container.getService(MessageBrokerService.class);
        messageBrokerService.getContext().addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("seda://PersistenceTopic?multipleConsumers=true&concurrentConsumers=1&waitForTaskToComplete=NEVER&purgeWhenStopping=true&discardIfNoConsumers=true&size=25000")
                    .routeId("Persistence-MQTTConnectedAttributes")
                    .filter(PersistenceService.isPersistenceEventForEntityType(Asset.class))
                    .filter(GatewayService.isNotForGateway(ConnectionMonitorHandler.this.gatewayService))
                    .process(exchange -> ConnectionMonitorHandler.this.onAssetPersistenceEvent(
                        exchange.getIn().getBody(PersistenceEvent.class)))
                    .end();
            }
        });
    }

    /**
     * Re-synchronises the registry after the {@code attributes} property of an asset has changed.
     *
     * <p>Every matching attribute of the previous state is deregistered, then every matching
     * attribute of the current state is registered again.
     *
     * @param persistenceEvent the asset persistence event taken from the route
     */
    protected void onAssetPersistenceEvent(PersistenceEvent<?> persistenceEvent) {
        if (!persistenceEvent.hasPropertyChanged("attributes")) {
            return;
        }

        Asset<?> asset = (Asset<?>) persistenceEvent.getEntity();
        AttributeMap oldAttributes = (AttributeMap) persistenceEvent.getPreviousState("attributes");
        AttributeMap newAttributes = (AttributeMap) persistenceEvent.getCurrentState("attributes");

        if (oldAttributes != null) {
            // Deregister by attribute reference: the meta item holds a username whereas the
            // registry is keyed by user ID, so the meta value cannot be used as the key here.
            oldAttributes.stream()
                .filter(ConnectionMonitorHandler::attributeMatches)
                .map(attr -> new AttributeRef(asset.getId(), attr.getName()))
                .forEach(this::removeSessionAttributeForAllUsers);
        }

        if (newAttributes != null) {
            List<Pair<String, Attribute<?>>> connectedAttributes = newAttributes.stream()
                .filter(ConnectionMonitorHandler::attributeMatches)
                .map(attr -> new Pair<String, Attribute<?>>(asset.getId(), attr))
                .toList();
            addSessionAttributes(asset.getRealm(), connectedAttributes);
        }
    }

    /**
     * Submits a task that loads all assets having a {@link MetaItemType#USER_CONNECTED} meta item
     * and registers their matching attributes, realm by realm.
     *
     * @param container the container (unused)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void start(Container container) throws Exception {
        this.executorService.submit(() -> {
            try {
                // Array arguments kept as in the compiled form; the query API signatures are not visible here
                List<Asset<?>> assets = this.assetStorageService.findAll(new AssetQuery().attributes(new AttributePredicate[]{new AttributePredicate().meta(new NameValuePredicate[]{new NameValuePredicate((NameHolder) MetaItemType.USER_CONNECTED, null)})}));
                Map<String, List<Asset<?>>> assetsByRealm = assets.stream().collect(Collectors.groupingBy(Asset::getRealm));

                for (Map.Entry<String, List<Asset<?>>> realmAssets : assetsByRealm.entrySet()) {
                    List<Pair<String, Attribute<?>>> assetIdsAttrs = realmAssets.getValue().stream()
                        .flatMap(asset -> asset.getAttributes().stream()
                            .filter(ConnectionMonitorHandler::attributeMatches)
                            .map(attr -> new Pair<String, Attribute<?>>(asset.getId(), attr)))
                        .toList();
                    addSessionAttributes(realmAssets.getKey(), assetIdsAttrs);
                }
            } catch (Exception e) {
                // The returned Future is discarded, so a failure would otherwise vanish silently
                LOG.log(Level.WARNING, "Failed to load user connected attributes", e);
            }
        });
    }

    @Override
    public void stop() throws Exception {
        super.stop();
    }

    /** Marks the monitored attributes of the connection's user as connected. */
    @Override
    public void onConnect(RemotingConnection connection) {
        super.onConnect(connection);
        updateConnectionStatus(connection, true);
    }

    /** Marks the monitored attributes of the connection's user as disconnected. */
    @Override
    public void onDisconnect(RemotingConnection connection) {
        super.onDisconnect(connection);
        updateConnectionStatus(connection, false);
    }

    /** Marks the monitored attributes of the connection's user as disconnected. */
    @Override
    public void onConnectionLost(RemotingConnection connection) {
        super.onConnectionLost(connection);
        updateConnectionStatus(connection, false);
    }

    /** Marks the monitored attributes of the connection's user as connected. */
    @Override
    public void onConnectionAuthenticated(RemotingConnection connection) {
        super.onConnectionAuthenticated(connection);
        updateConnectionStatus(connection, true);
    }

    /**
     * Shared body of the connection callbacks: resolves the user and its monitored attributes for
     * the connection and, when there are any, pushes the given status.
     *
     * @param connection the connection that triggered the callback
     * @param connected the status to publish
     */
    protected void updateConnectionStatus(RemotingConnection connection, boolean connected) {
        Pair<String, Set<AttributeRef>> userIDAndAttributeRefs = getUserIDAndAttributeRefs(connection);
        if (userIDAndAttributeRefs == null) {
            return;
        }
        updateUserConnectedStatus(userIDAndAttributeRefs.key, userIDAndAttributeRefs.value, connected);
    }

    /** This handler never matches a topic. */
    @Override
    protected boolean topicMatches(Topic topic) {
        return false;
    }

    @Override
    protected Logger getLogger() {
        return LOG;
    }

    /** Always refuses: this handler serves no topics. */
    @Override
    public boolean canSubscribe(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        return false;
    }

    /** Always refuses: this handler serves no topics. */
    @Override
    public boolean canPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        return false;
    }

    /**
     * @return {@code null}; kept as is because how the base class treats an empty set is not
     *         visible from this file
     */
    @Override
    public Set<String> getPublishListenerTopics() {
        return null;
    }

    @Override
    public void onPublish(RemotingConnection connection, Topic topic, ByteBuf body) {
        // Nothing to do: no topic is ever matched
    }

    @Override
    public void onSubscribe(RemotingConnection connection, Topic topic) {
        // Nothing to do: no topic is ever matched
    }

    @Override
    public void onUnsubscribe(RemotingConnection connection, Topic topic) {
        // Nothing to do: no topic is ever matched
    }

    /**
     * Registers the given attributes against the users named in their
     * {@link MetaItemType#USER_CONNECTED} meta item.
     *
     * <p>Usernames are normalised with {@link #toServiceAccountUsername(String)} both when the
     * lookup list is built and when an individual attribute is resolved, so a meta value that
     * already carries the prefix resolves correctly.
     *
     * @param realm the realm the usernames belong to
     * @param assetIdsAttrs pairs of asset ID and attribute to register
     */
    protected void addSessionAttributes(String realm, List<Pair<String, Attribute<?>>> assetIdsAttrs) {
        LOG.finest("Adding '" + assetIdsAttrs.size() + "' attributes(s) with user linked attributes in realm: " + realm);

        // De-duplicate after normalising so 'x' and 'service-account-x' collapse into one entry
        List<String> usernames = assetIdsAttrs.stream()
            .map(assetIdAttr -> assetIdAttr.getValue().getMetaValue(MetaItemType.USER_CONNECTED).orElse(null))
            .filter(Objects::nonNull)
            .map(ConnectionMonitorHandler::toServiceAccountUsername)
            .distinct()
            .toList();

        if (usernames.isEmpty()) {
            // No attribute carries a username, so there is nothing to resolve or register
            return;
        }

        // Presumably returns one entry per requested username (null when unknown), in the same
        // order - the index based lookup below relies on that; verify against getUserIds
        List<String> userIds = ManagerIdentityProvider.getUserIds(this.persistenceService, realm, usernames);

        assetIdsAttrs.forEach(assetIdAttr -> assetIdAttr.getValue().getMetaValue(MetaItemType.USER_CONNECTED).ifPresent(username -> {
            int index = usernames.indexOf(toServiceAccountUsername(username));
            String userID = userIds != null && index >= 0 && index < userIds.size() ? userIds.get(index) : null;
            if (userID == null) {
                LOG.warning("Invalid username so skipping add session attributes: realm=" + realm + ", username=" + username);
            } else {
                addSessionAttribute(userID, new AttributeRef(assetIdAttr.key, assetIdAttr.getValue().getName()));
            }
        }));
    }

    /**
     * Publishes the current connection state of the user to the attribute and then registers the
     * attribute for that user.
     *
     * <p>NOTE(review): the status is pushed through {@link #updateUserConnectedStatus}, which
     * skips a {@code true} update when the user has more than one connection - confirm that this
     * is intended for the initial synchronisation as well.
     *
     * @param userID the resolved user ID
     * @param attributeRef the attribute to monitor
     */
    protected void addSessionAttribute(String userID, AttributeRef attributeRef) {
        LOG.finest("Adding userID '" + userID + "' monitoring for attribute: " + attributeRef);
        boolean connected = !this.mqttBrokerService.getUserConnections(userID).isEmpty();
        updateUserConnectedStatus(userID, Collections.singletonList(attributeRef), connected);
        Set<AttributeRef> refs = this.userIDAttributeRefs.computeIfAbsent(userID, id -> ConcurrentHashMap.newKeySet());
        refs.add(attributeRef);
    }

    /**
     * Publishes a disconnected status for the attribute and removes it from the registry entry of
     * the user; the entry itself is dropped once its set is empty.
     *
     * @param userID the user ID the attribute is registered under
     * @param attributeRef the attribute to stop monitoring
     */
    protected void removeSessionAttribute(String userID, AttributeRef attributeRef) {
        LOG.finest("Removing userID '" + userID + "' monitoring for attribute: " + attributeRef);
        updateUserConnectedStatus(userID, Collections.singletonList(attributeRef), false);
        this.userIDAttributeRefs.computeIfPresent(userID, (id, refs) -> {
            refs.remove(attributeRef);
            return refs.isEmpty() ? null : refs;
        });
    }

    /**
     * Deregisters the attribute for every user ID it is currently registered under.
     *
     * @param attributeRef the attribute to stop monitoring
     */
    protected void removeSessionAttributeForAllUsers(AttributeRef attributeRef) {
        // Collect the keys first so the registry is not modified while it is being streamed
        List<String> userIDs = this.userIDAttributeRefs.entrySet().stream()
            .filter(entry -> entry.getValue().contains(attributeRef))
            .map(Map.Entry::getKey)
            .toList();
        userIDs.forEach(userID -> removeSessionAttribute(userID, attributeRef));
    }

    /**
     * Sends an {@link AttributeEvent} carrying {@code connected} for each given attribute, unless
     * other connections of the user make the update redundant.
     *
     * <p>A {@code true} update is skipped when the user has more than one connection; a
     * {@code false} update is skipped while any connection remains.
     *
     * @param userID the user whose connections are inspected
     * @param attributeRefs the attributes to update
     * @param connected the status to publish
     */
    protected void updateUserConnectedStatus(String userID, Collection<AttributeRef> attributeRefs, boolean connected) {
        Set<RemotingConnection> connections = this.mqttBrokerService.getUserConnections(userID);

        if (connected && connections.size() > 1) {
            LOG.finest("Connections already exist for user so skipping status update: " + userID);
            return;
        }
        if (!connected && !connections.isEmpty()) {
            LOG.finest("Other connections remain for user so skipping status update: " + userID);
            return;
        }

        LOG.fine("Updating connected status for '" + userID + "' on " + attributeRefs.size() + " attribute(s) connected=" + connected);
        String source = getClass().getSimpleName();
        attributeRefs.forEach(attributeRef ->
            this.assetProcessingService.sendAttributeEvent(new AttributeEvent(attributeRef, connected), source));
    }

    /**
     * Resolves the user ID of the connection and the attributes monitored for that user.
     *
     * @param connection the connection to inspect
     * @return the user ID with its monitored attributes, or {@code null} when the connection has
     *         no subject ID or nothing is registered for the user
     */
    protected Pair<String, Set<AttributeRef>> getUserIDAndAttributeRefs(RemotingConnection connection) {
        String userID = KeycloakIdentityProvider.getSubjectId(connection.getSubject());
        if (userID == null) {
            if (LOG.isLoggable(Level.FINEST)) {
                LOG.finest("Anonymous connection so cannot determine userID: " + MQTTBrokerService.connectionToString(connection));
            }
            return null;
        }

        Set<AttributeRef> attributeRefs = this.userIDAttributeRefs.get(userID);
        if (attributeRefs == null) {
            return null;
        }
        return new Pair<>(userID, attributeRefs);
    }

    /**
     * @param attr the attribute to test
     * @return {@code true} when the attribute is of type {@link ValueType#BOOLEAN} and has the
     *         {@link MetaItemType#USER_CONNECTED} meta item
     */
    protected static boolean attributeMatches(Attribute<?> attr) {
        return Objects.equals(attr.getType(), ValueType.BOOLEAN) && attr.hasMeta(MetaItemType.USER_CONNECTED);
    }

    /**
     * @param username the username as stored in the meta item
     * @return the username carrying {@link #SERVICE_ACCOUNT_PREFIX}, added only when missing
     */
    protected static String toServiceAccountUsername(String username) {
        return username.startsWith(SERVICE_ACCOUNT_PREFIX) ? username : SERVICE_ACCOUNT_PREFIX + username;
    }
}

