package gov.nasa.pds.crawler.mq.amq;

import com.google.gson.Gson;
import gov.nasa.pds.crawler.mq.msg.DirectoryMessageBuilder;
import gov.nasa.pds.registry.common.mq.msg.JobMessage;
import gov.nasa.pds.registry.common.mq.msg.MQConstants;
import gov.nasa.pds.registry.common.util.ExceptionUtils;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:BOOT-INF/classes/gov/nasa/pds/crawler/mq/amq/JobConsumerActiveMQ.class */
public class JobConsumerActiveMQ implements Runnable {
    private Thread thread;
    private Session session;
    private Destination jobQueue;
    private Destination dirQueue;
    private MessageConsumer jobConsumer;
    private MessageProducer dirProducer;
    private volatile boolean stopRequested = false;
    private Logger log = LogManager.getLogger(getClass());
    private Gson gson = new Gson();

    public JobConsumerActiveMQ(Connection connection) throws Exception {
        this.session = connection.createSession(false, 2);
        this.jobQueue = this.session.createQueue(MQConstants.MQ_JOBS);
        this.dirQueue = this.session.createQueue(MQConstants.MQ_DIRS);
        this.jobConsumer = this.session.createConsumer(this.jobQueue);
        this.dirProducer = this.session.createProducer(this.dirQueue);
        this.dirProducer.setDeliveryMode(2);
    }

    public void start() {
        this.thread = new Thread(this);
        this.thread.start();
    }

    public void stop() {
        this.stopRequested = true;
    }

    public void join() throws InterruptedException {
        this.thread.join();
    }

    @Override // java.lang.Runnable
    public void run() {
        do {
            Message message = null;
            try {
                message = this.jobConsumer.receive(3000L);
            } catch (Exception e) {
                this.log.error(ExceptionUtils.getMessage(e));
            }
            if (message != null) {
                try {
                    processMessage(message);
                    message.acknowledge();
                } catch (Exception e2) {
                    this.log.error(ExceptionUtils.getMessage(e2));
                }
            }
        } while (!this.stopRequested);
        close(this.session);
    }

    private void processMessage(Message message) throws JMSException {
        if (!(message instanceof TextMessage)) {
            this.log.warn("Invalid message. ID = " + message.getJMSMessageID());
            return;
        }
        try {
            JobMessage jobMessage = (JobMessage) this.gson.fromJson(((TextMessage) message).getText(), JobMessage.class);
            this.log.info("Processing job " + jobMessage.jobId);
            if (jobMessage.dirs != null) {
                Iterator<String> it = jobMessage.dirs.iterator();
                while (it.hasNext()) {
                    this.dirProducer.send(this.session.createTextMessage(this.gson.toJson(DirectoryMessageBuilder.createDirectoryMessage(jobMessage, it.next()))));
                }
            }
            if (jobMessage.manifests != null) {
                Iterator<String> it2 = jobMessage.manifests.iterator();
                while (it2.hasNext()) {
                    this.dirProducer.send(this.session.createTextMessage(this.gson.toJson(DirectoryMessageBuilder.createManifestMessage(jobMessage, it2.next()))));
                }
            }
        } catch (Exception e) {
            this.log.error("Could not parse message. ID = " + message.getJMSMessageID());
        }
    }

    private void close(Session session) {
        try {
            session.close();
        } catch (Exception e) {
        }
    }
}
