/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.mqtt;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.channel.ChannelId;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.security.auth.Subject;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.GuestLoginModule;
import org.apache.activemq.artemis.spi.core.security.jaas.PrincipalConversionLoginModule;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.http.client.utils.URIBuilder;
import org.keycloak.KeycloakPrincipal;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.security.keycloak.KeycloakIdentityProvider;
import org.openremote.container.timer.TimerService;
import org.openremote.container.util.MapAccess;
import org.openremote.manager.asset.AssetProcessingService;
import org.openremote.manager.asset.AssetStorageService;
import org.openremote.manager.event.ClientEventService;
import org.openremote.manager.mqtt.ActiveMQORSecurityManager;
import org.openremote.manager.mqtt.MQTTHandler;
import org.openremote.manager.mqtt.Topic;
import org.openremote.manager.security.AuthorisationService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.manager.security.ManagerKeycloakIdentityProvider;
import org.openremote.manager.security.MultiTenantClientCredentialsGrantsLoginModule;
import org.openremote.model.Container;
import org.openremote.model.ContainerService;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.security.User;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.Debouncer;
import org.openremote.model.util.TextUtil;
import org.openremote.model.util.UniqueIdentifierGenerator;

public class MQTTBrokerService
extends RouteBuilder
implements ContainerService,
ActiveMQServerConnectionPlugin,
ActiveMQServerSessionPlugin {
    public static final String MQTT_FORCE_USER_DISCONNECT_DEBOUNCE_MILLIS = "MQTT_FORCE_USER_DISCONNECT_DEBOUNCE_MILLIS";
    public static int MQTT_FORCE_USER_DISCONNECT_DEBOUNCE_MILLIS_DEFAULT = 5000;
    public static final int PRIORITY = 0;
    public static final String MQTT_SERVER_LISTEN_HOST = "MQTT_SERVER_LISTEN_HOST";
    public static final String MQTT_SERVER_LISTEN_PORT = "MQTT_SERVER_LISTEN_PORT";
    public static final String ANONYMOUS_USERNAME = "anonymous";
    protected final WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
    protected static final System.Logger LOG = System.getLogger(MQTTBrokerService.class.getName() + "." + SyslogCategory.API.name());
    protected AssetStorageService assetStorageService;
    protected AuthorisationService authorisationService;
    protected ManagerKeycloakIdentityProvider identityProvider;
    protected ClientEventService clientEventService;
    protected MessageBrokerService messageBrokerService;
    protected ExecutorService executorService;
    protected TimerService timerService;
    protected AssetProcessingService assetProcessingService;
    protected List<MQTTHandler> customHandlers = new ArrayList<MQTTHandler>();
    protected ConcurrentMap<String, RemotingConnection> clientIDConnectionMap = new ConcurrentHashMap<String, RemotingConnection>();
    protected ConcurrentMap<String, RemotingConnection> connectionIDConnectionMap = new ConcurrentHashMap<String, RemotingConnection>();
    protected ConcurrentMap<String, List<PersistenceEvent<UserAssetLink>>> userAssetLinkChangeMap = new ConcurrentHashMap<String, List<PersistenceEvent<UserAssetLink>>>();
    protected Debouncer<String> userAssetDisconnectDebouncer;
    protected Cache<String, RemotingConnection> disconnectedConnectionCache;
    protected boolean active;
    protected String host;
    protected int port;
    protected Configuration serverConfiguration;
    protected EmbeddedActiveMQ server;
    protected ActiveMQORSecurityManager securityManager;
    protected ServerLocator serverLocator;
    protected ClientSessionFactory sessionFactory;

    public int getPriority() {
        return 0;
    }

    public void init(final Container container) throws Exception {
        this.host = MapAccess.getString((Map)container.getConfig(), (String)MQTT_SERVER_LISTEN_HOST, (String)"0.0.0.0");
        this.port = MapAccess.getInteger((Map)container.getConfig(), (String)MQTT_SERVER_LISTEN_PORT, (int)1883);
        int debounceMillis = MapAccess.getInteger((Map)container.getConfig(), (String)MQTT_FORCE_USER_DISCONNECT_DEBOUNCE_MILLIS, (int)MQTT_FORCE_USER_DISCONNECT_DEBOUNCE_MILLIS_DEFAULT);
        this.assetStorageService = (AssetStorageService)container.getService(AssetStorageService.class);
        this.authorisationService = (AuthorisationService)container.getService(AuthorisationService.class);
        this.clientEventService = (ClientEventService)container.getService(ClientEventService.class);
        ManagerIdentityService identityService = (ManagerIdentityService)container.getService(ManagerIdentityService.class);
        this.messageBrokerService = (MessageBrokerService)container.getService(MessageBrokerService.class);
        this.executorService = container.getExecutor();
        this.timerService = (TimerService)container.getService(TimerService.class);
        this.assetProcessingService = (AssetProcessingService)container.getService(AssetProcessingService.class);
        this.userAssetDisconnectDebouncer = new Debouncer(container.getScheduledExecutor(), id -> this.processUserAssetLinkChange((String)id, (List)this.userAssetLinkChangeMap.remove(id)), debounceMillis);
        this.disconnectedConnectionCache = CacheBuilder.newBuilder().maximumSize(10000L).expireAfterWrite(3000L, TimeUnit.MILLISECONDS).build();
        if (!identityService.isKeycloakEnabled()) {
            LOG.log(System.Logger.Level.WARNING, "MQTT connections are not supported when not using Keycloak identity provider");
            this.active = false;
        } else {
            this.active = true;
            this.identityProvider = (ManagerKeycloakIdentityProvider)identityService.getIdentityProvider();
            ((MessageBrokerService)container.getService(MessageBrokerService.class)).getContext().addRoutes((RoutesBuilder)this);
        }
        this.serverConfiguration = new ConfigurationImpl();
        this.serverConfiguration.addAcceptorConfiguration("in-vm", "vm://0?protocols=core");
        String serverURI = new URIBuilder().setScheme("tcp").setHost(this.host).setPort(this.port).setParameter("protocols", "MQTT").setParameter("allowLinkStealing", "true").setParameter("defaultMqttSessionExpiryInterval", "0").build().toString();
        this.serverConfiguration.addAcceptorConfiguration("tcp", serverURI);
        this.serverConfiguration.registerBrokerPlugin((ActiveMQServerBasePlugin)this);
        if (container.getMeterRegistry() != null) {
            this.serverConfiguration.setMetricsConfiguration(new MetricsConfiguration().setJvmMemory(false).setPlugin((ActiveMQMetricsPlugin)new SimpleMetricsPlugin(){

                public MeterRegistry getRegistry() {
                    return container.getMeterRegistry();
                }
            }));
        }
        this.serverConfiguration.setWildCardConfiguration(this.wildcardConfiguration);
        this.serverConfiguration.setLiteralMatchMarkers("()");
        this.serverConfiguration.addAddressSetting(this.wildcardConfiguration.getAnyWordsString(), new AddressSettings().setDeadLetterAddress(SimpleString.of((String)"ActiveMQ.DLQ")).setExpiryAddress(SimpleString.of((String)"ActiveMQ.expired")).setAutoDeleteCreatedQueues(Boolean.valueOf(true)).setAutoDeleteAddresses(Boolean.valueOf(true)).setAutoDeleteAddressesSkipUsageCheck(true).setAutoDeleteAddressesDelay(86400000L).setAutoDeleteQueuesMessageCount(-1L).setAutoDeleteQueuesDelay(0L).setDefaultConsumerWindowSize(-1).setPageLimitMessages(Long.valueOf(0L)).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL).setPageFullMessagePolicy(PageFullMessagePolicy.FAIL).setEnableMetrics(false));
        this.serverConfiguration.setPersistenceEnabled(false);
        this.serverConfiguration.setAuthenticationCacheSize(0L);
        this.serverConfiguration.setAuthorizationCacheSize(0L);
        this.customHandlers = StreamSupport.stream(ServiceLoader.load(MQTTHandler.class).spliterator(), false).sorted(Comparator.comparingInt(MQTTHandler::getPriority)).collect(Collectors.toList());
        for (MQTTHandler handler : this.customHandlers) {
            try {
                handler.init(container, this.serverConfiguration);
            }
            catch (Exception e) {
                LOG.log(System.Logger.Level.WARNING, "MQTT custom handler threw an exception whilst initialising: handler=" + handler.getName(), (Throwable)e);
                throw e;
            }
        }
    }

    public void start(Container container) throws Exception {
        if (!this.active) {
            return;
        }
        this.server = new EmbeddedActiveMQ();
        this.server.setConfiguration(this.serverConfiguration);
        this.securityManager = new ActiveMQORSecurityManager(this.authorisationService, this, realm -> this.identityProvider.getKeycloakDeployment((String)realm, "openremote"), "", new SecurityConfiguration(this){

            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[]{new AppConfigurationEntry(GuestLoginModule.class.getName(), AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT, Map.of("debug", "true", "credentialsInvalidate", "true", "org.apache.activemq.jaas.guest.user", MQTTBrokerService.ANONYMOUS_USERNAME, "org.apache.activemq.jaas.guest.role", MQTTBrokerService.ANONYMOUS_USERNAME)), new AppConfigurationEntry(MultiTenantClientCredentialsGrantsLoginModule.class.getName(), AppConfigurationEntry.LoginModuleControlFlag.REQUISITE, Map.of("includeRealmRoles", "true", "role-principal-class", RolePrincipal.class.getName())), new AppConfigurationEntry(PrincipalConversionLoginModule.class.getName(), AppConfigurationEntry.LoginModuleControlFlag.REQUISITE, Map.of("principalClassList", KeycloakPrincipal.class.getName()))};
            }
        });
        this.server.setSecurityManager((ActiveMQSecurityManager)this.securityManager);
        this.server.start();
        LOG.log(System.Logger.Level.DEBUG, "Started MQTT broker");
        this.server.getActiveMQServer().getManagementService().addNotificationListener(notification -> {
            if (notification.getType() == CoreNotificationType.CONSUMER_CREATED || notification.getType() == CoreNotificationType.CONSUMER_CLOSED) {
                boolean isSubscribe = notification.getType() == CoreNotificationType.CONSUMER_CREATED;
                String sessionId = notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME).toString();
                String topic = notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_ADDRESS).toString();
                ServerSession session = this.server.getActiveMQServer().getSessionByID(sessionId);
                boolean isInternal = session.getRemotingConnection().getTransportConnection() instanceof InVMConnection;
                if (isInternal) {
                    return;
                }
                if (isSubscribe) {
                    this.onSubscribe(session.getRemotingConnection(), MQTTUtil.getMqttTopicFromCoreAddress((String)topic, (WildcardConfiguration)this.wildcardConfiguration));
                } else {
                    this.onUnsubscribe(session.getRemotingConnection(), MQTTUtil.getMqttTopicFromCoreAddress((String)topic, (WildcardConfiguration)this.wildcardConfiguration));
                }
            }
        });
        this.serverLocator = ActiveMQClient.createServerLocator((String)"vm://0").setProducerWindowSize(-1);
        this.sessionFactory = this.serverLocator.createSessionFactory();
        for (MQTTHandler handler : this.customHandlers) {
            try {
                handler.start(container);
            }
            catch (Exception e) {
                LOG.log(System.Logger.Level.WARNING, "MQTT custom handler threw an exception whilst starting: handler=" + handler.getName(), (Throwable)e);
                throw e;
            }
        }
    }

    public void configure() throws Exception {
        this.from("seda://PersistenceTopic?multipleConsumers=true&concurrentConsumers=1&waitForTaskToComplete=NEVER&purgeWhenStopping=true&discardIfNoConsumers=true&size=25000").routeId("Persistence-UserAndAssetLink").filter(this.body().isInstanceOf(PersistenceEvent.class)).process(exchange -> {
            PersistenceEvent persistenceEvent = (PersistenceEvent)exchange.getIn().getBody(PersistenceEvent.class);
            Object patt0$temp = persistenceEvent.getEntity();
            if (patt0$temp instanceof User) {
                boolean forceDisconnect;
                User user = (User)patt0$temp;
                if (!user.isServiceAccount()) {
                    return;
                }
                boolean bl = forceDisconnect = persistenceEvent.getCause() == PersistenceEvent.Cause.DELETE;
                if (persistenceEvent.getCause() == PersistenceEvent.Cause.UPDATE) {
                    boolean bl2 = forceDisconnect = persistenceEvent.hasPropertyChanged("enabled") || persistenceEvent.hasPropertyChanged("username") || persistenceEvent.hasPropertyChanged("secret");
                }
                if (forceDisconnect) {
                    LOG.log(System.Logger.Level.TRACE, "User modified or deleted so force closing any sessions for this user: " + String.valueOf(user));
                    this.getUserConnections(user.getId()).forEach(this::doForceDisconnect);
                }
            } else {
                Object patt1$temp = persistenceEvent.getEntity();
                if (patt1$temp instanceof UserAssetLink) {
                    UserAssetLink userAssetLink = (UserAssetLink)patt1$temp;
                    String userID = userAssetLink.getId().getUserId();
                    List changedUserAssetLinks = this.userAssetLinkChangeMap.computeIfAbsent(userID, id -> Collections.synchronizedList(new ArrayList()));
                    changedUserAssetLinks.add(persistenceEvent);
                    this.userAssetDisconnectDebouncer.call((Object)userID);
                }
            }
        });
    }

    public void stop(Container container) throws Exception {
        this.userAssetDisconnectDebouncer.cancelAll(true);
        this.server.stop();
        LOG.log(System.Logger.Level.DEBUG, "Stopped MQTT broker");
        StreamSupport.stream(ServiceLoader.load(MQTTHandler.class).spliterator(), false).sorted(Comparator.comparingInt(MQTTHandler::getPriority).reversed()).forEach(handler -> {
            try {
                handler.stop();
            }
            catch (Exception e) {
                LOG.log(System.Logger.Level.WARNING, "MQTT custom handler threw an exception whilst stopping: handler=" + handler.getName(), (Throwable)e);
            }
        });
    }

    public void afterCreateConnection(final RemotingConnection connection) throws ActiveMQException {
        connection.addFailureListener(new FailureListener(){

            public void connectionFailed(ActiveMQException exception, boolean failedOver) {
                this.connectionFailed(exception, failedOver, null);
            }

            public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
                RemotingConnection remotingConnection;
                MQTTBrokerService.this.connectionIDConnectionMap.remove(MQTTBrokerService.getConnectionIDString(connection));
                if (connection.getClientID() != null && (remotingConnection = (RemotingConnection)MQTTBrokerService.this.clientIDConnectionMap.remove(connection.getClientID())) != null) {
                    MQTTBrokerService.this.disconnectedConnectionCache.put((Object)connection.getClientID(), (Object)remotingConnection);
                }
                if (exception.getType() == ActiveMQExceptionType.REMOTE_DISCONNECT) {
                    LOG.log(System.Logger.Level.DEBUG, () -> "Client disconnected: " + MQTTBrokerService.connectionToString(connection));
                    for (MQTTHandler handler : MQTTBrokerService.this.getCustomHandlers()) {
                        handler.onDisconnect(connection);
                    }
                } else {
                    LOG.log(System.Logger.Level.DEBUG, () -> "Client disconnected (failure=" + exception.getMessage() + "): " + MQTTBrokerService.connectionToString(connection));
                    for (MQTTHandler handler : MQTTBrokerService.this.getCustomHandlers()) {
                        handler.onConnectionLost(connection);
                    }
                }
            }
        });
    }

    public void afterCreateSession(ServerSession session) throws ActiveMQException {
        RemotingConnection remotingConnection = session.getRemotingConnection();
        if (remotingConnection == null || remotingConnection.getClientID() == null || remotingConnection.getTransportConnection() instanceof InVMConnection) {
            return;
        }
        String connectionID = MQTTBrokerService.getConnectionIDString(remotingConnection);
        this.clientIDConnectionMap.put(remotingConnection.getClientID(), remotingConnection);
        if (!this.connectionIDConnectionMap.containsKey(connectionID)) {
            LOG.log(System.Logger.Level.DEBUG, () -> "Client connected: " + MQTTBrokerService.connectionToString(remotingConnection));
            this.connectionIDConnectionMap.put(connectionID, remotingConnection);
            for (MQTTHandler handler : this.getCustomHandlers()) {
                handler.onConnect(remotingConnection);
            }
        }
    }

    public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
    }

    public void onSubscribe(RemotingConnection connection, String topicStr) {
        Topic topic = Topic.parse(topicStr);
        LOG.log(System.Logger.Level.TRACE, () -> "onSubscribe '" + topicStr + "': " + MQTTBrokerService.connectionToString(connection));
        for (MQTTHandler handler : this.getCustomHandlers()) {
            if (!handler.handlesTopic(topic)) continue;
            LOG.log(System.Logger.Level.DEBUG, () -> "Client subscribed '" + topicStr + "': " + MQTTBrokerService.connectionToString(connection));
            handler.onSubscribe(connection, topic);
            break;
        }
    }

    public void onUnsubscribe(RemotingConnection connection, String topicStr) {
        Topic topic = Topic.parse(topicStr);
        LOG.log(System.Logger.Level.TRACE, () -> "onUnsubscribe '" + topicStr + "': " + MQTTBrokerService.connectionToString(connection));
        for (MQTTHandler handler : this.getCustomHandlers()) {
            if (!handler.handlesTopic(topic)) continue;
            LOG.log(System.Logger.Level.DEBUG, () -> "Client unsubscribed '" + topicStr + "': " + MQTTBrokerService.connectionToString(connection));
            handler.onUnsubscribe(connection, topic);
            break;
        }
    }

    public Iterable<MQTTHandler> getCustomHandlers() {
        return this.customHandlers;
    }

    public void processUserAssetLinkChange(String userID, List<PersistenceEvent<UserAssetLink>> changes) {
        if (TextUtil.isNullOrEmpty((String)userID)) {
            return;
        }
        Set<RemotingConnection> userConnections = this.getUserConnections(userID);
        Subject subject = userConnections.stream().filter(connection -> connection.getSubject() != null).findFirst().map(RemotingConnection::getSubject).orElse(null);
        if (subject != null && KeycloakIdentityProvider.getSecurityContext((Subject)subject).getToken().getRealmAccess().isUserInRole("restricted_user")) {
            LOG.log(System.Logger.Level.TRACE, "User asset links modified for connected restricted user so passing to handlers to decide what to do: user=" + String.valueOf(subject));
            userConnections.forEach(connection -> {
                for (MQTTHandler handler : this.customHandlers) {
                    connection.setSubject(subject);
                    handler.onUserAssetLinksChanged((RemotingConnection)connection, changes);
                }
            });
        }
    }

    public Set<RemotingConnection> getUserConnections(String userID) {
        if (TextUtil.isNullOrEmpty((String)userID)) {
            return Collections.emptySet();
        }
        return this.server.getActiveMQServer().getRemotingService().getConnections().stream().filter(connection -> {
            Subject subject = connection.getSubject();
            String subjectID = KeycloakIdentityProvider.getSubjectId((Subject)subject);
            return userID.equals(subjectID);
        }).collect(Collectors.toSet());
    }

    protected void doForceDisconnect(RemotingConnection connection) {
        LOG.log(System.Logger.Level.DEBUG, "Force disconnecting client connection: " + MQTTBrokerService.connectionToString(connection));
        connection.disconnect(false);
        ((SecurityStoreImpl)this.server.getActiveMQServer().getSecurityStore()).invalidateAuthorizationCache();
    }

    public boolean disconnectSession(String connectionID) {
        RemotingConnection connection = (RemotingConnection)this.connectionIDConnectionMap.get(connectionID);
        if (connection != null) {
            LOG.log(System.Logger.Level.DEBUG, "Force disconnecting client connection: " + MQTTBrokerService.connectionToString(connection));
            this.doForceDisconnect(connection);
            return true;
        }
        return false;
    }

    public WildcardConfiguration getWildcardConfiguration() {
        return this.wildcardConfiguration;
    }

    public static String getConnectionIDString(RemotingConnection connection) {
        if (connection == null) {
            return null;
        }
        Object ID = connection.getID();
        return ID instanceof ChannelId ? ((ChannelId)ID).asLongText() : ID.toString();
    }

    public static String connectionToString(RemotingConnection connection) {
        if (connection == null) {
            return "";
        }
        String username = null;
        Subject subject = connection.getSubject();
        if (subject != null) {
            username = MQTTBrokerService.getSubjectName(subject);
        }
        return "connection=" + connection.getRemoteAddress() + ", clientID=" + connection.getClientID() + ", subject=" + username;
    }

    public static String getSubjectName(Subject subject) {
        return subject.getPrincipals().stream().filter(principal -> principal instanceof UserPrincipal).findFirst().map(Principal::getName).orElse(KeycloakIdentityProvider.getSubjectNameAndRealm((Subject)subject));
    }

    public RemotingConnection getConnectionFromClientID(String clientID) {
        if (TextUtil.isNullOrEmpty((String)clientID)) {
            return null;
        }
        RemotingConnection connection = (RemotingConnection)this.clientIDConnectionMap.get(clientID);
        if (connection == null) {
            for (RemotingConnection remotingConnection : this.server.getActiveMQServer().getRemotingService().getConnections()) {
                if (!Objects.equals(clientID, remotingConnection.getClientID())) continue;
                connection = remotingConnection;
                this.clientIDConnectionMap.put(clientID, connection);
                break;
            }
        }
        if (connection == null) {
            connection = (RemotingConnection)this.disconnectedConnectionCache.getIfPresent((Object)clientID);
        }
        return connection;
    }

    protected void notifyConnectionAuthenticated(RemotingConnection connection) {
        if (connection.getSubject() != null) {
            LOG.log(System.Logger.Level.DEBUG, "Client connection authenticated: " + MQTTBrokerService.connectionToString(connection));
            for (MQTTHandler handler : this.getCustomHandlers()) {
                handler.onConnectionAuthenticated(connection);
            }
        }
    }

    protected ClientSession createSession() throws Exception {
        ClientSessionInternal session = null;
        try {
            String internalClientID = UniqueIdentifierGenerator.generateId((String)"Internal client");
            session = (ClientSessionInternal)this.sessionFactory.createSession(null, null, false, true, true, true, this.serverLocator.getAckBatchSize(), internalClientID);
            session.addMetaData("jms-session", "Internal session");
            ServerSession serverSession = this.server.getActiveMQServer().getSessionByID(session.getName());
            serverSession.disableSecurity();
            session.start();
        }
        catch (Exception e) {
            LOG.log(System.Logger.Level.WARNING, "Failed to create MQTT client session", (Throwable)e);
        }
        return session;
    }

    protected WildcardConfiguration getServerWildcardConfiguration() {
        return this.server.getConfiguration().getWildcardConfiguration();
    }

    public void authenticateConnection(RemotingConnection connection, String realm, String username, String password) {
        if (connection != null) {
            connection.setSubject(null);
            this.securityManager.authenticate(realm + ":" + username, password, connection, null);
            this.notifyConnectionAuthenticated(connection);
        }
    }
}

