package org.apache.airavata.messaging.core.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Map;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.model.messaging.event.Message;
import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.class */
/**
 * Consumes process-submission messages from a RabbitMQ queue and dispatches each one to a
 * {@link MessageHandler}.
 *
 * <p>The broker URL is read from the {@code rabbitmq.broker.url} server setting and the channel
 * prefetch count from {@link MessagingConstants#PREFETCH_COUNT} (64 when the setting is absent).
 * The connection and a channel are opened in the constructor, with automatic connection
 * recovery enabled on the connection factory.
 *
 * <p>NOTE(review): the {@code channel} field is replaced without synchronization when it is found
 * closed; confirm with callers whether one instance is ever shared between threads.
 */
public class RabbitMQProcessConsumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessConsumer.class);

    private static final String BROKER_URL_SETTING = "rabbitmq.broker.url";
    private static final int DEFAULT_PREFETCH_COUNT = 64;

    private final String url;
    private final int prefetchCount;
    private Connection connection;
    private Channel channel;

    /**
     * Reads the broker settings and opens the connection and channel.
     *
     * @throws AiravataException if a required setting is missing, the prefetch count is not a
     *         valid integer, or the connection/channel cannot be opened
     */
    public RabbitMQProcessConsumer() throws AiravataException {
        try {
            this.url = ServerSettings.getSetting(BROKER_URL_SETTING);
            this.prefetchCount = Integer.parseInt(
                    ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(DEFAULT_PREFETCH_COUNT)));
        } catch (ApplicationSettingsException e) {
            String msg = "Failed to get read the required properties from airavata to initialize rabbitmq";
            log.error(msg, e);
            throw new AiravataException(msg, e);
        } catch (NumberFormatException e) {
            // Surface a malformed prefetch setting through the declared exception type instead of
            // letting an unchecked exception escape the constructor.
            String msg = "Invalid value for setting " + MessagingConstants.PREFETCH_COUNT + "; an integer is required";
            log.error(msg, e);
            throw new AiravataException(msg, e);
        }
        createConnection();
    }

    /**
     * Opens the connection and a channel configured with the prefetch count. The fields are only
     * assigned once both succeed; a connection that was opened before a later step failed is
     * aborted so it is not leaked.
     */
    private void createConnection() throws AiravataException {
        Connection opened = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(this.url);
            factory.setAutomaticRecoveryEnabled(true);
            opened = factory.newConnection();
            opened.addShutdownListener(new ShutdownListener() {
                @Override
                public void shutdownCompleted(ShutdownSignalException cause) {
                    if (cause.isInitiatedByApplication()) {
                        RabbitMQProcessConsumer.log.info("rabbitmq connection closed by application: {}", cause.getMessage());
                    } else {
                        RabbitMQProcessConsumer.log.warn("rabbitmq connection shut down: {}", cause.getMessage());
                    }
                }
            });
            log.info("connected to rabbitmq: {} for default", opened);
            Channel openedChannel = opened.createChannel();
            openedChannel.basicQos(this.prefetchCount);
            this.connection = opened;
            this.channel = openedChannel;
        } catch (Exception e) {
            if (opened != null) {
                // abort() discards any errors raised while closing, so it cannot mask 'e'.
                opened.abort();
            }
            String msg = "could not open channel for exchange default";
            log.error(msg, e);
            throw new AiravataException(msg, e);
        }
    }

    /**
     * Starts consuming from the queue described by the handler's properties, with automatic
     * acknowledgement.
     *
     * <p>The {@link MessagingConstants#RABBIT_ROUTING_KEY} property must be present, although this
     * method does not itself bind the queue with it. When {@link MessagingConstants#RABBIT_QUEUE}
     * is set, that queue is declared as durable, non-exclusive and non-auto-delete; otherwise a
     * server-named queue is declared. Each delivery is decoded as a {@link Message} wrapping a
     * {@link ProcessSubmitEvent} and passed to {@code messageHandler.onMessage}.
     *
     * @param messageHandler supplies the queue properties and receives decoded messages
     * @return always the empty string
     * @throws AiravataException if the routing key is missing or the queue cannot be declared or
     *         consumed from
     */
    public String listen(final MessageHandler messageHandler) throws AiravataException {
        try {
            Map<String, Object> properties = messageHandler.getProperties();
            if (properties.get(MessagingConstants.RABBIT_ROUTING_KEY) == null) {
                throw new IllegalArgumentException("The routing key must be present");
            }
            // Make sure the channel is usable for both the named and the server-named queue path.
            ensureOpenChannel();
            String requestedQueue = (String) properties.get(MessagingConstants.RABBIT_QUEUE);
            final String queueName;
            if (requestedQueue == null) {
                queueName = this.channel.queueDeclare().getQueue();
            } else {
                this.channel.queueDeclare(requestedQueue, true, false, false, null);
                queueName = requestedQueue;
            }
            // handleDelivery is overridden, so QueueingConsumer's internal queue is not used here.
            this.channel.basicConsume(queueName, true, new QueueingConsumer(this.channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) {
                    Message message = new Message();
                    try {
                        ThriftUtils.createThriftFromBytes(body, message);
                        ProcessSubmitEvent event = new ProcessSubmitEvent();
                        ThriftUtils.createThriftFromBytes(message.getEvent(), event);
                        RabbitMQProcessConsumer.log.debug("Message received with message id : {} with task id : {}",
                                message.getMessageId(), event.getTaskId());
                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), null);
                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                        messageHandler.onMessage(messageContext);
                    } catch (TException e) {
                        RabbitMQProcessConsumer.log.warn("Failed to de-serialize the thrift message, from routing keys and queueName " + queueName, e);
                    } catch (RuntimeException e) {
                        // NOTE(review): an exception escaping handleDelivery is passed to the client's
                        // exception handler, which in the default configuration is believed to close
                        // the channel - confirm for the client version in use. Log and keep consuming.
                        RabbitMQProcessConsumer.log.error("Message handler failed for message id : " + message.getMessageId()
                                + " from queue " + queueName, e);
                    }
                }
            });
            return "";
        } catch (Exception e) {
            String msg = "could not start consuming process messages from rabbitmq";
            log.error(msg, e);
            throw new AiravataException(msg, e);
        }
    }

    /**
     * Best-effort unbind of a queue from an exchange (with a null routing key). An
     * {@link IOException} from the broker is logged and not propagated.
     *
     * @param queueName name of the queue to unbind
     * @param exchangeName name of the exchange the queue is bound to
     * @throws AiravataException declared for interface compatibility; not thrown by this method
     */
    public void stopListen(String queueName, String exchangeName) throws AiravataException {
        try {
            this.channel.queueUnbind(queueName, exchangeName, null);
        } catch (IOException e) {
            log.warn("could not un-bind queue: " + queueName + " for exchange " + exchangeName, e);
        }
    }

    /** Replaces the channel with a fresh one, configured with the prefetch count, if it is closed. */
    private void ensureOpenChannel() throws IOException {
        if (!this.channel.isOpen()) {
            this.channel = this.connection.createChannel();
            this.channel.basicQos(this.prefetchCount);
        }
    }
}
