package org.apache.airavata.messaging.core.impl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/messaging/core/impl/RabbitMQProducer.class */
public class RabbitMQProducer {
    public static final int DEFAULT_PRE_FETCH = 64;
    private static Logger log = LoggerFactory.getLogger((Class<?>) RabbitMQProducer.class);
    private Connection connection;
    private Channel channel;
    private QueueingConsumer consumer;
    private String consumerTag;
    private String exchangeName;
    private int prefetchCount;
    private boolean isReQueueOnFail;
    private String url;
    private String getExchangeType;

    public RabbitMQProducer(String str, String str2, String str3) {
        this.prefetchCount = 64;
        this.isReQueueOnFail = false;
        this.getExchangeType = "topic";
        this.exchangeName = str2;
        this.url = str;
        this.getExchangeType = str3;
    }

    public RabbitMQProducer(String str, String str2) {
        this.prefetchCount = 64;
        this.isReQueueOnFail = false;
        this.getExchangeType = "topic";
        this.exchangeName = str2;
        this.url = str;
    }

    public void setPrefetchCount(int i) {
        this.prefetchCount = i;
    }

    public void setReQueueOnFail(boolean z) {
        this.isReQueueOnFail = z;
    }

    private void reset() {
        this.consumerTag = null;
    }

    private void reInitIfNecessary() throws Exception {
        if (this.consumerTag == null || this.consumer == null) {
            close();
            open();
        }
    }

    public void close() {
        log.info("Closing channel to exchange {}", this.exchangeName);
        try {
            if (this.channel != null && this.channel.isOpen()) {
                if (this.consumerTag != null) {
                    this.channel.basicCancel(this.consumerTag);
                }
                this.channel.close();
            }
        } catch (Exception e) {
            log.debug("error closing channel and/or cancelling consumer", (Throwable) e);
        }
        try {
            log.info("closing connection to rabbitmq: " + this.connection);
            this.connection.close();
        } catch (Exception e2) {
            log.debug("error closing connection", (Throwable) e2);
        }
        this.consumer = null;
        this.consumerTag = null;
        this.channel = null;
        this.connection = null;
    }

    public void open() throws Exception {
        try {
            this.connection = createConnection();
            this.channel = this.connection.createChannel();
            if (this.prefetchCount > 0) {
                log.info("setting basic.qos / prefetch count to " + this.prefetchCount + " for " + this.exchangeName);
                this.channel.basicQos(this.prefetchCount);
            }
            if (this.exchangeName != null) {
                this.channel.exchangeDeclare(this.exchangeName, this.getExchangeType, false);
            }
        } catch (Exception e) {
            reset();
            String str = "could not open channel for exchange " + this.exchangeName;
            log.error(str);
            throw new Exception(str, e);
        }
    }

    public void send(byte[] bArr, String str) throws Exception {
        try {
            this.channel.basicPublish(this.exchangeName, str, null, bArr);
        } catch (IOException e) {
            String str2 = "Failed to publish message to exchange: " + this.exchangeName;
            log.error(str2, (Throwable) e);
            throw new Exception(str2, e);
        }
    }

    public void sendToWorkerQueue(byte[] bArr, String str) throws Exception {
        try {
            this.channel.basicPublish("", str, MessageProperties.PERSISTENT_TEXT_PLAIN, bArr);
        } catch (IOException e) {
            String str2 = "Failed to publish message to exchange: " + this.exchangeName;
            log.error(str2, (Throwable) e);
            throw new Exception(str2, e);
        }
    }

    private Connection createConnection() throws IOException {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.url);
            connectionFactory.setAutomaticRecoveryEnabled(true);
            Connection newConnection = connectionFactory.newConnection();
            newConnection.addShutdownListener(new ShutdownListener() { // from class: org.apache.airavata.messaging.core.impl.RabbitMQProducer.1
                @Override // com.rabbitmq.client.ShutdownListener
                public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                }
            });
            log.info("connected to rabbitmq: " + newConnection + " for " + this.exchangeName);
            return newConnection;
        } catch (Exception e) {
            log.info("connection failed to rabbitmq: " + this.connection + " for " + this.exchangeName);
            return null;
        }
    }

    public void ackMessage(Long l) throws Exception {
        try {
            this.channel.basicAck(l.longValue(), false);
        } catch (ShutdownSignalException e) {
            reset();
            log.error("shutdown signal received while attempting to ack message", (Throwable) e);
            throw new Exception("shutdown signal received while attempting to ack message", e);
        } catch (Exception e2) {
            String str = "could not ack for msgId: " + l;
            log.error(str, (Throwable) e2);
            throw new Exception(str, e2);
        }
    }

    public void failMessage(Long l) throws Exception {
        if (this.isReQueueOnFail) {
            failWithRedelivery(l);
        } else {
            deadLetter(l);
        }
    }

    public void failWithRedelivery(Long l) throws Exception {
        try {
            this.channel.basicReject(l.longValue(), true);
        } catch (ShutdownSignalException e) {
            reset();
            log.error("shutdown signal received while attempting to fail with redelivery", (Throwable) e);
            throw new Exception("shutdown signal received while attempting to fail with redelivery", e);
        } catch (Exception e2) {
            String str = "could not fail with redelivery for msgId: " + l;
            log.error(str, (Throwable) e2);
            throw new Exception(str, e2);
        }
    }

    public void deadLetter(Long l) throws Exception {
        try {
            this.channel.basicReject(l.longValue(), false);
        } catch (ShutdownSignalException e) {
            reset();
            log.error("shutdown signal received while attempting to fail with no redelivery", (Throwable) e);
            throw new Exception("shutdown signal received while attempting to fail with no redelivery", e);
        } catch (Exception e2) {
            String str = "could not fail with dead-lettering (when configured) for msgId: " + l;
            log.error(str, (Throwable) e2);
            throw new Exception(str, e2);
        }
    }
}
