/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.manager.event;

import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.spi.WebSocketHttpExchange;
import java.io.IOException;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.undertow.UndertowComponent;
import org.apache.camel.component.undertow.UndertowConstants;
import org.apache.camel.component.undertow.UndertowHostKey;
import org.apache.camel.model.ChoiceDefinition;
import org.keycloak.KeycloakPrincipal;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.security.AuthContext;
import org.openremote.container.security.basic.BasicAuthContext;
import org.openremote.container.security.keycloak.AccessTokenAuthContext;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.event.EventAuthorizer;
import org.openremote.manager.event.EventSubscriptionAuthorizer;
import org.openremote.manager.event.EventTypeConverters;
import org.openremote.manager.event.UndertowHost;
import org.openremote.manager.gateway.GatewayService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.model.Container;
import org.openremote.model.ContainerService;
import org.openremote.model.asset.AssetFilter;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.event.Event;
import org.openremote.model.event.RespondableEvent;
import org.openremote.model.event.TriggeredEventSubscription;
import org.openremote.model.event.shared.CancelEventSubscription;
import org.openremote.model.event.shared.EventFilter;
import org.openremote.model.event.shared.EventSubscription;
import org.openremote.model.event.shared.SharedEvent;
import org.openremote.model.event.shared.UnauthorizedEventSubscription;
import org.openremote.model.syslog.SyslogEvent;
import org.openremote.model.util.Pair;

/**
 * Camel route builder and container service that receives client messages over an Undertow
 * websocket endpoint, tracks per-session event subscriptions and publishes events to every
 * registered subscriber through an internal {@code direct} queue.
 *
 * <p>Two routes are configured:
 * <ul>
 *   <li>{@code ClientInbound-Websocket}: consumes {@link #WEBSOCKET_URI}; handles connection
 *       lifecycle events (open/close/error) and inbound {@code SUBSCRIBE:}, {@code UNSUBSCRIBE:}
 *       and {@code EVENT:} messages.</li>
 *   <li>{@code ClientPublishToSubscribers}: consumes {@link #PUBLISH_QUEUE} and hands every
 *       {@link Event} body to {@link #sendToSubscribers(Event)}.</li>
 * </ul>
 *
 * <p>{@link #websocketSessionSubscriptionConsumers} is a plain {@link HashMap}; every access in this
 * class is made while holding its monitor.
 */
public class ClientEventService extends RouteBuilder implements ContainerService {

    /** Service priority returned by {@link #getPriority()} (equal to {@code Integer.MAX_VALUE - 1100}). */
    public static final int PRIORITY = 2147482547;

    /** Camel endpoint URI of the client websocket; used both as route source and as send target. */
    public static final String WEBSOCKET_URI = "undertow://ws://0.0.0.0/websocket/events?fireWebSocketChannelEvents=true&sendTimeout=15000";

    protected static final System.Logger LOG = System.getLogger(ClientEventService.class.getName());

    /** Internal queue whose {@link Event} bodies are delivered to all matching subscribers. */
    protected static final String PUBLISH_QUEUE = "direct://ClientPublishQueue";

    /** Header / channel attribute name under which the session's {@link AuthContext} is stored. */
    private static final String AUTH_CONTEXT_KEY = "AUTH_CONTEXT";

    /** Request header, exchange header and channel attribute name carrying the requested realm. */
    private static final String REALM_KEY = "Realm";

    /** Exchange header holding the websocket connection (session) key. */
    private static final String CONNECTION_KEY_HEADER = "websocket.connectionKey";

    /** Exchange header holding the {@link WebSocketChannel} of the current message. */
    private static final String CHANNEL_HEADER = "websocket.channel";

    /** Route configuration ID applied to both routes. */
    private static final String ROUTE_CONFIGURATION_ID = "attributeEvent";

    protected final Collection<EventSubscriptionAuthorizer> eventSubscriptionAuthorizers = new CopyOnWriteArraySet<>();
    protected final Collection<EventAuthorizer> eventAuthorizers = new CopyOnWriteArraySet<>();
    protected final Set<Pair<EventSubscription<? extends Event>, Consumer<? extends Event>>> eventSubscriptions = new CopyOnWriteArraySet<>();

    /** Open websocket channels keyed by session key. */
    protected final Map<String, WebSocketChannel> sessionChannels = new ConcurrentHashMap<>();

    /**
     * Session key -> (event type + subscription ID -> consumer). Not a concurrent map: always
     * accessed inside {@code synchronized (websocketSessionSubscriptionConsumers)}.
     */
    protected final Map<String, Map<String, Consumer<? extends Event>>> websocketSessionSubscriptionConsumers = new HashMap<>();

    protected TimerService timerService;
    protected ExecutorService executorService;
    protected MessageBrokerService messageBrokerService;
    protected ManagerIdentityService identityService;
    protected GatewayService gatewayService;
    protected boolean started;
    protected Consumer<Exchange> gatewayInterceptor;

    /**
     * Reads the websocket connection key header from the exchange's inbound message.
     *
     * @param exchange the exchange to inspect
     * @return the session key, or {@code null} when the header is absent
     */
    public static String getSessionKey(Exchange exchange) {
        return exchange.getIn().getHeader(CONNECTION_KEY_HEADER, String.class);
    }

    /**
     * Resolves the client ID from the {@link AuthContext} header of the exchange.
     *
     * @param exchange the exchange to inspect
     * @return the client ID, or {@code null} when no auth context header is present
     */
    public static String getClientId(Exchange exchange) {
        AuthContext authContext = exchange.getIn().getHeader(AUTH_CONTEXT_KEY, AuthContext.class);
        return authContext != null ? authContext.getClientId() : null;
    }

    /**
     * @return {@link #PRIORITY}
     */
    public int getPriority() {
        return PRIORITY;
    }

    /**
     * Looks up the collaborating services, registers a customised {@code undertow} Camel component
     * (which creates this package's {@link UndertowHost}), registers the event type converters and
     * adds this builder's routes to the message broker's Camel context.
     *
     * @param container the container providing services and the executor
     * @throws Exception if adding the routes to the Camel context fails
     */
    public void init(final Container container) throws Exception {
        this.timerService = container.getService(TimerService.class);
        this.messageBrokerService = container.getService(MessageBrokerService.class);
        this.identityService = container.getService(ManagerIdentityService.class);
        this.gatewayService = container.getService(GatewayService.class);
        this.executorService = container.getExecutor();

        // The anonymous subclass only needs the Camel context; the enclosing instance is captured implicitly
        UndertowComponent undertowWebsocketComponent = new UndertowComponent(this.messageBrokerService.getContext()) {
            @Override
            protected org.apache.camel.component.undertow.UndertowHost createUndertowHost(UndertowHostKey key) {
                return new UndertowHost(container, key, this.getHostOptions());
            }
        };

        this.messageBrokerService.getContext().addComponent("undertow", undertowWebsocketComponent);
        this.messageBrokerService.getContext().getTypeConverterRegistry().addTypeConverters(new EventTypeConverters());
        this.messageBrokerService.getContext().addRoutes(this);
    }

    /**
     * Defines the inbound websocket route and the publish-to-subscribers route.
     *
     * @throws Exception declared by {@link RouteBuilder#configure()}
     */
    @Override
    public void configure() throws Exception {
        // Exchanges carrying a websocket event type header are connection lifecycle notifications:
        // they are handled and then stopped so they never reach the message processing steps.
        ChoiceDefinition connectionEvents = from(WEBSOCKET_URI)
            .routeId("ClientInbound-Websocket")
            .routeConfigurationId(ROUTE_CONFIGURATION_ID)
            .threads().executorService(this.executorService)
            .choice()
            .when(header("websocket.eventType"))
            .process(this::handleConnectionEvent)
            .stop();

        connectionEvents
            .endChoice()
            .end()
            .process(this::prepareClientMessage)
            .process(this::handleClientMessage)
            .to(PUBLISH_QUEUE)
            .end();

        from(PUBLISH_QUEUE)
            .routeId("ClientPublishToSubscribers")
            .routeConfigurationId(ROUTE_CONFIGURATION_ID)
            .threads().executorService(this.executorService)
            .filter(body().isInstanceOf(Event.class))
            .process(exchange -> sendToSubscribers(exchange.getIn().getBody(Event.class)));
    }

    /** Dispatches a websocket lifecycle exchange (open/close/error) and then notifies the gateway interceptor. */
    private void handleConnectionEvent(Exchange exchange) {
        UndertowConstants.EventType eventType = exchange.getIn().getHeader("websocket.eventTypeEnum", UndertowConstants.EventType.class);
        WebSocketChannel webSocketChannel = exchange.getIn().getHeader(CHANNEL_HEADER, WebSocketChannel.class);

        switch (eventType) {
            case ONOPEN:
                onSessionOpen(exchange, webSocketChannel);
                break;
            case ONCLOSE:
                onSessionClose(exchange, webSocketChannel);
                break;
            case ONERROR:
                onSessionError(exchange, webSocketChannel);
                break;
            default:
                break;
        }

        notifyGatewayInterceptor(exchange);
    }

    /** Derives the auth context and realm of a new connection, stores them on the channel and registers the session. */
    private void onSessionOpen(Exchange exchange, WebSocketChannel webSocketChannel) {
        WebSocketHttpExchange httpExchange = exchange.getIn().getHeader("websocket.exchange", WebSocketHttpExchange.class);
        String realm = httpExchange.getRequestHeader(REALM_KEY);
        Principal principal = httpExchange.getUserPrincipal();

        AuthContext authContext = null;
        if (principal instanceof KeycloakPrincipal) {
            KeycloakPrincipal<?> keycloakPrincipal = (KeycloakPrincipal<?>) principal;
            authContext = new AccessTokenAuthContext(
                keycloakPrincipal.getKeycloakSecurityContext().getRealm(),
                keycloakPrincipal.getKeycloakSecurityContext().getToken());
        } else if (principal instanceof BasicAuthContext) {
            authContext = (BasicAuthContext) principal;
        } else if (principal != null) {
            LOG.log(System.Logger.Level.INFO, "Unsupported user principal type: " + principal);
        }

        // Service account sessions get an idle timeout of 30000 (presumably milliseconds)
        if (authContext != null && authContext.getUsername().startsWith("service-account-")) {
            webSocketChannel.setIdleTimeout(30000L);
        }

        // Kept on the channel so later messages and close/error events can recover them
        webSocketChannel.setAttribute(AUTH_CONTEXT_KEY, authContext);
        webSocketChannel.setAttribute(REALM_KEY, realm);
        exchange.getIn().setHeader(AUTH_CONTEXT_KEY, authContext);
        exchange.getIn().setHeader(REALM_KEY, realm);
        exchange.getIn().setHeader("connection.sessionOpen", true);
        this.sessionChannels.put(getSessionKey(exchange), webSocketChannel);
        LOG.log(System.Logger.Level.DEBUG, "Client connection created: " + webSocketChannel.getSourceAddress());
    }

    /** Unregisters a closed session and removes all of its subscriptions. */
    private void onSessionClose(Exchange exchange, WebSocketChannel webSocketChannel) {
        String sessionKey = getSessionKey(exchange);
        copyChannelAttributesToHeaders(exchange, webSocketChannel);
        exchange.getIn().setHeader("connection.sessionClose", true);
        this.sessionChannels.remove(sessionKey);
        LOG.log(System.Logger.Level.DEBUG, "Client connection closed: " + webSocketChannel.getSourceAddress());
        removeSessionSubscriptions(sessionKey);
    }

    /** Closes a session that reported an error (best effort), unregisters it and removes all of its subscriptions. */
    private void onSessionError(Exchange exchange, WebSocketChannel webSocketChannel) {
        String sessionKey = getSessionKey(exchange);
        copyChannelAttributesToHeaders(exchange, webSocketChannel);
        exchange.getIn().setHeader("connection.sessionCloseError", true);
        LOG.log(System.Logger.Level.DEBUG, "Client connection error: " + webSocketChannel.getSourceAddress());
        try {
            webSocketChannel.close();
        } catch (Exception e) {
            // Closing is best effort; clean-up below must still run, so only record the failure
            LOG.log(System.Logger.Level.DEBUG, "Failed to close client connection after error: " + webSocketChannel.getSourceAddress(), e);
        }
        this.sessionChannels.remove(sessionKey);
        removeSessionSubscriptions(sessionKey);
    }

    /** Copies the auth context and realm stored on the channel into the exchange's inbound headers. */
    private void copyChannelAttributesToHeaders(Exchange exchange, WebSocketChannel webSocketChannel) {
        AuthContext authContext = (AuthContext) webSocketChannel.getAttribute(AUTH_CONTEXT_KEY);
        String realm = (String) webSocketChannel.getAttribute(REALM_KEY);
        exchange.getIn().setHeader(AUTH_CONTEXT_KEY, authContext);
        exchange.getIn().setHeader(REALM_KEY, realm);
    }

    /** Removes every subscription consumer registered for the given websocket session. */
    private void removeSessionSubscriptions(String sessionKey) {
        LOG.log(System.Logger.Level.TRACE, "Removing subscriptions for session: " + sessionKey);
        synchronized (this.websocketSessionSubscriptionConsumers) {
            Map<String, Consumer<? extends Event>> sessionConsumers = this.websocketSessionSubscriptionConsumers.remove(sessionKey);
            if (sessionConsumers != null) {
                sessionConsumers.values().forEach(this::removeSubscription);
            }
        }
    }

    /** Invokes the gateway interceptor, if one is set, with the given exchange. */
    private void notifyGatewayInterceptor(Exchange exchange) {
        // Read the mutable field once so the null check and the call see the same instance
        Consumer<Exchange> interceptor = this.gatewayInterceptor;
        if (interceptor != null) {
            interceptor.accept(exchange);
        }
    }

    /**
     * Restores the session's auth context/realm headers, converts prefixed text bodies into their
     * event objects and wires the response consumer of respondable events back to the session.
     */
    private void prepareClientMessage(Exchange exchange) {
        WebSocketChannel webSocketChannel = exchange.getIn().getHeader(CHANNEL_HEADER, WebSocketChannel.class);
        copyChannelAttributesToHeaders(exchange, webSocketChannel);

        Object rawBody = exchange.getIn().getBody();
        if (rawBody instanceof String) {
            String text = (String) rawBody;
            // Conversion is delegated to the Camel type converters; unknown prefixes are left untouched
            if (text.startsWith("SUBSCRIBE:")) {
                exchange.getIn().setBody(exchange.getIn().getBody(EventSubscription.class));
            } else if (text.startsWith("UNSUBSCRIBE:")) {
                exchange.getIn().setBody(exchange.getIn().getBody(CancelEventSubscription.class));
            } else if (text.startsWith("EVENT:")) {
                exchange.getIn().setBody(exchange.getIn().getBody(SharedEvent.class));
            }
        }

        Object convertedBody = exchange.getIn().getBody();
        if (convertedBody instanceof RespondableEvent) {
            ((RespondableEvent) convertedBody).setResponseConsumer(
                response -> sendToWebsocketSession(getSessionKey(exchange), response));
        }

        notifyGatewayInterceptor(exchange);
    }

    /**
     * Routes a converted client message by body type: subscription, subscription cancellation or
     * shared event. Bodies of any other type continue along the route unchanged.
     */
    private void handleClientMessage(Exchange exchange) {
        AuthContext authContext = exchange.getIn().getHeader(AUTH_CONTEXT_KEY, AuthContext.class);
        String realm = exchange.getIn().getHeader(REALM_KEY, String.class);
        Object body = exchange.getIn().getBody();

        if (body instanceof EventSubscription) {
            handleSubscribe(exchange, realm, authContext, (EventSubscription<?>) body);
        } else if (body instanceof CancelEventSubscription) {
            handleUnsubscribe(exchange, (CancelEventSubscription) body);
        } else if (body instanceof SharedEvent) {
            handleSharedEvent(exchange, realm, authContext, (SharedEvent) body);
        }
    }

    /**
     * Authorises a subscription request, confirms it to the client and registers a consumer that
     * forwards matching events to the session. The route is always stopped afterwards.
     */
    // Raw types: the subscription's event type parameter is unknown at this point, and the filter
    // has to be written back to the same subscription it was read from.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void handleSubscribe(Exchange exchange, String realm, AuthContext authContext, EventSubscription subscription) {
        String sessionKey = getSessionKey(exchange);
        LOG.log(System.Logger.Level.TRACE, () -> "Adding subscription for session '" + sessionKey + "': " + subscription);

        if (!authorizeEventSubscription(realm, authContext, subscription)) {
            sendToWebsocketSession(sessionKey, new UnauthorizedEventSubscription(subscription));
            exchange.setRouteStop(true);
            return;
        }

        EventFilter filter = subscription.getFilter();
        if (filter instanceof AssetFilter) {
            // Asset filters of websocket subscriptions always have the value-changed flag set
            subscription.setFilter((EventFilter) ((AssetFilter) filter).setValueChanged(true));
        }

        subscription.setSubscribed(true);
        sendToWebsocketSession(sessionKey, subscription);

        Consumer<SharedEvent> consumer = event -> onWebsocketSubscriptionTriggered(sessionKey, subscription, event);
        String subscriptionKey = subscription.getEventType() + subscription.getSubscriptionId();
        synchronized (this.websocketSessionSubscriptionConsumers) {
            Map<String, Consumer<? extends Event>> sessionConsumers =
                this.websocketSessionSubscriptionConsumers.computeIfAbsent(sessionKey, key -> new HashMap<>());
            sessionConsumers.put(subscriptionKey, consumer);
            addSubscription(subscription, consumer);
        }
        exchange.setRouteStop(true);
    }

    /** Removes the session's consumer matching the cancellation (if any) and stops the route. */
    private void handleUnsubscribe(Exchange exchange, CancelEventSubscription cancelEventSubscription) {
        String sessionKey = getSessionKey(exchange);
        LOG.log(System.Logger.Level.TRACE, () -> "Cancelling subscription for session '" + sessionKey + "': " + cancelEventSubscription);

        synchronized (this.websocketSessionSubscriptionConsumers) {
            this.websocketSessionSubscriptionConsumers.computeIfPresent(sessionKey, (key, sessionConsumers) -> {
                String subscriptionKey = cancelEventSubscription.getEventType() + cancelEventSubscription.getSubscriptionId();
                Consumer<? extends Event> consumer = sessionConsumers.remove(subscriptionKey);
                if (consumer != null) {
                    removeSubscription(consumer);
                }
                // Returning null drops the session entry once its last subscription is gone
                return sessionConsumers.isEmpty() ? null : sessionConsumers;
            });
        }
        exchange.setRouteStop(true);
    }

    /**
     * Authorises an event written by a client. Unauthorised events stop the route; attribute events
     * are stamped and forwarded to the attribute event processor (route stopped); any other
     * authorised event continues to the publish queue.
     */
    private void handleSharedEvent(Exchange exchange, String realm, AuthContext authContext, SharedEvent event) {
        if (!authorizeEventWrite(realm, authContext, event)) {
            exchange.setRouteStop(true);
            return;
        }

        if (event instanceof AttributeEvent) {
            AttributeEvent attributeEvent = (AttributeEvent) event;
            if (attributeEvent.getTimestamp() <= 0L) {
                attributeEvent.setTimestamp(this.timerService.getCurrentTimeMillis());
            }
            attributeEvent.setSource("WebsocketClient");
            this.messageBrokerService.getFluentProducerTemplate()
                .withBody(attributeEvent)
                .to("direct://AttributeEventProcessor")
                .asyncSend();
            exchange.setRouteStop(true);
        }
    }

    /**
     * Delivers the event to every subscription whose event type equals the event's type and whose
     * filter (when present) returns a non-null result. An exception thrown by one consumer is
     * logged and does not prevent delivery to the remaining consumers.
     *
     * @param event the event to deliver
     * @param <T> the event type
     */
    // Subscription and consumer are stored with wildcard types; a matching event type string is
    // taken as evidence that both were registered for T.
    @SuppressWarnings("unchecked")
    protected <T extends Event> void sendToSubscribers(T event) {
        this.eventSubscriptions.forEach(subscriptionAndConsumer -> {
            EventSubscription<T> subscription = (EventSubscription<T>) subscriptionAndConsumer.getKey();
            if (!subscription.getEventType().equals(event.getEventType())) {
                return;
            }

            T filteredEvent = subscription.getFilter() == null ? event : subscription.getFilter().apply(event);
            if (filteredEvent == null) {
                return;
            }

            Consumer<T> consumer = (Consumer<T>) subscriptionAndConsumer.getValue();
            try {
                consumer.accept(filteredEvent);
            } catch (Exception e) {
                LOG.log(System.Logger.Level.WARNING, "Event subscriber has thrown an exception: " + consumer, e);
            }
        });
    }

    /**
     * Registers a consumer for the given subscription.
     *
     * @param eventSubscription the subscription describing event type and optional filter
     * @param consumer the consumer to invoke for matching events
     */
    public void addSubscription(EventSubscription<? extends Event> eventSubscription, Consumer<? extends Event> consumer) throws IllegalStateException {
        this.eventSubscriptions.add(new Pair<>(eventSubscription, consumer));
    }

    /**
     * Registers a consumer for all events of the given class (no filter).
     *
     * @param eventClass the event class to subscribe to
     * @param consumer the consumer to invoke for matching events
     * @param <T> the event type
     */
    public <T extends Event> void addSubscription(Class<T> eventClass, Consumer<T> consumer) throws IllegalStateException {
        this.addSubscription(new EventSubscription<T>(eventClass, null), consumer);
    }

    /**
     * Registers a consumer for events of the given class that pass the filter.
     *
     * @param eventClass the event class to subscribe to
     * @param filter the filter applied before delivery
     * @param consumer the consumer to invoke for matching events
     * @param <T> the event type
     */
    public <T extends Event> void addSubscription(Class<T> eventClass, EventFilter<T> filter, Consumer<T> consumer) throws IllegalStateException {
        this.addSubscription(new EventSubscription<T>(eventClass, filter), consumer);
    }

    /**
     * Removes every subscription registered with exactly this consumer instance (identity comparison).
     *
     * @param consumer the consumer instance passed to {@code addSubscription}
     */
    public void removeSubscription(Consumer<? extends Event> consumer) {
        this.eventSubscriptions.removeIf(subscriptionAndConsumer -> subscriptionAndConsumer.getValue() == consumer);
    }

    /** Marks the service as started, enabling {@link #publishEvent(Event)}. */
    public void start(Container container) {
        this.started = true;
    }

    /** Marks the service as stopped; {@link #publishEvent(Event)} becomes a no-op. */
    public void stop(Container container) {
        this.started = false;
    }

    /** Adds an authorizer consulted by {@link #authorizeEventSubscription}. */
    public void addSubscriptionAuthorizer(EventSubscriptionAuthorizer authorizer) {
        this.eventSubscriptionAuthorizers.add(authorizer);
    }

    /** Adds an authorizer consulted by {@link #authorizeEventWrite}. */
    public void addEventAuthorizer(EventAuthorizer authorizer) {
        this.eventAuthorizers.add(authorizer);
    }

    /**
     * Checks whether at least one registered subscription authorizer approves the subscription.
     * With no authorizers registered the result is {@code false}.
     *
     * @param realm the requested realm
     * @param authContext the caller's auth context, may be {@code null}
     * @param subscription the requested subscription
     * @return {@code true} when any authorizer approves
     */
    public boolean authorizeEventSubscription(String realm, AuthContext authContext, EventSubscription<?> subscription) {
        boolean authorized = this.eventSubscriptionAuthorizers.stream()
            .anyMatch(authorizer -> authorizer.authorise(realm, authContext, subscription));
        if (!authorized) {
            // Suppliers keep message construction off the hot path when DEBUG is disabled
            if (authContext != null) {
                LOG.log(System.Logger.Level.DEBUG, () -> "Client not authorised to subscribe: subscription=" + subscription + ", requestRealm=" + realm + ", username=" + authContext.getUsername() + ", userRealm=" + authContext.getAuthenticatedRealmName());
            } else {
                LOG.log(System.Logger.Level.DEBUG, () -> "Client not authorised to subscribe: subscription=" + subscription + ", requestRealm=" + realm + ", user=null");
            }
        }
        return authorized;
    }

    /**
     * Checks whether at least one registered event authorizer approves writing the event.
     * With no authorizers registered the result is {@code false}.
     *
     * @param realm the requested realm
     * @param authContext the caller's auth context, may be {@code null}
     * @param event the event the client wants to send
     * @param <T> the event type
     * @return {@code true} when any authorizer approves
     */
    public <T extends SharedEvent> boolean authorizeEventWrite(String realm, AuthContext authContext, T event) {
        boolean authorized = this.eventAuthorizers.stream()
            .anyMatch(authorizer -> authorizer.authorise(realm, authContext, event));
        if (!authorized) {
            if (authContext != null) {
                LOG.log(System.Logger.Level.DEBUG, () -> "Client not authorised to send event: type=" + event.getEventType() + ", requestRealm=" + realm + ", user=" + authContext.getUsername() + ", userRealm=" + authContext.getAuthenticatedRealmName());
            } else {
                LOG.log(System.Logger.Level.DEBUG, () -> "Client not authorised to send event: type=" + event.getEventType() + ", requestRealm=" + realm + ", user=null");
            }
        }
        return authorized;
    }

    /**
     * Asynchronously sends the event to {@link #PUBLISH_QUEUE}. Does nothing unless the service has
     * been started. Syslog events are not trace-logged.
     *
     * @param event the event to publish
     * @param <T> the event type
     */
    public <T extends Event> void publishEvent(T event) {
        if (!this.started) {
            return;
        }
        if (!(event instanceof SyslogEvent)) {
            LOG.log(System.Logger.Level.TRACE, () -> "Publishing to subscribers: " + event);
        }
        this.messageBrokerService.getFluentProducerTemplate().withBody(event).to(PUBLISH_QUEUE).asyncSend();
    }

    /**
     * Sets the consumer invoked with every inbound websocket exchange (lifecycle events and
     * messages); {@code null} disables interception.
     */
    public void setGatewayInterceptor(Consumer<Exchange> consumer) {
        this.gatewayInterceptor = consumer;
    }

    /**
     * Wraps the event in a {@link TriggeredEventSubscription} carrying the subscription's ID and
     * sends it asynchronously to the given websocket session.
     */
    protected void onWebsocketSubscriptionTriggered(String sessionKey, EventSubscription<?> subscription, SharedEvent event) {
        TriggeredEventSubscription<SharedEvent> triggeredEventSubscription =
            new TriggeredEventSubscription<>(Collections.singletonList(event), subscription.getSubscriptionId());
        sendToWebsocketSession(sessionKey, triggeredEventSubscription);
    }

    /**
     * Asynchronously sends the data to the websocket session identified by the session key.
     *
     * @param sessionKey the target session's connection key
     * @param data the message body
     */
    public void sendToWebsocketSession(String sessionKey, Object data) {
        this.messageBrokerService.getFluentProducerTemplate()
            .withBody(data)
            .withHeader(CONNECTION_KEY_HEADER, sessionKey)
            .to(WEBSOCKET_URI)
            .asyncSend();
    }

    /**
     * Closes the channel of the given session if it is currently registered; unknown session keys
     * are ignored.
     *
     * @param sessionKey the session to close
     * @throws RuntimeException wrapping the {@link IOException} when closing the channel fails
     */
    public void closeWebsocketSession(String sessionKey) {
        WebSocketChannel webSocketChannel = this.sessionChannels.get(sessionKey);
        if (webSocketChannel == null) {
            return;
        }
        LOG.log(System.Logger.Level.INFO, () -> "Force closing websocket session: " + sessionKey);
        try {
            webSocketChannel.close();
        } catch (IOException e) {
            LOG.log(System.Logger.Level.DEBUG, () -> "Failed to close websocket session: " + sessionKey);
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "{}";
    }
}

