/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.websocket.jetty;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.apache.nifi.websocket.jetty.AbstractJettyWebSocketService;
import org.apache.nifi.websocket.jetty.RoutingWebSocketListener;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

@Tags(value={"WebSocket", "Jetty", "client"})
@CapabilityDescription(value="Implementation of WebSocketClientService. This service uses Jetty WebSocket client module to provide WebSocket session management throughout the application.")
public class JettyWebSocketClient
extends AbstractJettyWebSocketService
implements WebSocketClientService {
    public static final PropertyDescriptor WS_URI = new PropertyDescriptor.Builder().name("websocket-uri").displayName("WebSocket URI").description("The WebSocket URI this client connects to.").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.URI_VALIDATOR).addValidator((subject, input, context) -> {
        ValidationResult.Builder result = new ValidationResult.Builder().valid(input.startsWith("/")).subject(subject);
        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
            result.explanation("Expression Language Present").valid(true);
        } else {
            result.explanation("Protocol should be either 'ws' or 'wss'.").valid(input.startsWith("ws://") || input.startsWith("wss://"));
        }
        return result.build();
    }).build();
    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder().name("connection-timeout").displayName("Connection Timeout").description("The timeout to connect the WebSocket URI.").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("3 sec").build();
    public static final PropertyDescriptor SESSION_MAINTENANCE_INTERVAL = new PropertyDescriptor.Builder().name("session-maintenance-interval").displayName("Session Maintenance Interval").description("The interval between session maintenance activities. A WebSocket session established with a WebSocket server can be terminated due to different reasons including restarting the WebSocket server or timing out inactive sessions. This session maintenance activity is periodically executed in order to reconnect those lost sessions, so that a WebSocket client can reuse the same session id transparently after it reconnects successfully.  The maintenance activity is executed until corresponding processors or this controller service is stopped.").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("10 sec").build();
    private static final List<PropertyDescriptor> properties;
    private WebSocketClient client;
    private URI webSocketUri;
    private long connectionTimeoutMillis;
    private volatile ScheduledExecutorService sessionMaintenanceScheduler;
    private final ReentrantLock connectionLock = new ReentrantLock();
    private Map<String, String> activeSessions = new ConcurrentHashMap<String, String>();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @OnEnabled
    public void startClient(ConfigurationContext context) throws Exception {
        SSLContextService sslService = (SSLContextService)context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
        SslContextFactory sslContextFactory = null;
        if (sslService != null) {
            sslContextFactory = this.createSslFactory(sslService, false, false);
        }
        this.client = new WebSocketClient(sslContextFactory);
        this.configurePolicy(context, this.client.getPolicy());
        this.client.start();
        this.activeSessions.clear();
        this.webSocketUri = new URI(context.getProperty(WS_URI).getValue());
        this.connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        Long sessionMaintenanceInterval = context.getProperty(SESSION_MAINTENANCE_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
        this.sessionMaintenanceScheduler = Executors.newSingleThreadScheduledExecutor();
        this.sessionMaintenanceScheduler.scheduleAtFixedRate(() -> {
            try {
                this.maintainSessions();
            }
            catch (Exception e) {
                this.getLogger().warn("Failed to maintain sessions due to {}", new Object[]{e}, (Throwable)e);
            }
        }, sessionMaintenanceInterval, sessionMaintenanceInterval, TimeUnit.MILLISECONDS);
    }

    @OnDisabled
    @OnShutdown
    public void stopClient() throws Exception {
        this.activeSessions.clear();
        if (this.sessionMaintenanceScheduler != null) {
            try {
                this.sessionMaintenanceScheduler.shutdown();
            }
            catch (Exception e) {
                this.getLogger().warn("Failed to shutdown session maintainer due to {}", new Object[]{e}, (Throwable)e);
            }
            this.sessionMaintenanceScheduler = null;
        }
        if (this.client == null) {
            return;
        }
        this.client.stop();
        this.client = null;
    }

    public void connect(String clientId) throws IOException {
        this.connect(clientId, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void connect(String clientId, String sessionId) throws IOException {
        this.connectionLock.lock();
        try {
            Session session;
            WebSocketMessageRouter router;
            try {
                router = this.routers.getRouterOrFail(clientId);
            }
            catch (WebSocketConfigurationException e) {
                throw new IllegalStateException("Failed to get router due to: " + (Object)((Object)e), e);
            }
            RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
            listener.setSessionId(sessionId);
            ClientUpgradeRequest request = new ClientUpgradeRequest();
            Future connect = this.client.connect((Object)listener, this.webSocketUri, request);
            this.getLogger().info("Connecting to : {}", new Object[]{this.webSocketUri});
            try {
                session = (Session)connect.get(this.connectionTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                throw new IOException("Failed to connect " + this.webSocketUri + " due to: " + e, e);
            }
            this.getLogger().info("Connected, session={}", new Object[]{session});
            this.activeSessions.put(clientId, listener.getSessionId());
        }
        finally {
            this.connectionLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void maintainSessions() throws Exception {
        if (this.client == null) {
            return;
        }
        this.connectionLock.lock();
        ComponentLog logger = this.getLogger();
        try {
            for (String clientId : this.activeSessions.keySet()) {
                WebSocketMessageRouter router;
                try {
                    router = this.routers.getRouterOrFail(clientId);
                }
                catch (WebSocketConfigurationException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("The clientId {} is no longer active. Discarding the clientId.", new Object[]{clientId});
                    }
                    this.activeSessions.remove(clientId);
                    continue;
                }
                String sessionId = this.activeSessions.get(clientId);
                if (router.containsSession(sessionId)) continue;
                this.connect(clientId, sessionId);
            }
        }
        finally {
            this.connectionLock.unlock();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Session maintenance completed. activeSessions={}", new Object[]{this.activeSessions});
        }
    }

    public String getTargetUri() {
        return this.webSocketUri.toString();
    }

    static {
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.addAll(JettyWebSocketClient.getAbstractPropertyDescriptors());
        props.add(WS_URI);
        props.add(SSL_CONTEXT);
        props.add(CONNECTION_TIMEOUT);
        props.add(SESSION_MAINTENANCE_INTERVAL);
        properties = Collections.unmodifiableList(props);
    }
}

