package org.apache.airavata.messaging.core.impl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.function.Function;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.RabbitMQProperties;
import org.apache.airavata.model.messaging.event.Message;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/messaging/core/impl/RabbitMQPublisher.class */
/**
 * {@link Publisher} implementation that wraps a {@link MessageContext} into a Thrift
 * {@link Message}, serializes it and publishes the bytes to the RabbitMQ exchange named in the
 * supplied {@link RabbitMQProperties}.
 *
 * <p>The connection and channel are opened eagerly in the constructor. A single channel is reused
 * for every publish call.
 *
 * <p>NOTE(review): no synchronization is performed around the shared channel; confirm whether
 * callers publish from multiple threads.
 */
public class RabbitMQPublisher implements Publisher {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class);

    private static final String SERIALIZE_ERROR = "Error while serializing the object";
    private static final String SEND_ERROR = "Error while sending to rabbitmq";

    private final RabbitMQProperties properties;
    /** Derives the routing key from a message context; {@code null} when none was configured. */
    private final Function<MessageContext, String> routingKeySupplier;
    private Connection connection;
    private Channel channel;

    /**
     * Creates a publisher and immediately connects to the broker.
     *
     * @param rabbitMQProperties broker URL, exchange settings, prefetch count and recovery flag
     * @param function resolves the routing key used by {@link #publish(MessageContext)}; may be
     *     {@code null}, in which case only {@link #publish(MessageContext, String)} is usable
     * @throws AiravataException if the connection, channel or exchange cannot be set up
     */
    public RabbitMQPublisher(RabbitMQProperties rabbitMQProperties, Function<MessageContext, String> function) throws AiravataException {
        this.properties = rabbitMQProperties;
        this.routingKeySupplier = function;
        connect();
    }

    /**
     * Creates a publisher without a routing key supplier and immediately connects to the broker.
     * Messages must then be published with {@link #publish(MessageContext, String)}.
     *
     * @param rabbitMQProperties broker URL, exchange settings, prefetch count and recovery flag
     * @throws AiravataException if the connection, channel or exchange cannot be set up
     */
    public RabbitMQPublisher(RabbitMQProperties rabbitMQProperties) throws AiravataException {
        this(rabbitMQProperties, null);
    }

    /**
     * Opens the connection and channel and, when an exchange name is configured, declares the
     * exchange as durable. If any step after the connection is opened fails, the connection is
     * closed again so that it does not leak.
     */
    private void connect() throws AiravataException {
        final String exchangeName = this.properties.getExchangeName();
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.properties.getBrokerUrl());
            connectionFactory.setAutomaticRecoveryEnabled(this.properties.isAutoRecoveryEnable());
            this.connection = connectionFactory.newConnection();
            // Surface shutdowns in the log instead of silently ignoring them.
            this.connection.addShutdownListener(cause ->
                    log.warn("RabbitMQ connection for exchange {} was shut down: {}", exchangeName, cause.getMessage()));
            log.info("connected to rabbitmq: {} for {}", this.connection, exchangeName);
            this.channel = this.connection.createChannel();
            int prefetchCount = this.properties.getPrefetchCount();
            if (prefetchCount > 0) {
                this.channel.basicQos(prefetchCount);
            }
            if (exchangeName != null) {
                this.channel.exchangeDeclare(exchangeName, this.properties.getExchangeType(), true);
            }
        } catch (Exception e) {
            String msg = "RabbitMQ connection issue for exchange : " + exchangeName;
            log.error(msg, e);
            closeConnectionQuietly();
            throw new AiravataException(msg, e);
        }
    }

    /** Releases a partially initialised connection; failures while closing are only logged. */
    private void closeConnectionQuietly() {
        Connection opened = this.connection;
        this.connection = null;
        this.channel = null;
        if (opened == null) {
            return;
        }
        try {
            // Closing the connection also closes the channels created from it.
            opened.close();
        } catch (Exception closeError) {
            log.warn("Failed to close RabbitMQ connection after unsuccessful setup", closeError);
        }
    }

    /**
     * Publishes the message using the routing key produced by the configured supplier.
     *
     * @param messageContext the message to publish
     * @throws AiravataException if no routing key supplier was configured, the supplier fails,
     *     or serialization or sending fails
     */
    @Override // org.apache.airavata.messaging.core.Publisher
    public void publish(MessageContext messageContext) throws AiravataException {
        if (this.routingKeySupplier == null) {
            String msg = "No routing key supplier configured; use publish(MessageContext, String) instead";
            log.error(msg);
            throw new AiravataException(msg, new IllegalStateException(msg));
        }
        String routingKey;
        try {
            routingKey = this.routingKeySupplier.apply(messageContext);
        } catch (RuntimeException e) {
            String msg = "Error while resolving the routing key";
            log.error(msg, e);
            throw new AiravataException(msg, e);
        }
        serializeAndSend(messageContext, routingKey);
    }

    /**
     * Publishes the message using an explicit routing key.
     *
     * @param messageContext the message to publish
     * @param str the routing key to publish with
     * @throws AiravataException if serialization or sending fails
     */
    @Override // org.apache.airavata.messaging.core.Publisher
    public void publish(MessageContext messageContext, String str) throws AiravataException {
        serializeAndSend(messageContext, str);
    }

    /** Builds the Thrift {@link Message} envelope for the context, serializes it and sends it. */
    private void serializeAndSend(MessageContext messageContext, String routingKey) throws AiravataException {
        try {
            byte[] eventBytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
            Message message = new Message();
            message.setEvent(eventBytes);
            message.setMessageId(messageContext.getMessageId());
            message.setMessageType(messageContext.getType());
            // The updated time is optional; leave the field unset when the context has none.
            if (messageContext.getUpdatedTime() != null) {
                message.setUpdatedTime(messageContext.getUpdatedTime().getTime());
            }
            send(ThriftUtils.serializeThriftObject(message), routingKey);
        } catch (TException e) {
            log.error(SERIALIZE_ERROR, e);
            throw new AiravataException(SERIALIZE_ERROR, e);
        } catch (Exception e) {
            log.error(SEND_ERROR, e);
            throw new AiravataException(SEND_ERROR, e);
        }
    }

    /**
     * Publishes raw bytes to the configured exchange.
     *
     * @param bArr the message body
     * @param str the routing key
     * @throws Exception wrapping the {@link IOException} if the broker rejects or cannot receive
     *     the message
     */
    public void send(byte[] bArr, String str) throws Exception {
        try {
            // NOTE(review): the body is Thrift-serialized bytes but is tagged as text/plain via
            // PERSISTENT_TEXT_PLAIN; kept as-is since consumers may rely on it.
            this.channel.basicPublish(this.properties.getExchangeName(), str, MessageProperties.PERSISTENT_TEXT_PLAIN, bArr);
        } catch (IOException e) {
            String msg = "Failed to publish message to exchange: " + this.properties.getExchangeName();
            log.error(msg, e);
            throw new Exception(msg, e);
        }
    }
}
