package org.apache.airavata.workflow.tracking.impl.subscription;

import java.rmi.RemoteException;
import org.apache.airavata.workflow.tracking.WorkflowTrackingException;
import org.apache.airavata.workflow.tracking.client.Callback;
import org.apache.airavata.workflow.tracking.client.Subscription;
import org.apache.airavata.workflow.tracking.util.MessageUtil;
import org.apache.airavata.wsmg.client.MsgBrokerClientException;
import org.apache.airavata.wsmg.client.NotificationHandler;
import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import org.apache.airavata.wsmg.client.msgbox.MessagePuller;
import org.apache.airavata.wsmg.client.msgbox.MsgboxHandler;
import org.apache.airavata.wsmg.commons.MsgBoxQNameConstants;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.addressing.EndpointReferenceHelper;
import org.apache.xmlbeans.XmlCursor;
import org.apache.xmlbeans.XmlException;
import org.apache.xmlbeans.XmlObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/workflow/tracking/impl/subscription/MessageBoxNotificationHandler.class */
/**
 * {@link NotificationHandler} that receives notifications through a message box:
 * it subscribes a message box as the sink at the broker, pulls messages from the
 * box via a {@link MessagePuller}, parses each message as XML and forwards it to
 * the registered {@link Callback}.
 *
 * <p>The instance keeps the current topic, callback, subscription id and puller as
 * mutable state without any synchronization; each {@code create...}/{@code renew...}/
 * {@code startListening...} call overwrites that state.
 */
public class MessageBoxNotificationHandler implements NotificationHandler {
    private static final Logger logger = LoggerFactory.getLogger(MessageBoxNotificationHandler.class);

    private final String messageBoxUrl;
    private final String brokerURL;
    private String subscriptionId;
    private MessagePuller messagePuller;
    private Callback callback;
    private String topic;

    /**
     * Creates a handler bound to a message box service and a broker.
     *
     * @param messageBoxUrl location of the message box service; must be non-null and non-empty
     * @param brokerURL     location of the broker; must be non-null and non-empty
     * @throws WorkflowTrackingException if either location is null or empty
     */
    public MessageBoxNotificationHandler(String messageBoxUrl, String brokerURL) {
        if (messageBoxUrl == null || messageBoxUrl.isEmpty()) {
            logger.error("Invalid messagebox Location :{}", messageBoxUrl);
            throw new WorkflowTrackingException(
                    "MessageBox location should not be null or empty, messageBoxUrl:" + messageBoxUrl);
        }
        if (brokerURL == null || brokerURL.isEmpty()) {
            logger.error("Invalid broker Location :{}", brokerURL);
            throw new WorkflowTrackingException(
                    "Broker location should not be null or empty, brokerUrl:" + brokerURL);
        }
        this.messageBoxUrl = messageBoxUrl;
        this.brokerURL = brokerURL;
    }

    /**
     * Parses the raw notification as XML and delivers it to the registered callback
     * together with the current topic and the message type reported by
     * {@link MessageUtil#getType}.
     *
     * <p>A message that cannot be parsed is logged and dropped instead of being
     * forwarded as {@code null}.
     *
     * @param message raw notification text
     */
    public void handleNotification(String message) {
        XmlObject messageObj;
        try {
            messageObj = XmlObject.Factory.parse(message);
        } catch (XmlException e) {
            logger.error("error parsing message content: " + message, e);
            // Nothing usable to deliver; forwarding a null message would only fail downstream.
            return;
        }
        // NOTE(review): the cursor is advanced and disposed without reading anything;
        // it looks like it has no effect - confirm before removing.
        XmlCursor cursor = messageObj.newCursor();
        cursor.toNextToken();
        cursor.dispose();

        this.callback.deliverMessage(this.topic, MessageUtil.getType(messageObj), messageObj);
    }

    /**
     * Stops pulling (if a puller is active), unsubscribes the current subscription id
     * from the broker and deletes the given message box.
     *
     * <p>Cleanup is best effort: a {@link MsgBrokerClientException} is logged, not
     * rethrown. The message box deletion is attempted even when unsubscribing fails;
     * if both steps fail only the deletion failure is logged.
     *
     * @param endpointReference endpoint reference of the message box to delete
     * @throws RemoteException declared for callers; kept from the original signature
     */
    public void destroy(EndpointReference endpointReference) throws RemoteException {
        if (this.messagePuller != null) {
            this.messagePuller.stopPulling();
            logger.debug("\n\nStopping the Messagebox for topic{}", this.topic);
        }
        try {
            try {
                WseMsgBrokerClient brokerClient = new WseMsgBrokerClient();
                brokerClient.init(this.brokerURL);
                brokerClient.unSubscribe(this.subscriptionId);
            } finally {
                // Always try to remove the message box so a failed unsubscribe does not leave it behind.
                logger.info("Unsubscribing the messagebox that was destroyed, SubscriptionID:{}",
                        this.subscriptionId);
                new MsgboxHandler().deleteMsgBox(endpointReference, 2000L);
            }
        } catch (MsgBrokerClientException e) {
            logger.error("unable to unsubscribe", e);
        }
    }

    /**
     * Re-subscribes an existing message box, given as a serialized endpoint reference,
     * to the broker using the callback already registered on this handler.
     *
     * <p>NOTE(review): {@code this.subscriptionId} is set to the id passed in, while the
     * returned {@link Subscription} carries the id newly returned by the broker -
     * confirm which one {@link #destroy} is expected to unsubscribe.
     *
     * @param epr                  serialized message box endpoint reference
     * @param subscriptionId       subscription id stored on this handler
     * @param topic                topic to subscribe to
     * @param xpath                passed as the third argument of the broker client's
     *                             {@code subscribe} (presumably an XPath filter - confirm)
     * @param subscribePermanently when {@code true} the subscription is made with expiry
     *                             argument {@code -1}
     * @return the new subscription, with its message box EPR set
     * @throws MsgBrokerClientException if the EPR cannot be parsed or subscribing fails
     */
    public Subscription renewMessageboxSubscription(String epr, String subscriptionId, String topic, String xpath,
            boolean subscribePermanently) throws MsgBrokerClientException {
        this.subscriptionId = subscriptionId;
        this.topic = topic;
        final EndpointReference messageBoxEpr;
        try {
            messageBoxEpr = EndpointReferenceHelper.fromString(epr);
        } catch (AxisFault e) {
            throw new MsgBrokerClientException("unable to convert end point reference", e);
        }
        return subscribeSink(messageBoxEpr, messageBoxEpr.getAddress(), topic, xpath, this.callback,
                subscribePermanently);
    }

    /**
     * Re-subscribes an existing message box to the broker using the callback already
     * registered on this handler. The EPR's address is used as the sink location.
     *
     * @param endpointReference    message box endpoint reference
     * @param subscriptionId       subscription id stored on this handler
     * @param topic                topic to subscribe to
     * @param xpath                passed as the third argument of the broker client's
     *                             {@code subscribe} (presumably an XPath filter - confirm)
     * @param subscribePermanently when {@code true} the subscription is made with expiry
     *                             argument {@code -1}
     * @return the new subscription, with its message box EPR set
     * @throws MsgBrokerClientException if subscribing fails
     */
    public Subscription renewMessageboxSubscription(EndpointReference endpointReference, String subscriptionId,
            String topic, String xpath, boolean subscribePermanently) throws MsgBrokerClientException {
        this.subscriptionId = subscriptionId;
        this.topic = topic;
        logger.info("\n\nCreate Subscription for topic{} [Messagebox]\n\n", topic);
        return subscribeSink(endpointReference, endpointReference.getAddress(), topic, xpath, this.callback,
                subscribePermanently);
    }

    /**
     * Starts pulling from an already existing message box and subscribes it to the broker.
     *
     * <p>The sink address is the EPR address if it already contains {@code "clientid"};
     * otherwise it is built as {@code <address>/clientid/<box id>} from the EPR's
     * message box id reference parameter. The EPR is validated before pulling starts so
     * an invalid EPR does not leave a running puller behind.
     *
     * @param endpointReference    endpoint reference of the existing message box
     * @param subscriptionId       subscription id stored on this handler
     * @param topic                topic to subscribe to
     * @param xpath                passed as the third argument of the broker client's
     *                             {@code subscribe} (presumably an XPath filter - confirm)
     * @param callback             callback that receives delivered messages
     * @param subscribePermanently when {@code true} the subscription is made with expiry
     *                             argument {@code -1}
     * @return the new subscription, with its message box EPR set
     * @throws MsgBrokerClientException if the EPR lacks the required reference parameter
     *                                  or subscribing fails
     */
    public Subscription startListeningToPreviousMessageBox(EndpointReference endpointReference,
            String subscriptionId, String topic, String xpath, Callback callback, boolean subscribePermanently)
            throws MsgBrokerClientException {
        this.callback = callback;
        this.subscriptionId = subscriptionId;
        this.topic = topic;

        String sinkAddress = resolveSinkAddress(endpointReference);

        // Trailing arguments are presumably the pull interval and timeout in ms - confirm.
        this.messagePuller = new MsgboxHandler().startPullingFromExistingMsgBox(endpointReference, this, 500L, 1000L);
        logger.debug("\n\nCreate Subscription for topic{} [Messagebox]\n\n", topic);

        return subscribeSink(endpointReference, sinkAddress, topic, xpath, callback, subscribePermanently);
    }

    /**
     * Creates a new message box, subscribes it to the broker for the given topic and
     * starts pulling messages from it.
     *
     * @param topic                topic to subscribe to
     * @param xpath                passed through to the broker client's {@code subscribeMsgBox}
     *                             (presumably an XPath filter - confirm)
     * @param callback             callback that receives delivered messages
     * @param subscribePermanently when {@code true} the subscription is made with expiry
     *                             argument {@code -1}, otherwise {@code 259200000}
     * @return the new subscription, with its message box EPR and broker URL set
     * @throws Exception if creating the message box or subscribing fails
     */
    public Subscription createSubscription(String topic, String xpath, Callback callback,
            boolean subscribePermanently) throws Exception {
        return createBoxAndSubscribe(topic, xpath, callback, subscribePermanently, 1500L);
    }

    /**
     * Same as {@link #createSubscription} but pulls with {@code 500} instead of
     * {@code 1500} as the third argument of {@code startPullingEventsFromMsgBox}.
     *
     * @param topic                topic to subscribe to
     * @param xpath                passed through to the broker client's {@code subscribeMsgBox}
     *                             (presumably an XPath filter - confirm)
     * @param callback             callback that receives delivered messages
     * @param subscribePermanently when {@code true} the subscription is made with expiry
     *                             argument {@code -1}, otherwise {@code 259200000}
     * @return the new subscription, with its message box EPR and broker URL set
     * @throws MsgBrokerClientException if creating the message box or subscribing fails
     */
    public Subscription createMsgBoxSubscription(String topic, String xpath, Callback callback,
            boolean subscribePermanently) throws MsgBrokerClientException {
        return createBoxAndSubscribe(topic, xpath, callback, subscribePermanently, 500L);
    }

    /** Shared body of the two {@code create...Subscription} entry points; they differ only in {@code pullInterval}. */
    private Subscription createBoxAndSubscribe(String topic, String xpath, Callback callback,
            boolean subscribePermanently, long pullInterval) throws MsgBrokerClientException {
        this.topic = topic;
        this.callback = callback;

        MsgboxHandler msgboxHandler = new MsgboxHandler();
        EndpointReference messageBoxEpr = msgboxHandler.createPullMsgBox(this.messageBoxUrl, 12000L);
        logger.debug("\n\nCreated Messagebox at address :{}", messageBoxEpr.getAddress());

        this.subscriptionId = subToBrokerWithMsgBoxSink(messageBoxEpr, topic, xpath, new WseMsgBrokerClient(),
                subscribePermanently);
        // pullInterval and 30000 are presumably the pull interval and timeout in ms - confirm.
        this.messagePuller = msgboxHandler.startPullingEventsFromMsgBox(messageBoxEpr, this, pullInterval, 30000L);
        logger.debug("\n\nCreate Subscription for topic{} [Messagebox]\n\n", topic);

        Subscription subscription = new Subscription(this, this.subscriptionId, topic, callback, this.brokerURL);
        subscription.setMessageBoxEpr(messageBoxEpr);
        subscription.setBrokerURL(this.brokerURL);
        return subscription;
    }

    /** Subscribes {@code sinkAddress} at the broker and wraps the returned id in a {@link Subscription} for the EPR. */
    private Subscription subscribeSink(EndpointReference messageBoxEpr, String sinkAddress, String topic,
            String xpath, Callback callback, boolean subscribePermanently) throws MsgBrokerClientException {
        String brokerSubscriptionId = subscribeToBroker(sinkAddress, topic, xpath, new WseMsgBrokerClient(),
                subscribePermanently);
        Subscription subscription = new Subscription(this, brokerSubscriptionId, topic, callback, this.brokerURL);
        subscription.setMessageBoxEpr(messageBoxEpr);
        return subscription;
    }

    /** Returns the EPR address as is when it contains "clientid", else appends {@code clientid/<box id>} to it. */
    private static String resolveSinkAddress(EndpointReference messageBoxEpr) throws MsgBrokerClientException {
        String address = messageBoxEpr.getAddress();
        if (address.contains("clientid")) {
            return address;
        }
        if (messageBoxEpr.getAllReferenceParameters() == null) {
            throw new MsgBrokerClientException("Invalid Message Box EPR, no reference parameters found");
        }
        OMElement boxIdElement = (OMElement) messageBoxEpr.getAllReferenceParameters()
                .get(MsgBoxQNameConstants.MSG_BOXID_QNAME);
        // Check the element itself: a missing parameter must surface as a broker exception, not an NPE.
        String boxId = boxIdElement == null ? null : boxIdElement.getText();
        if (boxId == null) {
            throw new MsgBrokerClientException("Invalid Message Box EPR, reference parameter MsgBoxAddr is missing");
        }
        String pattern = address.endsWith("/") ? "%sclientid/%s" : "%s/clientid/%s";
        return String.format(pattern, address, boxId);
    }

    /**
     * Initializes the client against the broker and subscribes the sink address.
     * A permanent subscription uses the four-argument {@code subscribe} with expiry
     * argument {@code -1}; otherwise the three-argument overload is used.
     */
    private String subscribeToBroker(String sinkAddress, String topic, String xpath,
            WseMsgBrokerClient brokerClient, boolean subscribePermanently) throws MsgBrokerClientException {
        brokerClient.init(this.brokerURL);
        if (subscribePermanently) {
            return brokerClient.subscribe(new EndpointReference(sinkAddress), topic, xpath, -1L);
        }
        return brokerClient.subscribe(sinkAddress, topic, xpath);
    }

    /**
     * Initializes the client against the broker and subscribes the message box EPR as sink.
     * The expiry argument is {@code -1} for a permanent subscription and {@code 259200000}
     * otherwise (72 hours if the unit is milliseconds - confirm).
     */
    private String subToBrokerWithMsgBoxSink(EndpointReference messageBoxEpr, String topic, String xpath,
            WseMsgBrokerClient brokerClient, boolean subscribePermanently) throws MsgBrokerClientException {
        brokerClient.init(this.brokerURL);
        long expiry = subscribePermanently ? -1L : 259200000L;
        return brokerClient.subscribeMsgBox(messageBoxEpr, topic, xpath, expiry);
    }
}
