/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.xbaya.interpretor;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
import org.apache.airavata.common.utils.XMLUtil;
import org.apache.airavata.registry.api.workflow.NodeExecutionError;
import org.apache.airavata.workflow.model.graph.ControlPort;
import org.apache.airavata.workflow.model.graph.EPRPort;
import org.apache.airavata.workflow.model.graph.Edge;
import org.apache.airavata.workflow.model.graph.Graph;
import org.apache.airavata.workflow.model.graph.Node;
import org.apache.airavata.workflow.model.graph.Port;
import org.apache.airavata.workflow.model.graph.impl.NodeImpl;
import org.apache.airavata.workflow.model.graph.system.InputNode;
import org.apache.airavata.workflow.model.graph.system.OutputNode;
import org.apache.airavata.workflow.model.graph.util.GraphUtil;
import org.apache.airavata.workflow.model.graph.ws.WSGraph;
import org.apache.airavata.workflow.model.wf.Workflow;
import org.apache.airavata.ws.monitor.EventData;
import org.apache.airavata.ws.monitor.MonitorException;
import org.apache.airavata.ws.monitor.MonitorUtil;
import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
import org.apache.airavata.wsmg.client.MsgBrokerClientException;
import org.apache.airavata.wsmg.client.NotificationHandler;
import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import org.apache.airavata.wsmg.client.msgbox.MessagePuller;
import org.apache.airavata.xbaya.XBayaConfiguration;
import org.apache.airavata.xbaya.graph.controller.NodeController;
import org.apache.airavata.xbaya.interpretor.WorkflowInterpreter;
import org.apache.airavata.xbaya.interpretor.WorkflowInterpreterConfiguration;
import org.apache.airavata.xbaya.provenance.WorkflowNodeStatusUpdater;
import org.apache.airavata.xbaya.provenance.WorkflowStatusUpdater;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.addressing.EndpointReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.infoset.XmlElement;

public class WorkflowInterpretorEventListener
implements NotificationHandler,
ConsumerNotificationHandler {
    private Workflow workflow;
    private boolean pullMode;
    private WseMsgBrokerClient wseClient;
    private URI brokerURL;
    private String topic;
    private URI messageBoxURL;
    private String subscriptionID;
    private MessagePuller messagePuller;
    private WorkflowStatusUpdater workflowStatusUpdater;
    private WorkflowNodeStatusUpdater workflowNodeStatusUpdater;
    private WorkflowInterpreterConfiguration workflowInterpreterConfiguration;
    private String lastSubscriptionId;
    private static Logger logger = LoggerFactory.getLogger(WorkflowInterpretorEventListener.class);

    public WorkflowInterpretorEventListener(Workflow workflow, XBayaConfiguration configuration) {
        this.workflow = workflow;
        this.brokerURL = configuration.getBrokerURL();
        this.topic = configuration.getTopic();
        this.pullMode = true;
        this.messageBoxURL = configuration.getMessageBoxURL();
        this.wseClient = new WseMsgBrokerClient();
        this.wseClient.init(this.brokerURL.toString());
        this.workflowInterpreterConfiguration = WorkflowInterpreter.getWorkflowInterpreterConfiguration();
        this.workflowNodeStatusUpdater = new WorkflowNodeStatusUpdater(this.workflowInterpreterConfiguration.getAiravataAPI());
        this.workflowStatusUpdater = new WorkflowStatusUpdater(this.workflowInterpreterConfiguration.getAiravataAPI());
    }

    public void start() throws MonitorException {
        this.subscribe();
    }

    public void stop() throws MonitorException {
        this.unsubscribe();
    }

    private synchronized void subscribe() throws MonitorException {
        if (this.subscriptionID != null) {
            throw new IllegalStateException();
        }
        try {
            if (this.pullMode) {
                EndpointReference messageBoxEPR = this.wseClient.createPullMsgBox(this.messageBoxURL.toString(), 20000L);
                this.subscriptionID = this.wseClient.subscribe(messageBoxEPR.getAddress(), this.topic, null);
                this.messagePuller = this.wseClient.startPullingEventsFromMsgBox(messageBoxEPR, (NotificationHandler)this, 1000L, 20000L);
            } else {
                String[] endpoints = this.wseClient.startConsumerService(2222, (ConsumerNotificationHandler)this);
                this.subscriptionID = this.wseClient.subscribe(endpoints[0], this.topic, null);
            }
        }
        catch (IOException e) {
            throw new MonitorException("Failed to subscribe.", (Throwable)e);
        }
        catch (RuntimeException e) {
            throw new MonitorException("Failed to subscribe.", (Throwable)e);
        }
    }

    private synchronized void unsubscribe() throws MonitorException {
        if (this.subscriptionID == null) {
            throw new IllegalStateException();
        }
        try {
            if (this.pullMode) {
                this.messagePuller.stopPulling();
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    throw new MonitorException("Error during stop message puller", (Throwable)e);
                }
            }
            this.wseClient.unSubscribe(this.subscriptionID);
        }
        catch (MsgBrokerClientException e) {
            throw new MonitorException("Failed to unsubscribe.", (Throwable)e);
        }
    }

    public void handleNotification(String message) {
        try {
            XmlElement event = XMLUtil.stringToXmlElement((String)message);
            this.handleEvent(new EventData(event), true, (Graph)this.workflow.getGraph());
        }
        catch (RuntimeException e) {
            logger.warn("Failed to process notification: " + message, (Throwable)e);
        }
        catch (AiravataAPIInvocationException e) {
            logger.error("Error occured during Exception saving to the Registry");
        }
    }

    private void handleEvent(EventData event, boolean forward, Graph graph) throws AiravataAPIInvocationException {
        MonitorUtil.EventType type = event.getType();
        String nodeID = event.getNodeID();
        Node node = graph.getNode(nodeID);
        if (type == MonitorUtil.EventType.WORKFLOW_INVOKED) {
            this.workflowStarted(graph, forward);
            this.workflowStatusUpdater.saveWorkflowData(event.getExperimentID(), event.getExperimentID(), this.workflowInterpreterConfiguration.getWorkflow().getName());
            this.workflowStatusUpdater.workflowStarted(event.getExperimentID());
        } else if (type == MonitorUtil.EventType.WORKFLOW_TERMINATED) {
            this.workflowFinished(graph, forward);
            this.workflowStatusUpdater.workflowFinished(event.getExperimentID());
            try {
                this.unsubscribe();
            }
            catch (MonitorException e) {
                e.printStackTrace();
            }
        } else if (type == MonitorUtil.EventType.INVOKING_SERVICE || type == MonitorUtil.EventType.SERVICE_INVOKED) {
            if (node == null) {
                if (nodeID != null && !nodeID.equals("")) {
                    logger.warn("There is no node that has ID, " + nodeID);
                }
            } else {
                this.nodeStarted(node, forward);
                this.workflowNodeStatusUpdater.workflowNodeStarted(event.getExperimentID(), event.getNodeID(), event.getMessage(), event.getWorkflowID().toASCIIString());
            }
        } else if (type == MonitorUtil.EventType.RECEIVED_RESULT || type == MonitorUtil.EventType.SENDING_RESULT) {
            if (node == null) {
                if (nodeID != null && !nodeID.equals("")) {
                    logger.warn("There is no node that has ID, " + nodeID);
                }
            } else {
                this.nodeFinished(node, forward);
                this.workflowNodeStatusUpdater.workflowNodeFinished(event.getExperimentID(), event.getNodeID(), event.getMessage(), event.getWorkflowID().toASCIIString());
            }
        } else if (type == MonitorUtil.EventType.RECEIVED_FAULT || type == MonitorUtil.EventType.SENDING_FAULT || type == MonitorUtil.EventType.SENDING_RESPONSE_FAILED) {
            logger.error(event.getMessage());
            NodeExecutionError nodeExecutionError = new NodeExecutionError();
            nodeExecutionError.setExperimentId(event.getExperimentID());
            nodeExecutionError.setNodeId(event.getNodeID());
            nodeExecutionError.setWorkflowInstanceId(event.getExperimentID());
            nodeExecutionError.setErrorMessage(event.getMessage());
            nodeExecutionError.setErrorDescription(event.getMessage());
            nodeExecutionError.setErrorTime(event.getTimestamp());
            this.workflowInterpreterConfiguration.getAiravataAPI().getExecutionManager().addNodeExecutionError(nodeExecutionError);
            if (node == null) {
                if (nodeID != null && !nodeID.equals("")) {
                    logger.warn("There is no node that has ID, " + nodeID);
                }
            } else {
                this.nodeFailed(node, forward);
                this.workflowNodeStatusUpdater.workflowNodeFailed(event.getExperimentID(), event.getNodeID());
            }
            try {
                this.unsubscribe();
            }
            catch (MonitorException e) {
                e.printStackTrace();
            }
        } else if (type == MonitorUtil.EventType.RESOURCE_MAPPING) {
            if (node == null) {
                if (nodeID != null && !nodeID.equals("")) {
                    logger.warn("There is no node that has ID, " + nodeID);
                }
            } else {
                this.workflowNodeStatusUpdater.workflowNodeRunning(event.getExperimentID(), event.getNodeID());
            }
        } else if (type == MonitorUtil.EventType.LOG_INFO) {
            if (event.getMessage().endsWith("DONE")) {
                this.workflowNodeStatusUpdater.workflowNodeStatusDone(event.getExperimentID(), event.getNodeID());
            } else if (event.getMessage().endsWith("PENDING")) {
                this.workflowNodeStatusUpdater.workflowNodeStatusPending(event.getExperimentID(), event.getNodeID());
            } else if (event.getMessage().endsWith("ACTIVE")) {
                this.workflowNodeStatusUpdater.workflowNodeStatusActive(event.getExperimentID(), event.getNodeID());
            }
        }
    }

    private void workflowStarted(Graph graph, boolean forward) {
        for (InputNode node : GraphUtil.getInputNodes((Graph)graph)) {
            if (forward) {
                this.finishNode((Node)node);
                continue;
            }
            this.resetNode((Node)node);
        }
    }

    private void workflowFinished(Graph graph, boolean forward) {
        for (OutputNode node : GraphUtil.getOutputNodes((Graph)graph)) {
            if (forward) {
                this.finishNode((Node)node);
                this.finishPredecessorNodes((Node)node);
                continue;
            }
            this.resetNode((Node)node);
        }
    }

    private LinkedList<InputNode> getInputNodes(WSGraph graph) {
        List nodes = graph.getNodes();
        LinkedList<InputNode> inputNodes = new LinkedList<InputNode>();
        for (NodeImpl nodeImpl : nodes) {
            if (!(nodeImpl instanceof InputNode)) continue;
            inputNodes.add((InputNode)nodeImpl);
        }
        return inputNodes;
    }

    private LinkedList<OutputNode> getOutputNodes(WSGraph graph) {
        List nodes = graph.getNodes();
        LinkedList<OutputNode> outputNodes = new LinkedList<OutputNode>();
        for (NodeImpl nodeImpl : nodes) {
            if (!(nodeImpl instanceof OutputNode)) continue;
            outputNodes.add((OutputNode)nodeImpl);
        }
        return outputNodes;
    }

    private void nodeStarted(Node node, boolean forward) {
        if (forward) {
            this.executeNode(node);
            this.finishPredecessorNodes(node);
        } else {
            this.resetNode(node);
        }
    }

    private void nodeFinished(Node node, boolean forward) {
        if (forward) {
            this.finishNode(node);
            this.finishPredecessorNodes(node);
        } else {
            this.executeNode(node);
        }
    }

    private void nodeFailed(Node node, boolean forward) {
        if (forward) {
            this.failNode(node);
            this.finishPredecessorNodes(node);
        } else {
            this.executeNode(node);
        }
    }

    private void executeNode(Node node) {
        node.setState(Node.NodeExecutionState.EXECUTING);
    }

    private void finishNode(Node node) {
        node.setState(Node.NodeExecutionState.FINISHED);
    }

    private void failNode(Node node) {
        node.setState(Node.NodeExecutionState.FAILED);
    }

    private void resetNode(Node node) {
        node.setState(Node.NodeExecutionState.WAITING);
        NodeController.getGUI(node).resetTokens();
    }

    private void finishPredecessorNodes(Node node) {
        for (Port inputPort : node.getInputPorts()) {
            for (Edge edge : inputPort.getEdges()) {
                Port fromPort = edge.getFromPort();
                if (fromPort instanceof EPRPort) continue;
                Node fromNode = fromPort.getNode();
                this.finishNode(fromNode);
                this.finishPredecessorNodes(fromNode);
            }
        }
        ControlPort controlInPort = node.getControlInPort();
        if (controlInPort != null) {
            for (Node fromNode : controlInPort.getFromNodes()) {
                this.finishNode(fromNode);
                this.finishPredecessorNodes(fromNode);
            }
        }
    }

    public void handleNotification(SOAPEnvelope message) {
        String soapBody = message.getBody().toString();
        this.handleNotification(soapBody);
    }
}

