/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.security.auth.Subject;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.security.AuthContext;
import org.openremote.container.security.keycloak.AccessTokenAuthContext;
import org.openremote.container.security.keycloak.KeycloakIdentityProvider;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.event.ClientEventService;
import org.openremote.manager.mqtt.MQTTBrokerService;
import org.openremote.manager.mqtt.Topic;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.manager.security.ManagerKeycloakIdentityProvider;
import org.openremote.model.Container;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.util.ValueUtil;

/**
 * Base class for MQTT topic handlers.
 *
 * <p>A handler resolves its collaborating services in {@link #init}, opens a broker client session
 * and registers a consumer for each of its {@link #getPublishListenerTopics()} in {@link #start},
 * and authorises subscribe/publish requests through {@link #checkCanSubscribe} and
 * {@link #checkCanPublish}.
 *
 * <p>The static helpers treat the first topic token as the realm and the second as the client ID
 * (see {@link #topicRealm} and {@link #topicClientID}).
 */
public abstract class MQTTHandler {
    public static final String TOKEN_MULTI_LEVEL_WILDCARD = "#";
    public static final String TOKEN_SINGLE_LEVEL_WILDCARD = "+";
    protected ClientEventService clientEventService;
    protected MQTTBrokerService mqttBrokerService;
    protected MessageBrokerService messageBrokerService;
    protected ManagerKeycloakIdentityProvider identityProvider;
    protected ExecutorService executorService;
    protected TimerService timerService;
    protected boolean isKeycloak;
    protected ClientSession clientSession;
    protected ClientProducer producer;

    /**
     * Returns the priority of this handler; the base implementation returns {@code 0}.
     *
     * @return the handler priority
     */
    public int getPriority() {
        return 0;
    }

    /**
     * Returns the name used for this handler in log messages.
     *
     * @return the simple class name of the concrete handler
     */
    public String getName() {
        return getClass().getSimpleName();
    }

    /**
     * Looks up the services this handler depends on and registers address settings for the
     * handler's publish listener topics on the supplied server configuration.
     *
     * <p>When the identity service does not have Keycloak enabled a warning is logged,
     * {@link #isKeycloak} is set to {@code false} and {@link #identityProvider} is left unset.
     *
     * @param container the container used to resolve services and the executor
     * @param serverConfiguration the broker configuration that receives the address settings
     * @throws Exception declared for overriding handlers
     */
    public void init(Container container, Configuration serverConfiguration) throws Exception {
        this.mqttBrokerService = (MQTTBrokerService) container.getService(MQTTBrokerService.class);
        this.clientEventService = (ClientEventService) container.getService(ClientEventService.class);
        this.messageBrokerService = (MessageBrokerService) container.getService(MessageBrokerService.class);
        ManagerIdentityService identityService = (ManagerIdentityService) container.getService(ManagerIdentityService.class);
        this.executorService = container.getExecutor();
        this.timerService = (TimerService) container.getService(TimerService.class);

        if (identityService.isKeycloakEnabled()) {
            this.isKeycloak = true;
            this.identityProvider = (ManagerKeycloakIdentityProvider) identityService.getIdentityProvider();
        } else {
            getLogger().warning("MQTT connections are not supported when not using Keycloak identity provider");
            this.isKeycloak = false;
        }

        addPublishTopicServerConfiguration(container, serverConfiguration);
    }

    /**
     * Creates the client session and producer and adds a publish consumer for every topic returned
     * by {@link #getPublishListenerTopics()}.
     *
     * @param container the running container (unused by the base implementation)
     * @throws Exception if the session, producer or any consumer cannot be created
     */
    public void start(Container container) throws Exception {
        this.clientSession = this.mqttBrokerService.createSession();
        this.producer = this.clientSession.createProducer();

        Set<String> publishListenerTopics = getPublishListenerTopics();
        if (publishListenerTopics == null) {
            return;
        }
        for (String publishListenerTopic : publishListenerTopics) {
            addPublishConsumer(publishListenerTopic);
        }
    }

    /**
     * Closes the client session, if one was created, and clears the reference to it.
     *
     * @throws Exception if closing the session fails
     */
    public void stop() throws Exception {
        if (this.clientSession == null) {
            return;
        }
        this.clientSession.close();
        this.clientSession = null;
    }

    /**
     * Registers the address settings returned by {@link #getPublishTopicAddressSettings} for each
     * publish listener topic; topics for which no settings are returned are skipped.
     *
     * @param container passed through to {@link #getPublishTopicAddressSettings}
     * @param serverConfiguration the broker configuration that receives the address settings
     */
    protected void addPublishTopicServerConfiguration(Container container, Configuration serverConfiguration) {
        Set<String> publishListenerTopics = getPublishListenerTopics();
        if (publishListenerTopics == null) {
            return;
        }
        for (String topic : publishListenerTopics) {
            String coreTopic = MQTTUtil.getCoreAddressFromMqttTopic(topic, this.mqttBrokerService.wildcardConfiguration);
            AddressSettings addressSettings = getPublishTopicAddressSettings(container, topic);
            if (addressSettings != null) {
                serverConfiguration.addAddressSetting("(" + coreTopic + ")", addressSettings);
            }
        }
    }

    /**
     * Returns the address settings to register for a publish listener topic.
     *
     * <p>The base implementation enables metrics when the container has a meter registry and
     * otherwise returns {@code null}.
     *
     * @param container the container whose meter registry is checked
     * @param publishTopic the MQTT topic the settings are for (unused by the base implementation)
     * @return the settings to register, or {@code null} to register none
     */
    protected AddressSettings getPublishTopicAddressSettings(Container container, String publishTopic) {
        if (container.getMeterRegistry() == null) {
            return null;
        }
        return new AddressSettings().setEnableMetrics(true);
    }

    /**
     * Creates a non-durable multicast queue for the topic and attaches a consumer that forwards
     * each received message to {@link #onPublish}.
     *
     * <p>Messages whose originating connection can no longer be resolved are dropped. Exceptions
     * thrown by {@link #onPublish} are logged and do not propagate out of the message handler.
     *
     * @param topic the MQTT topic to listen on
     * @throws Exception if the queue or consumer cannot be created; the failure is logged first
     */
    protected void addPublishConsumer(String topic) throws Exception {
        try {
            getLogger().info("Adding publish consumer for topic '" + topic + "': handler=" + getName());
            String coreTopic = MQTTUtil.getCoreAddressFromMqttTopic(topic, this.mqttBrokerService.wildcardConfiguration);
            this.clientSession.createQueue(QueueConfiguration.of(coreTopic)
                .setDurable(Boolean.FALSE)
                .setRoutingType(RoutingType.MULTICAST)
                .setPurgeOnNoConsumers(Boolean.TRUE)
                .setAutoCreateAddress(Boolean.TRUE)
                .setAutoCreated(Boolean.TRUE));
            ClientConsumer consumer = this.clientSession.createConsumer(coreTopic);
            consumer.setMessageHandler(message -> {
                Topic publishTopic = Topic.parse(MQTTUtil.getMqttTopicFromCoreAddress(message.getAddress(), this.mqttBrokerService.wildcardConfiguration));
                String clientID = message.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
                RemotingConnection connection = this.mqttBrokerService.getConnectionFromClientID(clientID);
                if (connection == null) {
                    getLogger().finer(() -> "Client is no longer connected so dropping publish to topic '" + topic + "': clientID=" + clientID);
                    return;
                }
                getLogger().finer(() -> "onPublish '" + publishTopic + "': " + MQTTBrokerService.connectionToString(connection));
                try {
                    onPublish(connection, publishTopic, message.getReadOnlyBodyBuffer().byteBuf());
                } catch (Exception e) {
                    // Keep the cause in the log record so handler failures can be diagnosed
                    getLogger().log(Level.INFO, "An error occurred whilst handling onPublish to topic '" + topic + "': clientID=" + clientID, e);
                }
            });
        } catch (Exception e) {
            getLogger().log(Level.SEVERE, "Failed to create handler consumer for topic '" + topic + "': handler=" + getName(), e);
            throw e;
        }
    }

    /**
     * Connection callback; the base implementation does nothing.
     *
     * @param connection the connection concerned
     */
    public void onConnect(RemotingConnection connection) {
    }

    /**
     * Disconnection callback; the base implementation does nothing.
     *
     * @param connection the connection concerned
     */
    public void onDisconnect(RemotingConnection connection) {
    }

    /**
     * Connection-lost callback; the base implementation does nothing.
     *
     * @param connection the connection concerned
     */
    public void onConnectionLost(RemotingConnection connection) {
    }

    /**
     * Connection-authenticated callback; the base implementation does nothing.
     *
     * @param connection the connection concerned
     */
    public void onConnectionAuthenticated(RemotingConnection connection) {
    }

    /**
     * Indicates whether this handler is responsible for the topic.
     *
     * @param topic the topic to test
     * @return {@code true} when the topic has more than two tokens and {@link #topicMatches}
     *     accepts it
     */
    public boolean handlesTopic(Topic topic) {
        return topicTokenCountGreaterThan(topic, 2) && topicMatches(topic);
    }

    /**
     * Performs the common subscribe checks and then delegates to {@link #canSubscribe}.
     *
     * <p>Anonymous connections (no security context) are rejected, as are topics whose realm is not
     * allowed for the security context or whose client ID token does not match the connection.
     *
     * @param connection the subscribing connection
     * @param securityContext the security context of the connection, or {@code null} if anonymous
     * @param topic the topic being subscribed to
     * @return {@code true} if the subscription is permitted
     */
    public boolean checkCanSubscribe(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        if (securityContext == null) {
            getLogger().finest(() -> "Anonymous connection subscriptions not supported by this handler, topic=" + topic + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        if (!topicRealmAllowed(securityContext, topic) || !topicClientIdMatches(connection, topic)) {
            getLogger().finest(() -> "Topic realm and client ID tokens must match the connection, topic=" + topic + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        return canSubscribe(connection, securityContext, topic);
    }

    /**
     * Performs the common publish checks and then delegates to {@link #canPublish}.
     *
     * <p>Anonymous connections (no security context) are rejected, as are topics whose realm is not
     * allowed for the security context or whose client ID token does not match the connection.
     *
     * @param connection the publishing connection
     * @param securityContext the security context of the connection, or {@code null} if anonymous
     * @param topic the topic being published to
     * @return {@code true} if the publish is permitted
     */
    public boolean checkCanPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        if (securityContext == null) {
            getLogger().finest(() -> "Anonymous connection publishes not supported by this handler topic=" + topic + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        if (!topicRealmAllowed(securityContext, topic) || !topicClientIdMatches(connection, topic)) {
            getLogger().finest(() -> "Topic realm and client ID tokens must match the connection topic=" + topic + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        return canPublish(connection, securityContext, topic);
    }

    /**
     * Callback for changes to user asset links; the base implementation does nothing.
     *
     * @param connection the connection concerned
     * @param changes the persistence events describing the changes
     */
    public void onUserAssetLinksChanged(RemotingConnection connection, List<PersistenceEvent<UserAssetLink>> changes) {
    }

    /**
     * Serialises {@code data} to JSON and sends it to the given MQTT topic with the requested QoS.
     *
     * <p>Does nothing when no client session exists. Any failure (including a payload that cannot
     * be converted to JSON) is logged at WARNING level and not propagated.
     *
     * @param topic the MQTT topic to publish to
     * @param data the payload object to serialise as JSON
     * @param qoS the MQTT quality of service level to set on the message
     */
    public void publishMessage(String topic, Object data, MqttQoS qoS) {
        // Work on a snapshot so the null check, the lock and the use all refer to the same session
        // even if stop() clears the field in the meantime
        ClientSession session = this.clientSession;
        if (session == null) {
            return;
        }
        try {
            synchronized (session) {
                ClientMessage message = session.createMessage(false);
                message.putIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, qoS.value());
                // Encode explicitly as UTF-8 rather than relying on the platform default charset
                byte[] payload = ValueUtil.asJSON(data)
                    .map(json -> json.getBytes(StandardCharsets.UTF_8))
                    .orElseThrow(() -> new IllegalStateException("Failed to convert payload to JSON string: " + data));
                message.writeBodyBufferBytes(payload);
                this.producer.send(MQTTUtil.getCoreAddressFromMqttTopic(topic, this.mqttBrokerService.getWildcardConfiguration()), message);
            }
        } catch (Exception e) {
            getLogger().log(Level.WARNING, "Couldn't publish to MQTT client: topic=" + topic, e);
        }
    }

    /**
     * Handler-specific topic test used by {@link #handlesTopic} once the token count check passed.
     *
     * @param var1 the topic to test
     * @return {@code true} if this handler handles the topic
     */
    protected abstract boolean topicMatches(Topic var1);

    /**
     * Returns the logger used for all log output of this handler.
     *
     * @return the handler's logger
     */
    protected abstract Logger getLogger();

    /**
     * Handler-specific subscribe authorisation, called by {@link #checkCanSubscribe} after the
     * common checks passed.
     *
     * @param var1 the subscribing connection
     * @param var2 the security context of the connection
     * @param var3 the topic being subscribed to
     * @return {@code true} if the subscription is permitted
     */
    public abstract boolean canSubscribe(RemotingConnection var1, KeycloakSecurityContext var2, Topic var3);

    /**
     * Handler-specific publish authorisation, called by {@link #checkCanPublish} after the common
     * checks passed.
     *
     * @param var1 the publishing connection
     * @param var2 the security context of the connection
     * @param var3 the topic being published to
     * @return {@code true} if the publish is permitted
     */
    public abstract boolean canPublish(RemotingConnection var1, KeycloakSecurityContext var2, Topic var3);

    /**
     * Subscription callback; not invoked from within this class.
     *
     * @param var1 the subscribing connection
     * @param var2 the topic subscribed to
     */
    public abstract void onSubscribe(RemotingConnection var1, Topic var2);

    /**
     * Unsubscription callback; not invoked from within this class.
     *
     * @param var1 the unsubscribing connection
     * @param var2 the topic unsubscribed from
     */
    public abstract void onUnsubscribe(RemotingConnection var1, Topic var2);

    /**
     * Returns the MQTT topics for which {@link #init} registers address settings and
     * {@link #start} creates publish consumers.
     *
     * @return the topics to listen on; {@code null} is treated as none
     */
    public abstract Set<String> getPublishListenerTopics();

    /**
     * Invoked by the publish consumer for each received message whose originating connection could
     * be resolved.
     *
     * @param var1 the connection that published the message
     * @param var2 the topic the message was published to
     * @param var3 the message body
     */
    public abstract void onPublish(RemotingConnection var1, Topic var2, ByteBuf var3);

    /**
     * Returns the realm token (first token) of the topic.
     *
     * @param topic the topic to inspect
     * @return the first token, or {@code null} if the topic has none
     */
    public static String topicRealm(Topic topic) {
        return topicTokenIndexToString(topic, 0);
    }

    /**
     * Returns the client ID token (second token) of the topic.
     *
     * @param topic the topic to inspect
     * @return the second token, or {@code null} if the topic has fewer than two tokens
     */
    public static String topicClientID(Topic topic) {
        return topicTokenIndexToString(topic, 1);
    }

    /**
     * Checks whether the security context may use the topic's realm.
     *
     * @param securityContext the security context to check; {@code null} is never allowed
     * @param topic the topic whose realm token is compared
     * @return {@code true} if the context's realm equals the topic realm or the context belongs to
     *     a super user
     */
    public static boolean topicRealmAllowed(KeycloakSecurityContext securityContext, Topic topic) {
        if (securityContext == null) {
            return false;
        }
        String realm = securityContext.getRealm();
        boolean realmMatches = realm != null && realm.equals(topicRealm(topic));
        return realmMatches || KeycloakIdentityProvider.isSuperUser(securityContext);
    }

    /**
     * Checks whether the topic's client ID token equals the client ID of the connection.
     *
     * <p>A topic without a client ID token never matches, so a connection without a client ID
     * cannot match a topic that is too short.
     *
     * @param connection the connection to compare against; {@code null} never matches
     * @param topic the topic whose client ID token is compared
     * @return {@code true} if the token is present and equal to the connection's client ID
     */
    public static boolean topicClientIdMatches(RemotingConnection connection, Topic topic) {
        if (connection == null) {
            return false;
        }
        String topicClientId = topicClientID(topic);
        return topicClientId != null && topicClientId.equals(connection.getClientID());
    }

    /**
     * Checks whether the topic has more than {@code size} tokens.
     *
     * @param topic the topic to inspect; {@code null} yields {@code false}
     * @param size the token count that must be exceeded
     * @return {@code true} if the topic has a token list with more than {@code size} entries
     */
    public static boolean topicTokenCountGreaterThan(Topic topic, int size) {
        if (topic == null) {
            return false;
        }
        List<String> tokens = topic.getTokens();
        return tokens != null && tokens.size() > size;
    }

    /**
     * Returns the token at the given zero-based position.
     *
     * @param topic the topic to inspect
     * @param tokenNumber the zero-based token index
     * @return the token, or {@code null} if the index is negative or beyond the last token
     */
    public static String topicTokenIndexToString(Topic topic, int tokenNumber) {
        if (tokenNumber < 0 || !topicTokenCountGreaterThan(topic, tokenNumber)) {
            return null;
        }
        return topic.getTokens().get(tokenNumber);
    }

    /**
     * Returns the subject attached to the connection.
     *
     * @param connection the connection to read from; may be {@code null}
     * @return the connection's subject, or {@code null} if the connection is {@code null}
     */
    protected static Subject getSubjectFromConnection(RemotingConnection connection) {
        if (connection == null) {
            return null;
        }
        return connection.getSubject();
    }

    /**
     * Resolves the Keycloak security context for a subject via {@link KeycloakIdentityProvider}.
     *
     * @param subject the subject to resolve
     * @return the result of {@link KeycloakIdentityProvider#getSecurityContext}
     */
    protected static KeycloakSecurityContext getSecurityContextFromSubject(Subject subject) {
        return KeycloakIdentityProvider.getSecurityContext(subject);
    }

    /**
     * Resolves the auth context of a connection by way of its subject and security context.
     *
     * @param connection the connection to resolve; may be {@code null}
     * @return the auth context, or empty if any step of the resolution yields {@code null}
     */
    protected static Optional<AuthContext> getAuthContextFromConnection(RemotingConnection connection) {
        return Optional.ofNullable(getSubjectFromConnection(connection))
            .map(MQTTHandler::getSecurityContextFromSubject)
            .map(MQTTHandler::getAuthContextFromSecurityContext);
    }

    /**
     * Builds an access-token auth context from the realm and token of a security context.
     *
     * @param securityContext the security context to convert; may be {@code null}
     * @return the auth context, or {@code null} if the security context is {@code null}
     */
    protected static AuthContext getAuthContextFromSecurityContext(KeycloakSecurityContext securityContext) {
        if (securityContext == null) {
            return null;
        }
        return new AccessTokenAuthContext(securityContext.getRealm(), securityContext.getToken());
    }
}

