/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.websocket;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.websocket.BinaryMessageConsumer;
import org.apache.nifi.websocket.ConnectedListener;
import org.apache.nifi.websocket.SendMessage;
import org.apache.nifi.websocket.TextMessageConsumer;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Routes WebSocket events for a single endpoint to at most one registered
 * {@link Processor}, and keeps track of the sessions currently captured for
 * that endpoint.
 *
 * <p>Sessions are held in a {@link ConcurrentHashMap} and the registered
 * processor is held in a {@code volatile} field; registration and
 * deregistration are {@code synchronized}. Event callbacks are not
 * synchronized, so each of them reads the processor field exactly once into a
 * local variable before using it.</p>
 */
public class WebSocketMessageRouter {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class);
    private final String endpointId;
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private volatile Processor processor;

    /**
     * Creates a router for the given endpoint.
     *
     * @param endpointId identifier of the endpoint this router belongs to
     */
    public WebSocketMessageRouter(String endpointId) {
        this.endpointId = endpointId;
    }

    /**
     * Assigns the processor that will receive events from this router.
     *
     * @param processor the processor to assign
     * @throws WebSocketConfigurationException if a processor is already assigned
     */
    public synchronized void registerProcessor(Processor processor) throws WebSocketConfigurationException {
        final Processor registered = this.processor;
        if (registered != null) {
            throw new WebSocketConfigurationException("Processor " + registered + " is already assigned to this router.");
        }
        this.processor = processor;
    }

    /**
     * Tells whether the given processor is the one assigned to this router.
     * Processors are compared by their identifiers.
     *
     * @param processor the processor to check; {@code null} is never registered
     * @return {@code true} if a processor is assigned and its identifier equals
     *         the identifier of {@code processor}
     */
    public boolean isProcessorRegistered(Processor processor) {
        // Single volatile read: a concurrent deregistration must not be able to
        // null the field between the null check and the dereference.
        final Processor registered = this.processor;
        return registered != null
                && processor != null
                && registered.getIdentifier().equals(processor.getIdentifier());
    }

    /**
     * Removes the given processor from this router and disconnects every
     * captured session. Does nothing (apart from logging) if the given
     * processor is not the one currently assigned.
     *
     * <p>Disconnecting is best effort: a failure for one session is logged and
     * the remaining sessions are still processed.</p>
     *
     * @param processor the processor to deregister
     */
    public synchronized void deregisterProcessor(Processor processor) {
        final Processor registered = this.processor;
        if (!this.isProcessorRegistered(processor)) {
            if (registered == null) {
                logger.info("Deregister processor {}, do nothing because this router doesn't have registered processor", processor);
            } else {
                logger.info("Deregister processor {}, do nothing because this router is assigned to different processor {}", processor, registered);
            }
            return;
        }
        this.processor = null;
        for (String sessionId : this.sessions.keySet()) {
            try {
                this.disconnect(sessionId, "Processing has stopped.");
            } catch (IOException | IllegalStateException e) {
                // IllegalStateException is raised by disconnect() when the session
                // was removed concurrently (e.g. by onWebSocketClose). Either way,
                // keep going so the remaining sessions are still disconnected.
                logger.warn("Failed to disconnect session {} due to {}", sessionId, e, e);
            }
        }
    }

    /**
     * Starts tracking the given session and, if the assigned processor is a
     * {@link ConnectedListener}, notifies it of the new connection.
     *
     * @param session the newly connected session
     */
    public void captureSession(WebSocketSession session) {
        this.sessions.put(session.getSessionId(), session);
        final Processor registered = this.processor;
        // instanceof is false for null, so no separate null check is needed.
        if (registered instanceof ConnectedListener) {
            ((ConnectedListener) registered).connected(session);
        }
    }

    /**
     * Stops tracking the session with the given id.
     *
     * @param sessionId  id of the closed session
     * @param statusCode close status code (not used by this router)
     * @param reason     close reason (not used by this router)
     */
    public void onWebSocketClose(String sessionId, int statusCode, String reason) {
        this.sessions.remove(sessionId);
    }

    /**
     * Forwards a text message to the assigned processor if it is a
     * {@link TextMessageConsumer}; otherwise the message is dropped.
     *
     * @param sessionId id of the session the message arrived on
     * @param message   the text payload
     * @throws IllegalStateException if a consumer is assigned but the session
     *                               is not tracked by this router
     */
    public void onWebSocketText(String sessionId, String message) {
        final Processor registered = this.processor;
        if (registered instanceof TextMessageConsumer) {
            ((TextMessageConsumer) registered).consume(this.getSessionOrFail(sessionId), message);
        }
    }

    /**
     * Forwards a binary message to the assigned processor if it is a
     * {@link BinaryMessageConsumer}; otherwise the message is dropped.
     *
     * @param sessionId id of the session the message arrived on
     * @param payload   buffer holding the binary payload
     * @param offset    offset passed through to the consumer
     * @param length    length passed through to the consumer
     * @throws IllegalStateException if a consumer is assigned but the session
     *                               is not tracked by this router
     */
    public void onWebSocketBinary(String sessionId, byte[] payload, int offset, int length) {
        final Processor registered = this.processor;
        if (registered instanceof BinaryMessageConsumer) {
            ((BinaryMessageConsumer) registered).consume(this.getSessionOrFail(sessionId), payload, offset, length);
        }
    }

    /**
     * Looks up a tracked session.
     *
     * @param sessionId id of the session to look up
     * @return the tracked session, never {@code null}
     * @throws IllegalStateException if no session is tracked under that id
     */
    private WebSocketSession getSessionOrFail(String sessionId) {
        final WebSocketSession session = this.sessions.get(sessionId);
        if (session == null) {
            throw new IllegalStateException("Session was not found for the sessionId: " + sessionId);
        }
        return session;
    }

    /**
     * Sends a message to one session, or broadcasts it to every tracked
     * session when {@code sessionId} is null or empty.
     *
     * <p>When broadcasting, an {@link IOException} for one session is logged
     * and the remaining sessions are still attempted.</p>
     *
     * @param sessionId   target session id, or null/empty to broadcast
     * @param sendMessage callback that performs the actual send on a session
     * @throws IOException           if sending to the single target session fails
     * @throws IllegalStateException if a target session id is given but not tracked
     */
    public void sendMessage(String sessionId, SendMessage sendMessage) throws IOException {
        if (!StringUtils.isEmpty(sessionId)) {
            sendMessage.send(this.getSessionOrFail(sessionId));
            return;
        }
        // Iterate over entries rather than re-looking up each key: a session
        // closed concurrently would otherwise make the lookup throw and abort
        // the broadcast for every remaining session.
        for (Map.Entry<String, WebSocketSession> entry : this.sessions.entrySet()) {
            try {
                sendMessage.send(entry.getValue());
            } catch (IOException e) {
                logger.warn("Failed to send message to session {} due to {}", entry.getKey(), e, e);
            }
        }
    }

    /**
     * Closes the session with the given id and stops tracking it. If closing
     * throws, the session stays tracked.
     *
     * @param sessionId id of the session to close
     * @param reason    reason passed to the session's close call
     * @throws IOException           if closing the session fails
     * @throws IllegalStateException if the session is not tracked by this router
     */
    public void disconnect(String sessionId, String reason) throws IOException {
        final WebSocketSession session = this.getSessionOrFail(sessionId);
        session.close(reason);
        this.sessions.remove(sessionId);
    }

    /**
     * Tells whether a session with the given id is currently tracked.
     *
     * @param sessionId id of the session to look for
     * @return {@code true} if the session is tracked by this router
     */
    public boolean containsSession(String sessionId) {
        return this.sessions.containsKey(sessionId);
    }
}

