package org.apache.airavata.messaging.core.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.WSConstants;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.model.messaging.event.Message;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.class */
public class RabbitMQTaskLaunchConsumer {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) RabbitMQTaskLaunchConsumer.class);
    private static Logger log = LoggerFactory.getLogger((Class<?>) RabbitMQStatusConsumer.class);
    private String taskLaunchExchangeName;
    private String url;
    private Connection connection;
    private Channel channel;
    private Map<String, QueueDetails> queueDetailsMap = new HashMap();
    private boolean durableQueue;
    private MessageHandler messageHandler;
    private int prefetchCount;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer$QueueDetails.class */
    public class QueueDetails {
        String queueName;
        List<String> routingKeys;

        private QueueDetails(String str, List<String> list) {
            this.queueName = str;
            this.routingKeys = list;
        }

        public String getQueueName() {
            return this.queueName;
        }

        public List<String> getRoutingKeys() {
            return this.routingKeys;
        }
    }

    public RabbitMQTaskLaunchConsumer() throws AiravataException {
        try {
            this.url = ServerSettings.getSetting("rabbitmq.broker.url");
            this.durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
            this.taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
            this.prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64))).intValue();
            createConnection();
        } catch (ApplicationSettingsException e) {
            log.error("Failed to get read the required properties from airavata to initialize rabbitmq", (Throwable) e);
            throw new AiravataException("Failed to get read the required properties from airavata to initialize rabbitmq", e);
        }
    }

    public RabbitMQTaskLaunchConsumer(String str, String str2) throws AiravataException {
        this.taskLaunchExchangeName = str2;
        this.url = str;
        createConnection();
    }

    private void createConnection() throws AiravataException {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.url);
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setRequestedHeartbeat(5);
            connectionFactory.setConnectionTimeout(5000);
            connectionFactory.setNetworkRecoveryInterval(100);
            this.connection = connectionFactory.newConnection();
            this.connection.addShutdownListener(new ShutdownListener() { // from class: org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer.1
                @Override // com.rabbitmq.client.ShutdownListener
                public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                    RabbitMQTaskLaunchConsumer.log.error("**************** ######## RabbitMQ connection shutting down ********** #######");
                }
            });
            log.info("connected to rabbitmq: " + this.connection + " for " + this.taskLaunchExchangeName);
            this.channel = this.connection.createChannel();
            this.channel.basicRecover(true);
            this.channel.basicQos(this.prefetchCount);
            this.channel.addShutdownListener(new ShutdownListener() { // from class: org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer.2
                @Override // com.rabbitmq.client.ShutdownListener
                public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                    RabbitMQTaskLaunchConsumer.log.error("************ ######### RabbitMQ channel shutting down ********** #######", (Throwable) shutdownSignalException);
                }
            });
        } catch (Exception e) {
            String str = "could not open channel for exchange " + this.taskLaunchExchangeName;
            log.error(str);
            throw new AiravataException(str, e);
        }
    }

    public void reconnect() throws AiravataException {
        if (this.messageHandler != null) {
            try {
                createConnection();
                listen(this.messageHandler);
            } catch (AiravataException e) {
                String str = "could not open channel for exchange " + this.taskLaunchExchangeName;
                log.error(str);
                throw new AiravataException(str, e);
            }
        }
    }

    public String listen(final MessageHandler messageHandler) throws AiravataException {
        try {
            this.messageHandler = messageHandler;
            Map<String, Object> properties = messageHandler.getProperties();
            Object obj = properties.get(MessagingConstants.RABBIT_ROUTING_KEY);
            if (obj == null) {
                throw new IllegalArgumentException("The routing key must be present");
            }
            ArrayList arrayList = new ArrayList();
            if (obj instanceof List) {
                Iterator it = ((List) obj).iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().toString());
                }
            } else if (obj instanceof String) {
                arrayList.add((String) obj);
            }
            String str = (String) properties.get(MessagingConstants.RABBIT_QUEUE);
            String str2 = (String) properties.get(MessagingConstants.RABBIT_CONSUMER_TAG);
            if (str == null) {
                if (!this.channel.isOpen()) {
                    this.channel = this.connection.createChannel();
                    this.channel.basicQos(this.prefetchCount);
                }
                str = this.channel.queueDeclare().getQueue();
            } else {
                this.channel.queueDeclare(str, this.durableQueue, false, false, null);
            }
            final String id = getId(arrayList, str);
            if (this.queueDetailsMap.containsKey(id)) {
                throw new IllegalStateException("This subscriber is already defined for this Consumer, cannot define the same subscriber twice");
            }
            if (str2 == null) {
                str2 = WSConstants.DEFAULT_ATTRIBUTE;
            }
            this.channel.basicConsume(str, false, str2, (Consumer) new QueueingConsumer(this.channel) { // from class: org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer.3
                /* JADX WARN: Multi-variable type inference failed */
                @Override // com.rabbitmq.client.QueueingConsumer, com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
                public void handleDelivery(String str3, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                    Message message = new Message();
                    try {
                        ThriftUtils.createThriftFromBytes(bArr, message);
                        TaskTerminateEvent taskTerminateEvent = null;
                        String str4 = null;
                        long deliveryTag = envelope.getDeliveryTag();
                        if (message.getMessageType().equals(MessageType.LAUNCHTASK)) {
                            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
                            RabbitMQTaskLaunchConsumer.log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType() + "'  for experimentId: " + taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
                            taskTerminateEvent = taskSubmitEvent;
                            str4 = taskSubmitEvent.getGatewayId();
                        } else if (message.getMessageType().equals(MessageType.TERMINATETASK)) {
                            TaskTerminateEvent taskTerminateEvent2 = new TaskTerminateEvent();
                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent2);
                            RabbitMQTaskLaunchConsumer.log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType() + "'  for experimentId: " + taskTerminateEvent2.getExperimentId() + "and taskId: " + taskTerminateEvent2.getTaskId());
                            taskTerminateEvent = taskTerminateEvent2;
                            str4 = taskTerminateEvent2.getGatewayId();
                        }
                        System.out.println("*deliveryTag:" + deliveryTag);
                        MessageContext messageContext = new MessageContext(taskTerminateEvent, message.getMessageType(), message.getMessageId(), str4, deliveryTag);
                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                        messageHandler.onMessage(messageContext);
                    } catch (TException e) {
                        RabbitMQTaskLaunchConsumer.log.warn("Failed to de-serialize the thrift message, from routing keys and queueName " + id, (Throwable) e);
                    }
                }

                @Override // com.rabbitmq.client.QueueingConsumer, com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
                public void handleCancel(String str3) throws IOException {
                    super.handleCancel(str3);
                    RabbitMQTaskLaunchConsumer.log.info("Consumer cancelled : " + str3);
                }
            });
            this.queueDetailsMap.put(id, new QueueDetails(str, arrayList));
            return id;
        } catch (Exception e) {
            String str3 = "could not open channel for exchange " + this.taskLaunchExchangeName;
            log.error(str3);
            throw new AiravataException(str3, e);
        }
    }

    public void stopListen(String str) throws AiravataException {
        QueueDetails queueDetails = this.queueDetailsMap.get(str);
        if (queueDetails != null) {
            try {
                Iterator<String> it = queueDetails.getRoutingKeys().iterator();
                while (it.hasNext()) {
                    this.channel.queueUnbind(queueDetails.getQueueName(), this.taskLaunchExchangeName, it.next());
                }
            } catch (IOException e) {
                log.debug("could not un-bind queue: " + queueDetails.getQueueName() + " for exchange " + this.taskLaunchExchangeName);
            }
        }
    }

    private String getId(List<String> list, String str) {
        String str2 = "";
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            str2 = str2 + "_" + it.next();
        }
        return str2 + "_" + str;
    }

    public void close() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (IOException e) {
            }
        }
    }

    public boolean isOpen() {
        if (this.connection != null) {
            return this.connection.isOpen();
        }
        return false;
    }

    public void sendAck(long j) {
        try {
            this.channel.basicAck(j, false);
        } catch (IOException e) {
            logger.error(e.getMessage(), (Throwable) e);
        }
    }
}
