package org.apache.airavata.messaging.core.impl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.messaging.core.RabbitMQProperties;
import org.apache.airavata.messaging.core.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.class */
public class RabbitMQSubscriber implements Subscriber {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RabbitMQSubscriber.class);
    private Connection connection;
    private Channel channel;
    private Map<String, QueueDetail> queueDetailMap = new HashMap();
    private RabbitMQProperties properties;

    /* loaded from: input_file:org/apache/airavata/messaging/core/impl/RabbitMQSubscriber$QueueDetail.class */
    private class QueueDetail {
        String queueName;
        List<String> routingKeys;

        private QueueDetail(String str, List<String> list) {
            this.queueName = str;
            this.routingKeys = list;
        }

        public String getQueueName() {
            return this.queueName;
        }

        List<String> getRoutingKeys() {
            return this.routingKeys;
        }
    }

    public RabbitMQSubscriber(RabbitMQProperties rabbitMQProperties) throws AiravataException {
        this.properties = rabbitMQProperties;
        createConnection();
    }

    private void createConnection() throws AiravataException {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.properties.getBrokerUrl());
            connectionFactory.setAutomaticRecoveryEnabled(this.properties.isAutoRecoveryEnable());
            this.connection = connectionFactory.newConnection();
            addShutdownListener();
            log.info("connected to rabbitmq: " + this.connection + " for " + this.properties.getExchangeName());
            this.channel = this.connection.createChannel();
            this.channel.basicQos(this.properties.getPrefetchCount());
            this.channel.exchangeDeclare(this.properties.getExchangeName(), this.properties.getExchangeType(), true);
        } catch (Exception e) {
            String str = "could not open channel for exchange " + this.properties.getExchangeName();
            log.error(str);
            throw new AiravataException(str, e);
        }
    }

    @Override // org.apache.airavata.messaging.core.Subscriber
    public String listen(BiFunction<Connection, Channel, Consumer> biFunction, String str, List<String> list) throws AiravataException {
        try {
            if (!this.channel.isOpen()) {
                this.channel = this.connection.createChannel();
                this.channel.exchangeDeclare(this.properties.getExchangeName(), this.properties.getExchangeType(), false);
            }
            if (str == null) {
                str = this.channel.queueDeclare().getQueue();
            } else {
                this.channel.queueDeclare(str, true, false, false, null);
            }
            String id = getId(list, str);
            if (this.queueDetailMap.containsKey(id)) {
                throw new IllegalStateException("This subscriber is already defined for this Subscriber, cannot define the same subscriber twice");
            }
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                this.channel.queueBind(str, this.properties.getExchangeName(), it.next());
            }
            this.channel.basicConsume(str, this.properties.isAutoAck(), this.properties.getConsumerTag(), biFunction.apply(this.connection, this.channel));
            this.queueDetailMap.put(id, new QueueDetail(str, list));
            return id;
        } catch (IOException e) {
            String str2 = "could not open channel for exchange " + this.properties.getExchangeName();
            log.error(str2);
            throw new AiravataException(str2, e);
        }
    }

    @Override // org.apache.airavata.messaging.core.Subscriber
    public void stopListen(String str) throws AiravataException {
        QueueDetail queueDetail = this.queueDetailMap.get(str);
        if (queueDetail != null) {
            try {
                Iterator<String> it = queueDetail.getRoutingKeys().iterator();
                while (it.hasNext()) {
                    this.channel.queueUnbind(queueDetail.getQueueName(), this.properties.getExchangeName(), it.next());
                }
                this.channel.queueDelete(queueDetail.getQueueName(), true, true);
            } catch (IOException e) {
                log.debug("could not un-bind queue: " + queueDetail.getQueueName() + " for exchange " + this.properties.getExchangeName());
            }
        }
    }

    @Override // org.apache.airavata.messaging.core.Subscriber
    public void sendAck(long j) {
        try {
            if (this.channel.isOpen()) {
                this.channel.basicAck(j, false);
            } else {
                this.channel = this.connection.createChannel();
                this.channel.basicQos(this.properties.getPrefetchCount());
                this.channel.basicAck(j, false);
            }
        } catch (IOException e) {
            log.error(e.getMessage(), (Throwable) e);
        }
    }

    private void addShutdownListener() {
        this.connection.addShutdownListener(new ShutdownListener() { // from class: org.apache.airavata.messaging.core.impl.RabbitMQSubscriber.1
            @Override // com.rabbitmq.client.ShutdownListener
            public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                RabbitMQSubscriber.log.error("RabbitMQ connection " + RabbitMQSubscriber.this.connection + " for " + RabbitMQSubscriber.this.properties.getExchangeName() + " has been shut down", (Throwable) shutdownSignalException);
            }
        });
    }

    private String getId(List<String> list, String str) {
        String str2 = "";
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            str2 = str2 + "_" + it.next();
        }
        return str2 + "_" + str;
    }

    public void close() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (IOException e) {
            }
        }
    }
}
