package org.apache.falcon.rerun.queue;

import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.falcon.FalconException;
import org.apache.falcon.messaging.util.MessagingUtil;
import org.apache.falcon.rerun.event.RerunEvent;
import org.apache.falcon.rerun.event.RerunEventFactory;

/* loaded from: input_file:org/apache/falcon/rerun/queue/ActiveMQueue.class */
public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> {
    private ActiveMQConnection connection;
    private String brokerUrl;
    private String destinationName;
    private Destination destination;
    private MessageProducer producer;
    private MessageConsumer consumer;

    public ActiveMQueue(String str, String str2) {
        this.brokerUrl = str;
        this.destinationName = str2;
    }

    @Override // org.apache.falcon.rerun.queue.DelayedQueue
    public boolean offer(T t) throws FalconException {
        try {
            TextMessage createTextMessage = getSession().createTextMessage(t.toString());
            createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", t.getDelay(TimeUnit.MILLISECONDS));
            createTextMessage.setStringProperty("TYPE", t.getType().name());
            this.producer.send(createTextMessage);
            LOG.debug("Enqueued Message: {} with delay {} milli sec", t.toString(), Long.valueOf(t.getDelay(TimeUnit.MILLISECONDS)));
            return true;
        } catch (Exception e) {
            throw new FalconException("Unable to offer event:" + t + " to ActiveMQ", e);
        }
    }

    private Session getSession() throws Exception {
        if (this.connection == null) {
            init();
        }
        return this.connection.createSession(false, 1);
    }

    @Override // org.apache.falcon.rerun.queue.DelayedQueue
    public T take() throws FalconException {
        try {
            TextMessage receive = this.consumer.receive();
            T t = (T) new RerunEventFactory().getRerunEvent(receive.getStringProperty("TYPE"), receive.getText());
            LOG.debug("Dequeued Message: {}", t.toString());
            return t;
        } catch (Exception e) {
            throw new FalconException("Error getting the message from ActiveMQ: ", e);
        }
    }

    @Override // org.apache.falcon.rerun.queue.DelayedQueue
    public void populateQueue(List<T> list) {
    }

    @Override // org.apache.falcon.rerun.queue.DelayedQueue
    public void init() {
        try {
            createAndStartConnection("", "", this.brokerUrl);
            Session createSession = this.connection.createSession(false, 1);
            this.destination = createSession.createQueue(this.destinationName);
            this.producer = createSession.createProducer(this.destination);
            this.consumer = createSession.createConsumer(this.destination);
            LOG.info("Initialized Queue on ActiveMQ: {}", this.destinationName);
        } catch (Exception e) {
            throw new RuntimeException("Error starting ActiveMQ connection for delayed queue", e);
        }
    }

    private void createAndStartConnection(String str, String str2, String str3) throws JMSException {
        this.connection = new ActiveMQConnectionFactory(str, str2, str3).createConnection();
        this.connection.start();
        LOG.info("Connected successfully to {}", str3);
    }

    @Override // org.apache.falcon.rerun.queue.DelayedQueue
    public void reconnect() throws FalconException {
        close();
        init();
    }

    @Override // org.apache.falcon.rerun.queue.DelayedQueue
    public void close() {
        LOG.info("Closing queue for broker={}, destination{}", this.brokerUrl, this.destinationName);
        this.destination = null;
        MessagingUtil.closeQuietly(this.producer);
        MessagingUtil.closeQuietly(this.consumer);
        MessagingUtil.closeQuietly(this.connection);
    }
}
