package org.apache.airavata.gfac.monitor.impl.push.amqp;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.core.PushMonitor;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.class */
/**
 * Push-style job monitor that listens for job status messages over AMQP.
 *
 * <p>For every {@link MonitorID} taken from the running queue, a channel is
 * opened (one per channel ID as computed by {@link CommonUtils#getChannelID}),
 * a server-named queue is declared on it and bound to the
 * {@value #EXCHANGE_NAME} exchange. Messages received on that queue are parsed
 * and posted to an internal {@link MonitorPublisher}; this object registers
 * itself on that publisher, and {@link #unRegisterListener(MonitorID)} is the
 * {@code @Subscribe} handler that processes the resulting status updates.
 */
public class AMQPMonitor extends PushMonitor {
    private static final Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);

    /** Exchange the per-channel queues are bound to and unbound from. */
    private static final String EXCHANGE_NAME = "glue2.computing_activity";

    /** Open channels keyed by the channel ID derived from a MonitorID. */
    private Map<String, Channel> availableChannels;
    /** Publisher used to forward job status change requests to the outside. */
    private MonitorPublisher publisher;
    /** Internal publisher the AMQP consumers post parsed messages to. */
    private MonitorPublisher localPublisher;
    /** Jobs waiting to have a listener registered; consumed by {@link #run()}. */
    private BlockingQueue<MonitorID> runningQueue;
    /** Jobs currently tracked; searched and pruned by {@link #unRegisterListener(MonitorID)}. */
    private BlockingQueue<MonitorID> finishQueue;
    private String connectionName;
    private String proxyPath;
    private List<String> amqpHosts;
    /** While {@code true}, {@link #run()} keeps looping regardless of the global stop flag. */
    private boolean startRegister;

    /**
     * Creates an unconfigured monitor. {@link #initialize(String, String, List)} and
     * the queue/publisher setters must be called before it is used.
     */
    public AMQPMonitor() {
    }

    /**
     * Creates a fully configured monitor.
     *
     * @param monitorPublisher publisher that receives job status change requests
     * @param blockingQueue    queue of jobs waiting to be registered (running queue)
     * @param blockingQueue2   queue of jobs being tracked (finish queue)
     * @param str              proxy path passed to the AMQP connection utility
     * @param str2             connection name passed to the AMQP connection utility
     * @param list             AMQP hosts passed to the AMQP connection utility
     */
    public AMQPMonitor(MonitorPublisher monitorPublisher, BlockingQueue<MonitorID> blockingQueue, BlockingQueue<MonitorID> blockingQueue2, String str, String str2, List<String> list) {
        this.publisher = monitorPublisher;
        this.runningQueue = blockingQueue;
        this.finishQueue = blockingQueue2;
        configureConnection(str, str2, list);
    }

    /**
     * Sets the connection parameters, resets the channel map and creates a
     * fresh internal publisher with this monitor registered as its listener.
     *
     * @param str  proxy path passed to the AMQP connection utility
     * @param str2 connection name passed to the AMQP connection utility
     * @param list AMQP hosts passed to the AMQP connection utility
     */
    public void initialize(String str, String str2, List<String> list) {
        configureConnection(str, str2, list);
    }

    /** Shared setup used by both the constructor and {@link #initialize}. */
    private void configureConnection(String proxyPath, String connectionName, List<String> amqpHosts) {
        this.availableChannels = new HashMap<String, Channel>();
        this.connectionName = connectionName;
        this.proxyPath = proxyPath;
        this.amqpHosts = amqpHosts;
        this.localPublisher = new MonitorPublisher(new EventBus());
        this.localPublisher.registerListener(this);
    }

    /**
     * Ensures there is a consuming channel for the given job's channel ID.
     *
     * <p>If a channel is already cached for that ID nothing is done. Otherwise a
     * new channel is created, a server-named queue is declared, a consumer is
     * attached and the queue is bound to {@value #EXCHANGE_NAME} with the routing
     * key derived from the job's user name and host address.
     *
     * @param monitorID job to start listening for
     * @return {@code true} if a channel is available for the job after the call,
     *         {@code false} if setting it up failed with an I/O error
     * @throws AiravataMonitorException declared by the overridden method; not thrown here
     */
    @Override // org.apache.airavata.gfac.monitor.core.PushMonitor
    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
        String hostAddress = monitorID.getHost().getType().getHostAddress();
        String channelID = CommonUtils.getChannelID(monitorID);
        if (this.availableChannels.get(channelID) != null) {
            return true;
        }
        Channel channel = null;
        try {
            channel = AMQPConnectionUtil.connect(this.amqpHosts, this.connectionName, this.proxyPath).createChannel();
            // Cached before consuming starts so that a status message arriving
            // immediately after the bind can already find the channel.
            this.availableChannels.put(channelID, channel);
            String queue = channel.queueDeclare().getQueue();
            channel.basicConsume(queue, true, new BasicConsumer(new JSONMessageParser(), this.localPublisher));
            String routingKey = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
            channel.queueBind(queue, EXCHANGE_NAME, routingKey);
            logger.info("Using filtering string to monitor: " + routingKey);
            return true;
        } catch (IOException e) {
            logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName(), e);
            // Do not leave a half-configured channel cached: otherwise every later
            // registration for this channel ID would hit the early return above
            // and the job would never actually be monitored.
            if (channel != null) {
                this.availableChannels.remove(channelID);
                closeQuietly(channel);
            }
            return false;
        }
    }

    /**
     * Best-effort close of a channel and its connection after a failed setup.
     * Failures are only logged because the caller is already on an error path.
     */
    private static void closeQuietly(Channel channel) {
        try {
            channel.close();
        } catch (Exception e) {
            // Broad catch on purpose: closing an already-shut-down channel can
            // raise unchecked exceptions in addition to IOException.
            logger.warn("Failed to close AMQP channel after unsuccessful registration", e);
        }
        try {
            channel.getConnection().close();
        } catch (Exception e) {
            logger.warn("Failed to close AMQP connection after unsuccessful registration", e);
        }
    }

    /**
     * Registration loop: repeatedly takes a job from the running queue (blocking
     * while it is empty) and registers a listener for it. The loop ends only
     * when {@code startRegister} is {@code false} and
     * {@link ServerSettings#isStopAllThreads()} returns {@code true}; all cached
     * channels are closed afterwards.
     */
    public void run() {
        this.startRegister = true;
        while (this.startRegister || !ServerSettings.isStopAllThreads()) {
            try {
                registerListener(this.runningQueue.take());
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is logged and the loop continues, as
                // before; consider whether an interrupt should end the loop.
                logger.error("Interrupted while waiting for a job to register", e);
            } catch (AiravataMonitorException e) {
                logger.error("Error registering the listener", e);
            } catch (Exception e) {
                // Keep the loop alive on any unexpected failure for a single job.
                logger.error("Unexpected error while registering the listener", e);
            }
        }
        for (Channel channel : this.availableChannels.values()) {
            try {
                channel.close();
            } catch (IOException e) {
                logger.error("Error closing the AMQP channel", e);
            }
        }
    }

    /**
     * Handles a status update for a job (event-bus subscriber).
     *
     * <p>The tracked job is looked up in the finish queue by checking whether its
     * job ID ends with the incoming job ID. If the incoming status is
     * {@code FAILED} or {@code COMPLETE} the job is removed from the finish
     * queue, and if {@link CommonUtils#isTheLastJobInQueue} reports it was the
     * last one for its channel ID, the channel and its connection are closed and
     * dropped from the cache. Finally the tracked job's status is updated and a
     * {@link JobStatusChangeRequest} is published.
     *
     * @param monitorID incoming status message
     * @return {@code false} if no tracked job matches the message, {@code true} otherwise
     * @throws AiravataMonitorException if the channel was already removed or
     *         closing/unbinding it failed with an I/O error
     */
    @Override // org.apache.airavata.gfac.monitor.core.PushMonitor
    @Subscribe
    public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
        // Only a queue entry that actually matches is accepted; a message for a
        // job that is no longer tracked must not be applied to some other job.
        MonitorID tracked = null;
        for (MonitorID candidate : this.finishQueue) {
            if (candidate.getJobID().endsWith(monitorID.getJobID())) {
                tracked = candidate;
                break;
            }
        }
        if (tracked == null) {
            logger.error("Job has removed from the queue, old obsolete message recieved");
            return false;
        }
        String channelID = CommonUtils.getChannelID(tracked);
        if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
            this.finishQueue.remove(tracked);
            if (CommonUtils.isTheLastJobInQueue(this.finishQueue, tracked)) {
                logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it, incase new job created we do subscribe again");
                Channel channel = this.availableChannels.get(channelID);
                if (channel == null) {
                    logger.error("Already Unregistered the listener");
                    throw new AiravataMonitorException("Already Unregistered the listener");
                }
                try {
                    // NOTE(review): queueDeclare() declares a new server-named queue, so
                    // this unbinds that new queue rather than the one bound during
                    // registration. Closing the channel and connection follows
                    // immediately; confirm whether the unbind is needed at all.
                    channel.queueUnbind(channel.queueDeclare().getQueue(), EXCHANGE_NAME, CommonUtils.getRoutingKey(tracked));
                    channel.close();
                    channel.getConnection().close();
                    this.availableChannels.remove(channelID);
                } catch (IOException e) {
                    logger.error("Error unregistering the listener", e);
                    throw new AiravataMonitorException("Error unregistering the listener");
                }
            }
        }
        tracked.setStatus(monitorID.getStatus());
        JobIdentity jobIdentity = new JobIdentity(tracked.getExperimentID(), tracked.getWorkflowNodeID(), tracked.getTaskID(), tracked.getJobID());
        this.publisher.publish(new JobStatusChangeRequest(tracked, jobIdentity, tracked.getStatus()));
        return true;
    }

    /**
     * Not implemented for this monitor.
     *
     * @return always {@code false}
     */
    @Override // org.apache.airavata.gfac.monitor.core.PushMonitor
    public boolean stopRegister() throws AiravataMonitorException {
        return false;
    }

    /** @return the live map of open channels keyed by channel ID (not a copy) */
    public Map<String, Channel> getAvailableChannels() {
        return this.availableChannels;
    }

    /** @param map replacement for the channel cache */
    public void setAvailableChannels(Map<String, Channel> map) {
        this.availableChannels = map;
    }

    /** @return the publisher that receives job status change requests */
    @Override // org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor
    public MonitorPublisher getPublisher() {
        return this.publisher;
    }

    /** @param monitorPublisher publisher that should receive job status change requests */
    @Override // org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor
    public void setPublisher(MonitorPublisher monitorPublisher) {
        this.publisher = monitorPublisher;
    }

    /** @return the queue of jobs waiting to be registered */
    public BlockingQueue<MonitorID> getRunningQueue() {
        return this.runningQueue;
    }

    /** @param blockingQueue queue of jobs waiting to be registered */
    public void setRunningQueue(BlockingQueue<MonitorID> blockingQueue) {
        this.runningQueue = blockingQueue;
    }

    /** @return the queue of jobs currently being tracked */
    public BlockingQueue<MonitorID> getFinishQueue() {
        return this.finishQueue;
    }

    /** @param blockingQueue queue of jobs currently being tracked */
    public void setFinishQueue(BlockingQueue<MonitorID> blockingQueue) {
        this.finishQueue = blockingQueue;
    }

    /** @return the proxy path passed to the AMQP connection utility */
    public String getProxyPath() {
        return this.proxyPath;
    }

    /** @param str proxy path passed to the AMQP connection utility */
    public void setProxyPath(String str) {
        this.proxyPath = str;
    }

    /** @return the AMQP hosts passed to the AMQP connection utility */
    public List<String> getAmqpHosts() {
        return this.amqpHosts;
    }

    /** @param list AMQP hosts passed to the AMQP connection utility */
    public void setAmqpHosts(List<String> list) {
        this.amqpHosts = list;
    }

    /** @return the flag that keeps the {@link #run()} loop alive */
    public boolean isStartRegister() {
        return this.startRegister;
    }

    /** @param z new value of the flag that keeps the {@link #run()} loop alive */
    public void setStartRegister(boolean z) {
        this.startRegister = z;
    }
}
