/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.messaging.core.impl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitMQProducer {
    public static final int DEFAULT_PRE_FETCH = 64;
    private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class);
    private Connection connection;
    private Channel channel;
    private QueueingConsumer consumer;
    private String consumerTag;
    private String exchangeName;
    private int prefetchCount = 64;
    private boolean isReQueueOnFail = false;
    private String url;

    public RabbitMQProducer(String url, String exchangeName) {
        this.exchangeName = exchangeName;
        this.url = url;
    }

    public void setPrefetchCount(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    public void setReQueueOnFail(boolean isReQueueOnFail) {
        this.isReQueueOnFail = isReQueueOnFail;
    }

    private void reset() {
        this.consumerTag = null;
    }

    private void reInitIfNecessary() throws Exception {
        if (this.consumerTag == null || this.consumer == null) {
            this.close();
            this.open();
        }
    }

    public void close() {
        log.info("Closing channel to exchange {}", (Object)this.exchangeName);
        try {
            if (this.channel != null && this.channel.isOpen()) {
                if (this.consumerTag != null) {
                    this.channel.basicCancel(this.consumerTag);
                }
                this.channel.close();
            }
        }
        catch (Exception e) {
            log.debug("error closing channel and/or cancelling consumer", (Throwable)e);
        }
        try {
            log.info("closing connection to rabbitmq: " + this.connection);
            this.connection.close();
        }
        catch (Exception e) {
            log.debug("error closing connection", (Throwable)e);
        }
        this.consumer = null;
        this.consumerTag = null;
        this.channel = null;
        this.connection = null;
    }

    public void open() throws Exception {
        try {
            this.connection = this.createConnection();
            this.channel = this.connection.createChannel();
            if (this.prefetchCount > 0) {
                log.info("setting basic.qos / prefetch count to " + this.prefetchCount + " for " + this.exchangeName);
                this.channel.basicQos(this.prefetchCount);
            }
            this.channel.exchangeDeclare(this.exchangeName, "topic", false);
        }
        catch (Exception e) {
            this.reset();
            String msg = "could not open channel for exchange " + this.exchangeName;
            log.error(msg);
            throw new Exception(msg, e);
        }
    }

    public void send(byte[] message, String routingKey) throws Exception {
        try {
            this.channel.basicPublish(this.exchangeName, routingKey, null, message);
        }
        catch (IOException e) {
            String msg = "Failed to publish message to exchange: " + this.exchangeName;
            log.error(msg, (Throwable)e);
            throw new Exception(msg, e);
        }
    }

    private Connection createConnection() throws IOException {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(this.url);
            Connection connection = connectionFactory.newConnection();
            connection.addShutdownListener(new ShutdownListener(){

                public void shutdownCompleted(ShutdownSignalException cause) {
                }
            });
            log.info("connected to rabbitmq: " + connection + " for " + this.exchangeName);
            return connection;
        }
        catch (Exception e) {
            log.info("connection failed to rabbitmq: " + this.connection + " for " + this.exchangeName);
            return null;
        }
    }

    public void ackMessage(Long msgId) throws Exception {
        try {
            this.channel.basicAck(msgId.longValue(), false);
        }
        catch (ShutdownSignalException sse) {
            this.reset();
            String msg = "shutdown signal received while attempting to ack message";
            log.error(msg, (Throwable)sse);
            throw new Exception(msg, sse);
        }
        catch (Exception e) {
            String s = "could not ack for msgId: " + msgId;
            log.error(s, (Throwable)e);
            throw new Exception(s, e);
        }
    }

    public void failMessage(Long msgId) throws Exception {
        if (this.isReQueueOnFail) {
            this.failWithRedelivery(msgId);
        } else {
            this.deadLetter(msgId);
        }
    }

    public void failWithRedelivery(Long msgId) throws Exception {
        try {
            this.channel.basicReject(msgId.longValue(), true);
        }
        catch (ShutdownSignalException sse) {
            this.reset();
            String msg = "shutdown signal received while attempting to fail with redelivery";
            log.error(msg, (Throwable)sse);
            throw new Exception(msg, sse);
        }
        catch (Exception e) {
            String msg = "could not fail with redelivery for msgId: " + msgId;
            log.error(msg, (Throwable)e);
            throw new Exception(msg, e);
        }
    }

    public void deadLetter(Long msgId) throws Exception {
        try {
            this.channel.basicReject(msgId.longValue(), false);
        }
        catch (ShutdownSignalException sse) {
            this.reset();
            String msg = "shutdown signal received while attempting to fail with no redelivery";
            log.error(msg, (Throwable)sse);
            throw new Exception(msg, sse);
        }
        catch (Exception e) {
            String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId;
            log.error(msg, (Throwable)e);
            throw new Exception(msg, e);
        }
    }
}

