/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.messaging.core.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.Consumer;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.model.messaging.event.Message;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Consumer} implementation backed by a RabbitMQ topic exchange.
 *
 * <p>Each call to {@link #listen(MessageHandler)} declares (or re-uses) a queue, binds it to the
 * configured exchange with the handler's routing keys and registers an auto-acknowledging
 * consumer that decodes the Thrift-encoded {@link Message} envelope and hands the resulting
 * {@link MessageContext} to the handler. {@link #stopListen(String)} undoes a subscription and
 * {@link #close()} closes the underlying connection.
 *
 * <p>The subscription registry is a plain {@link HashMap}; this class does no synchronization of
 * its own.
 */
public class RabbitMQConsumer implements Consumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class);

    /** Exchange type used for every exchange declared by this consumer. */
    private static final String EXCHANGE_TYPE = "topic";

    private String exchangeName;
    private String url;
    private Connection connection;
    private Channel channel;

    /** Active subscriptions, keyed by the id returned from {@link #listen(MessageHandler)}. */
    private final Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();

    /**
     * Creates a consumer using the {@code rabbitmq.broker.url} and {@code rabbitmq.exchange.name}
     * server settings and connects immediately.
     *
     * @throws AiravataException if a setting cannot be read or the broker connection fails
     */
    public RabbitMQConsumer() throws AiravataException {
        try {
            this.url = ServerSettings.getSetting("rabbitmq.broker.url");
            this.exchangeName = ServerSettings.getSetting("rabbitmq.exchange.name");
            this.createConnection();
        }
        catch (ApplicationSettingsException e) {
            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
            log.error(message, e);
            throw new AiravataException(message, e);
        }
    }

    /**
     * Creates a consumer for an explicit broker and exchange and connects immediately.
     *
     * @param brokerUrl    AMQP URI of the broker
     * @param exchangeName name of the topic exchange to declare and bind queues to
     * @throws AiravataException if the broker connection or exchange declaration fails
     */
    public RabbitMQConsumer(String brokerUrl, String exchangeName) throws AiravataException {
        this.exchangeName = exchangeName;
        this.url = brokerUrl;
        this.createConnection();
    }

    /**
     * Opens the connection and channel and declares the (non-durable) topic exchange.
     *
     * @throws AiravataException wrapping whatever failure occurred; a connection that had already
     *                           been opened is aborted first so it is not leaked
     */
    private void createConnection() throws AiravataException {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.url);
            this.connection = connectionFactory.newConnection();
            this.connection.addShutdownListener(new ShutdownListener() {
                @Override
                public void shutdownCompleted(ShutdownSignalException cause) {
                    // A shutdown requested through close() is expected; anything else is worth surfacing.
                    if (!cause.isInitiatedByApplication()) {
                        log.warn("rabbitmq connection for exchange " + exchangeName + " was shut down", cause);
                    }
                }
            });
            log.info("connected to rabbitmq: " + this.connection + " for " + this.exchangeName);
            this.channel = this.connection.createChannel();
            this.channel.exchangeDeclare(this.exchangeName, EXCHANGE_TYPE, false);
        }
        catch (Exception e) {
            String msg = "could not open channel for exchange " + this.exchangeName;
            log.error(msg, e);
            if (this.connection != null) {
                // abort() discards any error raised while closing, so it cannot mask the original failure.
                this.connection.abort();
            }
            throw new AiravataException(msg, e);
        }
    }

    /**
     * Subscribes {@code handler} to the exchange.
     *
     * <p>The handler's properties are read for:
     * <ul>
     *   <li>{@code routingKey} (required) - a {@code String} or a {@code List} whose elements are
     *       converted with {@code toString()};</li>
     *   <li>{@code queue} (optional) - name of a durable queue to declare; when absent a
     *       server-named queue is declared;</li>
     *   <li>{@code consumerTag} (optional) - when absent the broker is asked to generate a tag, so
     *       several subscriptions can share this consumer's channel.</li>
     * </ul>
     *
     * @param handler receiver of the decoded messages
     * @return id identifying this subscription, to be passed to {@link #stopListen(String)}
     * @throws AiravataException wrapping any failure, including a missing or unsupported routing
     *                           key and an attempt to register the same keys/queue twice
     */
    @Override
    public String listen(final MessageHandler handler) throws AiravataException {
        try {
            Map<String, Object> props = handler.getProperties();
            List<String> keys = extractRoutingKeys(props.get("routingKey"));
            String queueName = (String) props.get("queue");
            String requestedTag = (String) props.get("consumerTag");

            // Re-open the channel for named and server-named queues alike.
            this.ensureChannelOpen();
            if (queueName == null) {
                queueName = this.channel.queueDeclare().getQueue();
            } else {
                this.channel.queueDeclare(queueName, true, false, false, null);
            }

            final String id = getId(keys, queueName);
            if (this.queueDetailsMap.containsKey(id)) {
                throw new IllegalStateException("This subscriber is already defined for this Consumer, cannot define the same subscriber twice");
            }
            for (String routingKey : keys) {
                this.channel.queueBind(queueName, this.exchangeName, routingKey);
            }

            // An empty tag makes the broker generate a unique one; a fixed fallback tag would
            // collide as soon as a second subscription is opened on the same channel.
            String tagToRequest = requestedTag == null ? "" : requestedTag;
            String activeTag = this.channel.basicConsume(queueName, true, tagToRequest, new DefaultConsumer(this.channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    try {
                        handler.onMessage(toMessageContext(body));
                    }
                    catch (TException e) {
                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
                        log.warn(msg, e);
                    }
                }
            });
            this.queueDetailsMap.put(id, new QueueDetails(queueName, keys, activeTag));
            return id;
        }
        catch (Exception e) {
            String msg = "could not open channel for exchange " + this.exchangeName;
            log.error(msg, e);
            throw new AiravataException(msg, e);
        }
    }

    /**
     * Best-effort removal of a subscription created by {@link #listen(MessageHandler)}: unbinds
     * the routing keys, cancels the consumer and deletes the queue if it is unused and empty.
     *
     * <p>The subscription is forgotten locally even when the broker calls fail, so the same
     * keys/queue can be registered again. Unknown ids are ignored.
     *
     * @param id subscription id returned by {@code listen}
     * @throws AiravataException declared by the interface; I/O failures are logged, not thrown
     */
    @Override
    public void stopListen(String id) throws AiravataException {
        QueueDetails details = this.queueDetailsMap.remove(id);
        if (details == null) {
            return;
        }
        try {
            for (String key : details.getRoutingKeys()) {
                this.channel.queueUnbind(details.getQueueName(), this.exchangeName, key);
            }
            // The consumer must be cancelled first: queueDelete below passes ifUnused=true,
            // which the broker refuses while a consumer is still attached to the queue.
            this.channel.basicCancel(details.getConsumerTag());
            this.channel.queueDelete(details.getQueueName(), true, true);
        }
        catch (IOException e) {
            String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + this.exchangeName;
            log.warn(msg, e);
        }
    }

    /**
     * Closes the broker connection if one is open. Safe to call more than once; an I/O failure
     * while closing is logged and otherwise ignored.
     */
    public void close() {
        if (this.connection != null && this.connection.isOpen()) {
            try {
                this.connection.close();
            }
            catch (IOException e) {
                log.warn("failed to close rabbitmq connection for exchange " + this.exchangeName, e);
            }
        }
    }

    /** Re-creates the channel and re-declares the exchange when the current channel is closed. */
    private void ensureChannelOpen() throws IOException {
        if (!this.channel.isOpen()) {
            this.channel = this.connection.createChannel();
            this.channel.exchangeDeclare(this.exchangeName, EXCHANGE_TYPE, false);
        }
    }

    /**
     * Normalizes the handler's {@code routingKey} property to a list of strings.
     *
     * @throws IllegalArgumentException if the value is {@code null} or neither a {@code String}
     *                                  nor a {@code List}
     */
    private static List<String> extractRoutingKeys(Object routing) {
        if (routing == null) {
            throw new IllegalArgumentException("The routing key must be present");
        }
        List<String> keys = new ArrayList<String>();
        if (routing instanceof List) {
            for (Object o : (List<?>) routing) {
                keys.add(o.toString());
            }
        } else if (routing instanceof String) {
            keys.add((String) routing);
        } else {
            // Previously such a value was ignored and the consumer ended up with no bindings at all.
            throw new IllegalArgumentException("The routing key must be a String or a List but was " + routing.getClass().getName());
        }
        return keys;
    }

    /**
     * Decodes the outer {@link Message} envelope and its typed status-change event.
     *
     * <p>For a message type other than EXPERIMENT, WORKFLOWNODE, TASK or JOB the returned context
     * carries a {@code null} event and a {@code null} gateway id.
     *
     * @param body raw bytes delivered by the broker
     * @return context built from the event, message type, message id, gateway id and updated time
     * @throws TException if either the envelope or the event cannot be de-serialized
     */
    private MessageContext toMessageContext(byte[] body) throws TException {
        Message message = new Message();
        ThriftUtils.createThriftFromBytes(body, message);

        MessageType type = message.getMessageType();
        TBase event = null;
        String gatewayId = null;
        if (type.equals(MessageType.EXPERIMENT)) {
            ExperimentStatusChangeEvent experimentEvent = new ExperimentStatusChangeEvent();
            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent);
            logReceived(message, experimentEvent.getState());
            event = experimentEvent;
            gatewayId = experimentEvent.getGatewayId();
        } else if (type.equals(MessageType.WORKFLOWNODE)) {
            WorkflowNodeStatusChangeEvent nodeEvent = new WorkflowNodeStatusChangeEvent();
            ThriftUtils.createThriftFromBytes(message.getEvent(), nodeEvent);
            logReceived(message, nodeEvent.getState());
            event = nodeEvent;
            gatewayId = nodeEvent.getWorkflowNodeIdentity().getGatewayId();
        } else if (type.equals(MessageType.TASK)) {
            TaskStatusChangeEvent taskEvent = new TaskStatusChangeEvent();
            ThriftUtils.createThriftFromBytes(message.getEvent(), taskEvent);
            logReceived(message, taskEvent.getState());
            event = taskEvent;
            gatewayId = taskEvent.getTaskIdentity().getGatewayId();
        } else if (type.equals(MessageType.JOB)) {
            JobStatusChangeEvent jobEvent = new JobStatusChangeEvent();
            ThriftUtils.createThriftFromBytes(message.getEvent(), jobEvent);
            logReceived(message, jobEvent.getState());
            event = jobEvent;
            gatewayId = jobEvent.getJobIdentity().getGatewayId();
        }

        MessageContext messageContext = new MessageContext(event, type, message.getMessageId(), gatewayId);
        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
        return messageContext;
    }

    /** Emits the debug line shared by all message types. */
    private static void logReceived(Message message, Object state) {
        log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType() + "'  with status " + state);
    }

    /** Builds the subscription id: every routing key and then the queue name, each prefixed by {@code _}. */
    private static String getId(List<String> routingKeys, String queueName) {
        StringBuilder id = new StringBuilder();
        for (String key : routingKeys) {
            id.append('_').append(key);
        }
        return id.append('_').append(queueName).toString();
    }

    /** Immutable record of what a subscription created on the broker, needed to undo it later. */
    private static final class QueueDetails {
        private final String queueName;
        private final List<String> routingKeys;
        private final String consumerTag;

        private QueueDetails(String queueName, List<String> routingKeys, String consumerTag) {
            this.queueName = queueName;
            this.routingKeys = routingKeys;
            this.consumerTag = consumerTag;
        }

        public String getQueueName() {
            return this.queueName;
        }

        public List<String> getRoutingKeys() {
            return this.routingKeys;
        }

        /** Tag returned by {@code basicConsume}, i.e. the tag the consumer is actually registered under. */
        public String getConsumerTag() {
            return this.consumerTag;
        }
    }
}

