/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.mqtt;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.security.AuthContext;
import org.openremote.manager.mqtt.MQTTBrokerService;
import org.openremote.manager.mqtt.MQTTHandler;
import org.openremote.manager.mqtt.Topic;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.AssetEvent;
import org.openremote.model.asset.AssetFilter;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.event.Event;
import org.openremote.model.event.shared.EventSubscription;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.ValueUtil;

public class DefaultMQTTHandler
extends MQTTHandler {
    public static final int PRIORITY = -2147482648;
    public static final String ASSET_TOPIC = "asset";
    public static final String ATTRIBUTE_TOPIC = "attribute";
    public static final String ATTRIBUTE_VALUE_TOPIC = "attributevalue";
    public static final String ATTRIBUTE_VALUE_WRITE_TOPIC = "writeattributevalue";
    public static final String ATTRIBUTE_WRITE_TOPIC = "writeattribute";
    private static final Logger LOG = SyslogCategory.getLogger((SyslogCategory)SyslogCategory.API, DefaultMQTTHandler.class);
    protected final Map<String, Map<String, Consumer<? extends Event>>> sessionSubscriptionConsumers = new HashMap<String, Map<String, Consumer<? extends Event>>>();
    protected final Cache<String, ConcurrentHashSet<String>> authorizationCache = CacheBuilder.newBuilder().maximumSize(100000L).expireAfterWrite(300000L, TimeUnit.MILLISECONDS).build();
    protected final Cache<String, EventSubscription<?>> eventSubscriptionCache = CacheBuilder.newBuilder().expireAfterWrite(30000L, TimeUnit.MILLISECONDS).build();

    @Override
    public int getPriority() {
        return -2147482648;
    }

    @Override
    public void onConnect(RemotingConnection connection) {
        super.onConnect(connection);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onDisconnect(RemotingConnection connection) {
        super.onDisconnect(connection);
        String sessionKey = DefaultMQTTHandler.getSessionKey(connection);
        LOG.log(Level.FINER, "Removing subscriptions for connection: " + MQTTBrokerService.connectionToString(connection));
        Map<String, Map<String, Consumer<? extends Event>>> map = this.sessionSubscriptionConsumers;
        synchronized (map) {
            this.sessionSubscriptionConsumers.computeIfPresent(sessionKey, (s, subscriptionConsumers) -> {
                subscriptionConsumers.forEach((subscriptionKey, consumer) -> this.clientEventService.removeSubscription((Consumer<? extends Event>)consumer));
                return null;
            });
        }
        this.authorizationCache.invalidate((Object)MQTTBrokerService.getConnectionIDString(connection));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onConnectionLost(RemotingConnection connection) {
        super.onConnectionLost(connection);
        String sessionKey = DefaultMQTTHandler.getSessionKey(connection);
        LOG.log(Level.FINER, "Removing subscriptions for connection: " + MQTTBrokerService.connectionToString(connection));
        Map<String, Map<String, Consumer<? extends Event>>> map = this.sessionSubscriptionConsumers;
        synchronized (map) {
            this.sessionSubscriptionConsumers.computeIfPresent(sessionKey, (s, subscriptionConsumers) -> {
                subscriptionConsumers.forEach((subscriptionKey, consumer) -> this.clientEventService.removeSubscription((Consumer<? extends Event>)consumer));
                return null;
            });
        }
        this.authorizationCache.invalidate((Object)MQTTBrokerService.getConnectionIDString(connection));
    }

    @Override
    public boolean topicMatches(Topic topic) {
        return DefaultMQTTHandler.isAttributeTopic(topic) || DefaultMQTTHandler.isAssetTopic(topic) || DefaultMQTTHandler.isAttributeValueWriteTopic(topic) || DefaultMQTTHandler.isAttributeWriteTopic(topic);
    }

    @Override
    protected Logger getLogger() {
        return LOG;
    }

    @Override
    public boolean canSubscribe(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        AssetFilter<?> filter;
        if (!this.isKeycloak) {
            LOG.finest("Identity provider is not keycloak");
            return false;
        }
        AuthContext authContext = DefaultMQTTHandler.getAuthContextFromSecurityContext(securityContext);
        if (authContext == null) {
            LOG.finest("Anonymous connection not supported: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        boolean isAttributeTopic = DefaultMQTTHandler.isAttributeTopic(topic);
        boolean isAssetTopic = DefaultMQTTHandler.isAssetTopic(topic);
        if (!isAssetTopic && !isAttributeTopic) {
            LOG.finest("Topic must have 3 or more tokens and third token must be 'asset, attribute or attributevalue': topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        if (isAssetTopic) {
            if (topic.getTokens().size() < 4 || topic.getTokens().size() > 5) {
                LOG.finest("Asset subscribe token count should be 4 or 5: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                return false;
            }
            if (topic.getTokens().size() == 4) {
                if (!(Pattern.matches("^[0-9A-Za-z]{22}$", DefaultMQTTHandler.topicTokenIndexToString(topic, 3)) || "#".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 3)) || "+".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 3)))) {
                    LOG.fine("Asset subscribe forth token must be an asset ID or wildcard: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                    return false;
                }
            } else if (topic.getTokens().size() == 5) {
                if (!Pattern.matches("^[0-9A-Za-z]{22}$", DefaultMQTTHandler.topicTokenIndexToString(topic, 3))) {
                    LOG.fine("Asset subscribe forth token must be an asset ID: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                    return false;
                }
                if (!"#".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 4)) && !"+".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 4))) {
                    LOG.fine("Asset subscribe fifth token must be a wildcard: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                    return false;
                }
            }
        } else {
            if (topic.getTokens().size() < 5 || topic.getTokens().size() > 6) {
                LOG.fine("Attribute subscribe token count should be 5 or 6: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                return false;
            }
            if (topic.getTokens().size() == 5) {
                if ("#".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 3))) {
                    LOG.fine("Attribute subscribe multi level wildcard must be last token: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                    return false;
                }
                if (!(Pattern.matches("^[0-9A-Za-z]{22}$", DefaultMQTTHandler.topicTokenIndexToString(topic, 4)) || "#".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 4)) || "+".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 4)))) {
                    LOG.fine("Attribute subscribe fifth token must be an asset ID or a wildcard: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                    return false;
                }
            } else if (topic.getTokens().size() == 6) {
                if (!Pattern.matches("^[0-9A-Za-z]{22}$", DefaultMQTTHandler.topicTokenIndexToString(topic, 4))) {
                    LOG.fine("Attribute subscribe fifth token must be an asset ID: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                    return false;
                }
                if (!"#".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 5)) && !"+".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 5))) {
                    LOG.fine("Attribute subscribe sixth token must be a wildcard: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
                    return false;
                }
            }
        }
        if ((filter = DefaultMQTTHandler.buildAssetFilter(topic)) == null) {
            LOG.info("Failed to process subscription topic: topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        EventSubscription subscription = new EventSubscription(isAssetTopic ? AssetEvent.class : AttributeEvent.class, filter);
        if (!this.clientEventService.authorizeEventSubscription(DefaultMQTTHandler.topicRealm(topic), authContext, subscription)) {
            return false;
        }
        String subscriptionId = topic.getString() + authContext.getUserId();
        this.eventSubscriptionCache.put((Object)subscriptionId, (Object)subscription);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean canPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        ConcurrentHashSet set;
        if (!this.isKeycloak) {
            LOG.fine("Identity provider is not keycloak");
            return false;
        }
        AuthContext authContext = DefaultMQTTHandler.getAuthContextFromSecurityContext(securityContext);
        if (authContext == null) {
            LOG.finer("Anonymous publish not supported: topic=" + String.valueOf(topic) + ", connection=" + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        if (DefaultMQTTHandler.isAttributeValueWriteTopic(topic) || DefaultMQTTHandler.isAttributeWriteTopic(topic)) {
            if (topic.getTokens().size() != 5 || !Pattern.matches("^[0-9A-Za-z]{22}$", DefaultMQTTHandler.topicTokenIndexToString(topic, 4))) {
                LOG.finer("Invalid publish topic: topic=" + String.valueOf(topic) + ", connection=" + MQTTBrokerService.connectionToString(connection));
                return false;
            }
        } else {
            return false;
        }
        String cacheKey = MQTTBrokerService.getConnectionIDString(connection);
        ConcurrentHashSet act = (ConcurrentHashSet)this.authorizationCache.getIfPresent((Object)cacheKey);
        if (act != null && act.contains((Object)topic.getString())) {
            return true;
        }
        if (!this.clientEventService.authorizeEventWrite(DefaultMQTTHandler.topicRealm(topic), authContext, DefaultMQTTHandler.buildAttributeEvent(topic.getTokens(), null, null))) {
            LOG.fine("Publish was not authorised for this user and topic: topic=" + String.valueOf(topic) + ", subject=" + String.valueOf(authContext));
            return false;
        }
        Cache<String, ConcurrentHashSet<String>> cache = this.authorizationCache;
        synchronized (cache) {
            act = (ConcurrentHashSet)this.authorizationCache.getIfPresent((Object)cacheKey);
            if (act != null) {
                set = act;
            } else {
                set = new ConcurrentHashSet();
                this.authorizationCache.put((Object)cacheKey, (Object)set);
            }
        }
        set.add((Object)topic.getString());
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onSubscribe(RemotingConnection connection, Topic topic) {
        AuthContext authContext = DefaultMQTTHandler.getAuthContextFromConnection(connection).get();
        String subscriptionId = topic.getString() + authContext.getUserId();
        EventSubscription subscription = (EventSubscription)this.eventSubscriptionCache.getIfPresent((Object)subscriptionId);
        if (subscription == null) {
            LOG.info("Subscription not found in intermediary cache: " + subscriptionId);
            return;
        }
        this.eventSubscriptionCache.invalidate((Object)subscriptionId);
        String sessionKey = DefaultMQTTHandler.getSessionKey(connection);
        Consumer consumer = this.getSubscriptionEventConsumer(connection, topic);
        Map<String, Map<String, Consumer<? extends Event>>> map = this.sessionSubscriptionConsumers;
        synchronized (map) {
            Map subscriptionConsumers = this.sessionSubscriptionConsumers.computeIfAbsent(sessionKey, s -> new HashMap());
            subscriptionConsumers.put(topic.getString(), consumer);
            this.clientEventService.addSubscription((EventSubscription<? extends Event>)subscription, consumer);
            LOG.finest(() -> "Client event subscription created for topic '" + String.valueOf(topic) + "': " + MQTTBrokerService.connectionToString(connection));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onUnsubscribe(RemotingConnection connection, Topic topic) {
        String subscriptionId = topic.toString();
        String sessionKey = DefaultMQTTHandler.getSessionKey(connection);
        Map<String, Map<String, Consumer<? extends Event>>> map = this.sessionSubscriptionConsumers;
        synchronized (map) {
            this.sessionSubscriptionConsumers.computeIfPresent(sessionKey, (connectionID, subscriptionConsumers) -> {
                Consumer consumer = (Consumer)subscriptionConsumers.remove(subscriptionId);
                if (consumer != null) {
                    this.clientEventService.removeSubscription(consumer);
                }
                if (subscriptionConsumers.isEmpty()) {
                    return null;
                }
                return subscriptionConsumers;
            });
        }
    }

    @Override
    public Set<String> getPublishListenerTopics() {
        return Set.of("+/+/writeattributevalue/#", "+/+/writeattribute/#");
    }

    @Override
    public void onPublish(RemotingConnection connection, Topic topic, ByteBuf body) {
        AttributeEvent attributeEvent;
        List<String> topicTokens = topic.getTokens();
        String payloadContent = body.toString(StandardCharsets.UTF_8);
        if (DefaultMQTTHandler.isAttributeWriteTopic(topic)) {
            attributeEvent = ValueUtil.parse((String)payloadContent, ObjectNode.class).map(valueWithTimestamp -> {
                if (valueWithTimestamp.has("value") && valueWithTimestamp.has("timestamp")) {
                    JsonNode value = valueWithTimestamp.get("value");
                    long timestamp = valueWithTimestamp.get("timestamp").asLong();
                    if (timestamp > 0L) {
                        return DefaultMQTTHandler.buildAttributeEvent(topicTokens, value, timestamp);
                    }
                }
                return null;
            }).orElse(null);
            if (attributeEvent == null) {
                LOG.info(() -> "Invalid publish to write attribute topic '" + String.valueOf(topic) + "': " + MQTTBrokerService.connectionToString(connection));
            }
        } else {
            Object value = ValueUtil.parse((String)payloadContent).orElse(null);
            attributeEvent = DefaultMQTTHandler.buildAttributeEvent(topicTokens, value, this.timerService.getCurrentTimeMillis());
        }
        if (attributeEvent != null) {
            this.messageBrokerService.getFluentProducerTemplate().withBody((Object)attributeEvent).to("direct://AttributeEventProcessor").asyncSend();
        }
    }

    @Override
    public void onUserAssetLinksChanged(RemotingConnection connection, List<PersistenceEvent<UserAssetLink>> changes) {
        String sessionKey = DefaultMQTTHandler.getSessionKey(connection);
        if (this.sessionSubscriptionConsumers.containsKey(sessionKey)) {
            if (changes.stream().allMatch(pe -> pe.getCause() == PersistenceEvent.Cause.CREATE)) {
                return;
            }
            LOG.info("User asset links have changed for a connected user with active subscriptions so force disconnecting them: " + MQTTBrokerService.connectionToString(connection));
            this.mqttBrokerService.doForceDisconnect(connection);
        }
    }

    protected static AttributeEvent buildAttributeEvent(List<String> topicTokens, Object value, Long timestamp) {
        String attributeName = topicTokens.get(3);
        String assetId = topicTokens.get(4);
        return new AttributeEvent(assetId, attributeName, value, timestamp).setSource(DefaultMQTTHandler.class.getSimpleName());
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected static AssetFilter<?> buildAssetFilter(Topic topic) {
        boolean isAssetTopic = DefaultMQTTHandler.isAssetTopic(topic);
        String realm = DefaultMQTTHandler.topicRealm(topic);
        ArrayList<String> assetIds = new ArrayList<String>();
        ArrayList<String> parentIds = new ArrayList<String>();
        ArrayList<String> paths = new ArrayList<String>();
        ArrayList<String> attributeNames = new ArrayList<String>();
        String firstTokenStr = DefaultMQTTHandler.topicTokenIndexToString(topic, 3);
        if (isAssetTopic) {
            if (topic.getTokens().size() == 4) {
                if (!"#".equals(firstTokenStr)) {
                    if ("+".equals(firstTokenStr)) {
                        parentIds.add(null);
                    } else {
                        assetIds.add(firstTokenStr);
                    }
                }
            } else {
                if (topic.getTokens().size() != 5) return null;
                secondTokenStr = DefaultMQTTHandler.topicTokenIndexToString(topic, 4);
                if ("#".equals(secondTokenStr)) {
                    paths.add(firstTokenStr);
                } else if ("+".equals(secondTokenStr)) {
                    parentIds.add(firstTokenStr);
                }
            }
        } else {
            if (!"+".equals(firstTokenStr)) {
                attributeNames.add(firstTokenStr);
            }
            if (topic.getTokens().size() == 5) {
                secondTokenStr = DefaultMQTTHandler.topicTokenIndexToString(topic, 4);
                if (!"#".equals(secondTokenStr)) {
                    if ("+".equals(secondTokenStr)) {
                        parentIds.add(null);
                    } else {
                        assetIds.add(secondTokenStr);
                    }
                }
            } else {
                if (topic.getTokens().size() != 6) return null;
                String thirdTokenStr = DefaultMQTTHandler.topicTokenIndexToString(topic, 5);
                if ("#".equals(thirdTokenStr)) {
                    paths.add(DefaultMQTTHandler.topicTokenIndexToString(topic, 4));
                } else if ("+".equals(thirdTokenStr)) {
                    parentIds.add(DefaultMQTTHandler.topicTokenIndexToString(topic, 4));
                }
            }
        }
        AssetFilter assetFilter = new AssetFilter().setRealm(realm).setValueChanged(true);
        if (!assetIds.isEmpty()) {
            assetFilter.setAssetIds(assetIds.toArray(new String[0]));
        }
        if (!parentIds.isEmpty()) {
            assetFilter.setParentIds(parentIds.toArray(new String[0]));
        }
        if (!paths.isEmpty()) {
            assetFilter.setPath(paths.toArray(new String[0]));
        }
        if (attributeNames.isEmpty()) return assetFilter.setValueChanged(true);
        assetFilter.setAttributeNames(attributeNames.toArray(new String[0]));
        return assetFilter.setValueChanged(true);
    }

    protected <T extends Event> Consumer<T> getSubscriptionEventConsumer(RemotingConnection connection, Topic topic) {
        Function<Event, String> topicExpander;
        boolean isValueSubscription = ATTRIBUTE_VALUE_TOPIC.equalsIgnoreCase(DefaultMQTTHandler.topicTokenIndexToString(topic, 2));
        boolean isAssetTopic = DefaultMQTTHandler.isAssetTopic(topic);
        MqttQoS mqttQoS = MqttQoS.AT_MOST_ONCE;
        if (isAssetTopic) {
            String topicStr = topic.toString();
            String replaceToken = topicStr.endsWith("#") ? "#" : (topicStr.endsWith("+") ? "+" : null);
            topicExpander = ev -> replaceToken != null ? topicStr.replace(replaceToken, ((AssetEvent)ev).getId()) : topicStr;
        } else {
            String topicStr = topic.toString();
            boolean injectAttributeName = "+".equals(DefaultMQTTHandler.topicTokenIndexToString(topic, 3));
            if (injectAttributeName) {
                topicStr = topicStr.replaceFirst("\\+", "\\$");
            }
            String replaceToken = topicStr.endsWith("#") ? "#" : (topicStr.endsWith("+") ? "+" : null);
            String finalTopicStr = topicStr;
            topicExpander = ev -> {
                String expanded;
                String string = expanded = replaceToken != null ? finalTopicStr.replace(replaceToken, ((AttributeEvent)ev).getId()) : finalTopicStr;
                if (injectAttributeName) {
                    expanded = expanded.replace("$", ((AttributeEvent)ev).getName());
                }
                return expanded;
            };
        }
        return ev -> {
            if (isAssetTopic) {
                if (ev instanceof AssetEvent) {
                    this.publishMessage((String)topicExpander.apply((Event)ev), ev, mqttQoS);
                }
            } else if (ev instanceof AttributeEvent) {
                AttributeEvent attributeEvent = (AttributeEvent)ev;
                if (isValueSubscription) {
                    this.publishMessage((String)topicExpander.apply((Event)ev), attributeEvent.getValue().orElse(null), mqttQoS);
                } else {
                    this.publishMessage((String)topicExpander.apply((Event)ev), ev, mqttQoS);
                }
            }
        };
    }

    protected static boolean isAttributeTopic(Topic topic) {
        return ATTRIBUTE_TOPIC.equalsIgnoreCase(DefaultMQTTHandler.topicTokenIndexToString(topic, 2)) || ATTRIBUTE_VALUE_TOPIC.equalsIgnoreCase(DefaultMQTTHandler.topicTokenIndexToString(topic, 2));
    }

    protected static boolean isAttributeValueWriteTopic(Topic topic) {
        return ATTRIBUTE_VALUE_WRITE_TOPIC.equalsIgnoreCase(DefaultMQTTHandler.topicTokenIndexToString(topic, 2));
    }

    protected static boolean isAttributeWriteTopic(Topic topic) {
        return ATTRIBUTE_WRITE_TOPIC.equalsIgnoreCase(DefaultMQTTHandler.topicTokenIndexToString(topic, 2));
    }

    protected static boolean isAssetTopic(Topic topic) {
        return ASSET_TOPIC.equalsIgnoreCase(DefaultMQTTHandler.topicTokenIndexToString(topic, 2));
    }

    protected static String getSessionKey(RemotingConnection connection) {
        return MQTTBrokerService.getConnectionIDString(connection);
    }
}

