/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.mqtt;

import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateException;
import java.security.cert.CertificateExpiredException;
import java.security.cert.CertificateNotYetValidException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.security.auth.x500.X500Principal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.persistence.PersistenceService;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.mqtt.MQTTBrokerService;
import org.openremote.manager.mqtt.MQTTHandler;
import org.openremote.manager.mqtt.Topic;
import org.openremote.manager.provisioning.ProvisioningService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.manager.security.ManagerKeycloakIdentityProvider;
import org.openremote.model.Container;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.Asset;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.provisioning.ErrorResponseMessage;
import org.openremote.model.provisioning.ProvisioningConfig;
import org.openremote.model.provisioning.ProvisioningMessage;
import org.openremote.model.provisioning.ProvisioningUtil;
import org.openremote.model.provisioning.SuccessResponseMessage;
import org.openremote.model.provisioning.X509ProvisioningConfig;
import org.openremote.model.provisioning.X509ProvisioningMessage;
import org.openremote.model.security.ClientRole;
import org.openremote.model.security.User;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.TextUtil;
import org.openremote.model.util.UniqueIdentifierGenerator;
import org.openremote.model.util.ValueUtil;

public class UserAssetProvisioningMQTTHandler
extends MQTTHandler {
    protected static final Logger LOG = SyslogCategory.getLogger((SyslogCategory)SyslogCategory.API, UserAssetProvisioningMQTTHandler.class);
    public static final String PROVISIONING_TOKEN = "provisioning";
    public static final String REQUEST_TOKEN = "request";
    public static final String RESPONSE_TOKEN = "response";
    public static final String UNIQUE_ID_PLACEHOLDER = "%UNIQUE_ID%";
    public static final String PROVISIONING_USER_PREFIX = "ps-";
    protected ProvisioningService provisioningService;
    protected TimerService timerService;
    protected AssetStorageService assetStorageService;
    protected ManagerKeycloakIdentityProvider identityProvider;
    protected boolean isKeycloak;
    protected final ConcurrentMap<Long, Set<RemotingConnection>> provisioningConfigAuthenticatedConnectionMap = new ConcurrentHashMap<Long, Set<RemotingConnection>>();
    protected Timer provisioningTimer;
    protected final Map<String, RemotingConnection> responseSubscribedConnections = new ConcurrentHashMap<String, RemotingConnection>();

    @Override
    public void init(Container container, Configuration serverConfiguration) throws Exception {
        super.init(container, serverConfiguration);
        if (container.getMeterRegistry() != null) {
            this.provisioningTimer = container.getMeterRegistry().timer("or.provisioning", (Iterable)Tags.empty());
        }
    }

    @Override
    public void start(Container container) throws Exception {
        super.start(container);
        this.provisioningService = (ProvisioningService)container.getService(ProvisioningService.class);
        this.timerService = (TimerService)container.getService(TimerService.class);
        this.assetStorageService = (AssetStorageService)container.getService(AssetStorageService.class);
        ManagerIdentityService identityService = (ManagerIdentityService)container.getService(ManagerIdentityService.class);
        if (!identityService.isKeycloakEnabled()) {
            LOG.warning("MQTT connections are not supported when not using Keycloak identity provider");
            this.isKeycloak = false;
        } else {
            this.isKeycloak = true;
            this.identityProvider = (ManagerKeycloakIdentityProvider)identityService.getIdentityProvider();
            ((MessageBrokerService)container.getService(MessageBrokerService.class)).getContext().addRoutes((RoutesBuilder)new ProvisioningPersistenceRouteBuilder(this));
        }
    }

    @Override
    protected AddressSettings getPublishTopicAddressSettings(Container container, String publishTopic) {
        AddressSettings addressSettings = super.getPublishTopicAddressSettings(container, publishTopic);
        if (addressSettings != null) {
            addressSettings.setMaxSizeMessages(1000L);
        }
        return addressSettings;
    }

    @Override
    public boolean handlesTopic(Topic topic) {
        return this.topicMatches(topic);
    }

    @Override
    public boolean checkCanSubscribe(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        if (!this.canSubscribe(connection, securityContext, topic)) {
            this.getLogger().fine("Cannot subscribe to this topic, topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        return true;
    }

    @Override
    public boolean checkCanPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        if (!this.canPublish(connection, securityContext, topic)) {
            this.getLogger().fine("Cannot publish to this topic, topic=" + String.valueOf(topic) + ", " + MQTTBrokerService.connectionToString(connection));
            return false;
        }
        return true;
    }

    @Override
    public boolean topicMatches(Topic topic) {
        return UserAssetProvisioningMQTTHandler.isProvisioningTopic(topic) && topic.getTokens().size() == 3 && (UserAssetProvisioningMQTTHandler.isRequestTopic(topic) || UserAssetProvisioningMQTTHandler.isResponseTopic(topic));
    }

    @Override
    protected Logger getLogger() {
        return LOG;
    }

    @Override
    public boolean canSubscribe(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        RemotingConnection existingConnection;
        boolean allowed;
        if (!this.isKeycloak) {
            LOG.fine("Identity provider is not keycloak");
            return false;
        }
        boolean bl = allowed = UserAssetProvisioningMQTTHandler.isResponseTopic(topic) && !"#".equals(UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 1)) && !"+".equals(UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 1));
        if (allowed && (existingConnection = this.responseSubscribedConnections.get(topic.getString())) != null) {
            LOG.warning("Subscription already exists possible eavesdropping");
            allowed = false;
        }
        return allowed;
    }

    @Override
    public void onSubscribe(RemotingConnection connection, Topic topic) {
        this.responseSubscribedConnections.put(topic.getString(), connection);
    }

    @Override
    public void onUnsubscribe(RemotingConnection connection, Topic topic) {
        this.responseSubscribedConnections.remove(topic.getString());
    }

    @Override
    public Set<String> getPublishListenerTopics() {
        return Set.of("provisioning/+/request");
    }

    @Override
    public boolean canPublish(RemotingConnection connection, KeycloakSecurityContext securityContext, Topic topic) {
        if (!this.isKeycloak) {
            LOG.fine("Identity provider is not keycloak");
            return false;
        }
        return UserAssetProvisioningMQTTHandler.isRequestTopic(topic) && !"#".equals(UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 1)) && !"+".equals(UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 1));
    }

    @Override
    public void onPublish(RemotingConnection connection, Topic topic, ByteBuf body) {
        if (!connection.getTransportConnection().isOpen()) {
            LOG.finest(() -> "Skipping provisioning request as connection is now closed: " + MQTTBrokerService.connectionToString(connection));
            return;
        }
        this.executorService.submit(() -> {
            if (this.provisioningTimer != null) {
                this.provisioningTimer.record(() -> this.processProvisioningRequest(connection, topic, body));
            } else {
                this.processProvisioningRequest(connection, topic, body);
            }
        });
    }

    @Override
    public void onConnectionLost(RemotingConnection connection) {
        this.provisioningConfigAuthenticatedConnectionMap.values().forEach(connections -> connections.remove(connection));
    }

    @Override
    public void onDisconnect(RemotingConnection connection) {
        this.provisioningConfigAuthenticatedConnectionMap.values().forEach(connections -> connections.remove(connection));
    }

    protected void processProvisioningRequest(RemotingConnection connection, Topic topic, ByteBuf body) {
        String payloadContent = body.toString(StandardCharsets.UTF_8);
        ProvisioningMessage provisioningMessage = ValueUtil.parse((String)payloadContent, ProvisioningMessage.class).orElseGet(() -> {
            LOG.info("Failed to parse provisioning request message from client: " + MQTTBrokerService.connectionToString(connection));
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.MESSAGE_INVALID), MqttQoS.AT_MOST_ONCE);
            return null;
        });
        if (provisioningMessage == null) {
            return;
        }
        if (provisioningMessage instanceof X509ProvisioningMessage) {
            this.processX509ProvisioningMessage(connection, topic, (X509ProvisioningMessage)provisioningMessage);
        }
    }

    protected static boolean isProvisioningTopic(Topic topic) {
        return PROVISIONING_TOKEN.equals(UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 0));
    }

    protected static boolean isRequestTopic(Topic topic) {
        return REQUEST_TOKEN.equals(UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 2));
    }

    protected static boolean isResponseTopic(Topic topic) {
        return RESPONSE_TOKEN.equals(UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 2));
    }

    protected String getResponseTopic(Topic topic) {
        return "provisioning/" + UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 1) + "/response";
    }

    protected void processX509ProvisioningMessage(RemotingConnection connection, Topic topic, X509ProvisioningMessage provisioningMessage) {
        Asset<?> asset;
        User serviceUser;
        X509Certificate clientCertificate;
        LOG.fine(() -> "Processing X.509 provisioning message: " + MQTTBrokerService.connectionToString(connection));
        if (TextUtil.isNullOrEmpty((String)provisioningMessage.getCert())) {
            LOG.info("Certificate is missing from X509 provisioning message" + MQTTBrokerService.connectionToString(connection));
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.CERTIFICATE_INVALID), MqttQoS.AT_MOST_ONCE);
            return;
        }
        try {
            clientCertificate = ProvisioningUtil.getX509Certificate((String)provisioningMessage.getCert());
        }
        catch (CertificateException e) {
            LOG.log(Level.INFO, "Failed to parse X.509 certificate: " + MQTTBrokerService.connectionToString(connection), e);
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.CERTIFICATE_INVALID), MqttQoS.AT_MOST_ONCE);
            return;
        }
        X509ProvisioningConfig matchingConfig = this.getMatchingX509ProvisioningConfig(connection, clientCertificate);
        if (matchingConfig == null) {
            LOG.fine("No matching provisioning config found for X.509 certificate: " + MQTTBrokerService.connectionToString(connection));
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.UNAUTHORIZED), MqttQoS.AT_MOST_ONCE);
            return;
        }
        if (matchingConfig.isDisabled()) {
            LOG.fine("Matching provisioning config is disabled for X.509 certificate: " + MQTTBrokerService.connectionToString(connection));
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.CONFIG_DISABLED), MqttQoS.AT_MOST_ONCE);
            return;
        }
        String certUniqueId = ProvisioningUtil.getSubjectCN((X500Principal)clientCertificate.getSubjectX500Principal());
        String uniqueId = UserAssetProvisioningMQTTHandler.topicTokenIndexToString(topic, 1);
        if (TextUtil.isNullOrEmpty((String)certUniqueId)) {
            LOG.info(() -> "X.509 certificate missing unique ID in subject CN: " + MQTTBrokerService.connectionToString(connection));
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.UNIQUE_ID_MISMATCH), MqttQoS.AT_MOST_ONCE);
            return;
        }
        if (TextUtil.isNullOrEmpty((String)uniqueId) || !certUniqueId.equals(uniqueId)) {
            LOG.info(() -> "X.509 certificate unique ID doesn't match topic unique ID: " + MQTTBrokerService.connectionToString(connection));
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.UNIQUE_ID_MISMATCH), MqttQoS.AT_MOST_ONCE);
            return;
        }
        String realm = matchingConfig.getRealm();
        try {
            LOG.finest("Checking service user: " + uniqueId);
            serviceUser = UserAssetProvisioningMQTTHandler.getCreateClientServiceUser(realm, this.identityProvider, uniqueId, matchingConfig);
        }
        catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to retrieve/create service user: " + MQTTBrokerService.connectionToString(connection), e);
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.SERVER_ERROR), MqttQoS.AT_MOST_ONCE);
            return;
        }
        if (!serviceUser.getEnabled().booleanValue()) {
            LOG.info(() -> "Service user exists and has been disabled so cannot continue:  " + MQTTBrokerService.connectionToString(connection));
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.USER_DISABLED), MqttQoS.AT_MOST_ONCE);
            return;
        }
        LOG.finest("Service user exists and is enabled");
        try {
            LOG.finest(() -> "Checking provisioned asset: " + uniqueId);
            asset = UserAssetProvisioningMQTTHandler.getCreateClientAsset(this.assetStorageService, realm, uniqueId, serviceUser, matchingConfig);
            if (asset != null && !matchingConfig.getRealm().equals(asset.getRealm())) {
                LOG.warning("Client asset realm mismatch");
                this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.ASSET_ERROR), MqttQoS.AT_MOST_ONCE);
                return;
            }
        }
        catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to retrieve/create asset: " + MQTTBrokerService.connectionToString(connection) + ", config=" + String.valueOf(matchingConfig), e);
            this.publishMessage(this.getResponseTopic(topic), new ErrorResponseMessage(ErrorResponseMessage.Error.SERVER_ERROR), MqttQoS.AT_MOST_ONCE);
            return;
        }
        this.mqttBrokerService.authenticateConnection(connection, realm, serviceUser.getUsername(), serviceUser.getSecret());
        this.provisioningConfigAuthenticatedConnectionMap.compute(matchingConfig.getId(), (id, connections) -> {
            if (connections == null) {
                connections = ConcurrentHashMap.newKeySet();
            }
            connections.add(connection);
            return connections;
        });
        LOG.fine("Client successfully provisioned: " + uniqueId);
        this.publishMessage(this.getResponseTopic(topic), new SuccessResponseMessage(realm, asset), MqttQoS.AT_MOST_ONCE);
    }

    protected X509ProvisioningConfig getMatchingX509ProvisioningConfig(RemotingConnection connection, X509Certificate clientCertificate) {
        return this.provisioningService.getProvisioningConfigs().stream().filter(config -> config instanceof X509ProvisioningConfig).map(config -> (X509ProvisioningConfig)config).filter(config -> {
            block6: {
                try {
                    X509Certificate caCertificate = config.getCertificate();
                    if (caCertificate == null || !caCertificate.getSubjectX500Principal().getName().equals(clientCertificate.getIssuerX500Principal().getName())) break block6;
                    LOG.finest(() -> "Client certificate issuer matches provisioning config CA certificate subject: " + MQTTBrokerService.connectionToString(connection) + ", config=" + String.valueOf(config));
                    Date now = Date.from(this.timerService.getNow());
                    try {
                        clientCertificate.verify(caCertificate.getPublicKey());
                        LOG.finest(() -> "Client certificate verified against CA certificate: " + MQTTBrokerService.connectionToString(connection) + ", config=" + String.valueOf(config));
                        if (!config.getData().isIgnoreExpiryDate()) {
                            LOG.finest(() -> "Validating client certificate validity: " + MQTTBrokerService.connectionToString(connection) + ", timestamp=" + String.valueOf(now));
                            clientCertificate.checkValidity(now);
                        }
                        return true;
                    }
                    catch (CertificateExpiredException | CertificateNotYetValidException e) {
                        LOG.log(Level.INFO, "Client certificate failed validity check: " + MQTTBrokerService.connectionToString(connection) + ", timestamp=" + String.valueOf(now), e);
                    }
                    catch (Exception e) {
                        LOG.log(Level.INFO, "Client certificate failed verification against CA certificate: " + MQTTBrokerService.connectionToString(connection) + ", config=" + String.valueOf(config), e);
                    }
                }
                catch (Exception e) {
                    LOG.log(Level.WARNING, "Failed to extract certificate from provisioning config: " + MQTTBrokerService.connectionToString(connection) + ", config=" + String.valueOf(config), e);
                }
            }
            return false;
        }).findFirst().orElse(null);
    }

    public static User getCreateClientServiceUser(String realm, ManagerKeycloakIdentityProvider identityProvider, String uniqueId, ProvisioningConfig<?, ?> provisioningConfig) throws RuntimeException {
        String username = PROVISIONING_USER_PREFIX + uniqueId;
        User serviceUser = identityProvider.getUserByUsername(realm, "service-account-" + username);
        if (serviceUser != null) {
            LOG.fine("Service user found: realm=" + realm + ", username=" + username);
            return serviceUser;
        }
        LOG.finest("Creating service user: realm=" + realm + ", username=" + username);
        serviceUser = new User().setServiceAccount(true).setEnabled(Boolean.valueOf(true)).setUsername(username);
        String secret = UniqueIdentifierGenerator.generateId();
        serviceUser = identityProvider.createUpdateUser(realm, serviceUser, secret, true);
        if (provisioningConfig.getUserRoles() != null && provisioningConfig.getUserRoles().length > 0) {
            LOG.finest("Setting user roles: realm=" + realm + ", username=" + username + ", roles=" + Arrays.toString(provisioningConfig.getUserRoles()));
            identityProvider.updateUserClientRoles(realm, serviceUser.getId(), "openremote", (String[])Arrays.stream(provisioningConfig.getUserRoles()).map(ClientRole::getValue).toArray(String[]::new));
        } else {
            LOG.finest("No user roles defined: realm=" + realm + ", username=" + username);
        }
        if (provisioningConfig.isRestrictedUser()) {
            LOG.finest("User will be made restricted: realm=" + realm + ", username=" + username);
            identityProvider.updateUserRealmRoles(realm, serviceUser.getId(), identityProvider.addUserRealmRoles(realm, serviceUser.getId(), new String[]{"restricted_user"}));
        }
        serviceUser.setSecret(secret);
        return serviceUser;
    }

    public static Asset<?> getCreateClientAsset(AssetStorageService assetStorageService, String realm, String uniqueId, User serviceUser, ProvisioningConfig<?, ?> provisioningConfig) throws RuntimeException {
        String assetId = UniqueIdentifierGenerator.generateId((String)(realm + uniqueId));
        Asset asset = assetStorageService.find(assetId);
        if (asset != null) {
            LOG.finest("Asset exists");
            return asset;
        }
        LOG.finest("Creating client asset: realm=" + realm + ", username=" + serviceUser.getUsername());
        if (TextUtil.isNullOrEmpty((String)provisioningConfig.getAssetTemplate())) {
            LOG.finest("Provisioning config doesn't contain an asset template: " + String.valueOf(provisioningConfig));
            return null;
        }
        String assetTemplate = provisioningConfig.getAssetTemplate();
        assetTemplate = assetTemplate.replaceAll(UNIQUE_ID_PLACEHOLDER, uniqueId);
        asset = (Asset)ValueUtil.parse((String)assetTemplate, Asset.class).orElseThrow(() -> new RuntimeException("Failed to de-serialise asset template into an asset instance: " + String.valueOf(provisioningConfig)));
        asset.setId(assetId);
        asset.setRealm(realm);
        asset = assetStorageService.merge(asset);
        if (provisioningConfig.isRestrictedUser()) {
            assetStorageService.storeUserAssetLinks(Collections.singletonList(new UserAssetLink(realm, serviceUser.getId(), assetId)));
        }
        return asset;
    }

    protected void forceClientDisconnects(long provisioningConfigId) {
        this.provisioningConfigAuthenticatedConnectionMap.computeIfPresent(provisioningConfigId, (id, connections) -> {
            connections.forEach(connection -> {
                try {
                    LOG.fine("Force disconnecting client that is using provisioning config ID '" + provisioningConfigId + "': " + MQTTBrokerService.connectionToString(connection));
                    connection.disconnect(false);
                }
                catch (Exception e) {
                    this.getLogger().log(Level.WARNING, "Failed to disconnect client: " + MQTTBrokerService.connectionToString(connection), e);
                }
            });
            connections.clear();
            return connections;
        });
    }

    protected static class ProvisioningPersistenceRouteBuilder
    extends RouteBuilder {
        UserAssetProvisioningMQTTHandler mqttHandler;

        public ProvisioningPersistenceRouteBuilder(UserAssetProvisioningMQTTHandler mqttHandler) {
            this.mqttHandler = mqttHandler;
        }

        public void configure() throws Exception {
            this.from("seda://PersistenceTopic?multipleConsumers=true&concurrentConsumers=1&waitForTaskToComplete=NEVER&purgeWhenStopping=true&discardIfNoConsumers=true&size=25000").routeId("Persistence-ProvisioningConfig").filter(PersistenceService.isPersistenceEventForEntityType(ProvisioningConfig.class)).process(exchange -> {
                boolean forceDisconnect;
                PersistenceEvent persistenceEvent = (PersistenceEvent)exchange.getIn().getBody(PersistenceEvent.class);
                boolean bl = forceDisconnect = persistenceEvent.getCause() == PersistenceEvent.Cause.DELETE;
                if (persistenceEvent.getCause() == PersistenceEvent.Cause.UPDATE) {
                    boolean bl2 = forceDisconnect = persistenceEvent.hasPropertyChanged("disabled") || persistenceEvent.hasPropertyChanged("data");
                }
                if (forceDisconnect) {
                    LOG.fine("Provisioning config modified or deleted so forcing connected clients to disconnect: " + String.valueOf(persistenceEvent.getEntity()));
                    this.mqttHandler.forceClientDisconnects(((ProvisioningConfig)persistenceEvent.getEntity()).getId());
                }
            });
        }
    }
}

