package org.apache.airavata.gfac.monitor.impl.pull.qstat;

import com.google.common.eventbus.EventBus;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.core.PullMonitor;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.workspace.experiment.JobState;

/* loaded from: input_file:org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.class */
/**
 * Pull-based job monitor that repeatedly takes a {@link UserMonitorData} entry from a
 * blocking queue, queries the status of that user's jobs on each SSH-accessible host and
 * publishes a {@link JobStatusChangeRequestEvent} for every job inspected.
 *
 * <p>Jobs that are cancelled, complete, or whose output directory already holds files after
 * repeated failed status lookups are removed from monitoring and handed to an
 * {@link OutHandlerWorker}.
 *
 * <p>{@link #run()} holds the monitor of the queue object while a polling pass is executing.
 */
public class HPCPullMonitor extends PullMonitor {
    private static final AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class);

    /** A job's failed count must exceed this value before its output directory is probed. */
    public static final int FAILED_COUNT = 5;

    /** Pause between two polling passes of {@link #run()}. */
    private static final long POLL_INTERVAL_MILLIS = 10000L;

    /** Pause between announcing a cancellation and scheduling the output handlers. */
    private static final long CANCEL_GRACE_MILLIS = 10000L;

    private BlockingQueue<UserMonitorData> queue;

    // Written by stopPulling()/setStartPulling() and read by the run() loop, which are
    // expected to be on different threads; volatile makes the stop request visible.
    private volatile boolean startPulling;

    /** Open connections keyed by host name. */
    private Map<String, ResourceConnection> connections;
    private MonitorPublisher publisher;

    /** Cancellation requests, each encoded as {@code experimentID + "+" + taskID}. */
    private LinkedBlockingQueue<String> cancelJobList;
    private GFac gfac;
    private AuthenticationInfo authenticationInfo;

    /** Monitor IDs collected during a pass that {@link #cleanup(UserMonitorData)} removes. */
    private final List<MonitorID> removeList = new ArrayList<>();

    /** Creates a monitor with its own queue and a publisher backed by a fresh {@link EventBus}. */
    public HPCPullMonitor() {
        this.startPulling = false;
        this.connections = new HashMap<>();
        this.queue = new LinkedBlockingDeque<>();
        this.publisher = new MonitorPublisher(new EventBus());
        this.cancelJobList = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a monitor with its own queue.
     *
     * @param monitorPublisher   publisher that receives job status events
     * @param authenticationInfo credentials passed to every new {@link ResourceConnection}
     */
    public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authenticationInfo) {
        this.startPulling = false;
        this.connections = new HashMap<>();
        this.queue = new LinkedBlockingDeque<>();
        this.publisher = monitorPublisher;
        this.authenticationInfo = authenticationInfo;
        this.cancelJobList = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a monitor that polls an externally supplied queue.
     *
     * @param blockingQueue    queue of per-user monitoring data to poll
     * @param monitorPublisher publisher that receives job status events
     */
    public HPCPullMonitor(BlockingQueue<UserMonitorData> blockingQueue, MonitorPublisher monitorPublisher) {
        this.startPulling = false;
        this.queue = blockingQueue;
        this.publisher = monitorPublisher;
        this.connections = new HashMap<>();
        this.cancelJobList = new LinkedBlockingQueue<>();
    }

    /**
     * Polling loop: runs one {@link #startPulling()} pass whenever the queue is non-empty,
     * then sleeps. The loop ends when {@link #stopPulling()} is called, when
     * {@code ServerSettings.isStopAllThreads()} reports true, or when the thread is
     * interrupted. On exit every cached cluster connection is disconnected.
     */
    public void run() {
        this.startPulling = true;
        while (this.startPulling
                && !ServerSettings.isStopAllThreads()
                && !Thread.currentThread().isInterrupted()) {
            try {
                synchronized (this.queue) {
                    if (!this.queue.isEmpty()) {
                        startPulling();
                    }
                }
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition sees the interruption and exits.
                Thread.currentThread().interrupt();
                logger.error(e.getMessage(), e);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        for (ResourceConnection connection : this.connections.values()) {
            try {
                connection.getCluster().disconnect();
            } catch (SSHApiException e) {
                logger.error("Error while disconnecting from the cluster", e);
            }
        }
    }

    /**
     * Executes one monitoring pass over the next {@link UserMonitorData} in the queue.
     *
     * <p>For every host using the SSH job submission protocol the pass (1) handles jobs
     * listed in the cancel list, (2) refreshes and publishes job statuses, and (3) probes the
     * output directory of jobs whose failed count exceeds {@link #FAILED_COUNT}. Hosts left
     * without monitor IDs are dropped, and the entry is re-queued if any host remains.
     *
     * @return {@code true} when the pass completes without an exception
     * @throws AiravataMonitorException if the pass is interrupted or any other error occurs;
     *                                  the original exception is attached as the cause
     */
    @Override // org.apache.airavata.gfac.monitor.core.PullMonitor
    public boolean startPulling() throws AiravataMonitorException {
        UserMonitorData userMonitorData = null;
        // Last job touched by the status refresh; used for error reporting in the handlers.
        MonitorID currentMonitorID = null;
        try {
            userMonitorData = this.queue.take();
            for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
                if (hostData.getJobSubmissionProtocol() != JobSubmissionProtocol.SSH) {
                    logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", hostData.getComputeResourceDescription().getHostName());
                    continue;
                }

                // Reuse a live cached connection; otherwise (none, or stale) open a new one.
                String hostName = hostData.getComputeResourceDescription().getHostName();
                ResourceConnection connection = this.connections.get(hostName);
                if (connection != null && connection.isConnected()) {
                    logger.debug("We already have this connection so not going to create one");
                } else {
                    connection = new ResourceConnection(hostData, getAuthenticationInfo());
                    this.connections.put(hostName, connection);
                }

                // Phase 1: jobs whose "experimentID+taskID" key is in the cancel list.
                for (MonitorID candidate : hostData.getMonitorIDs()) {
                    String cancelKey = candidate.getExperimentID() + "+" + candidate.getTaskID();
                    // A bounded lookup: the previous hand-rolled scan never terminated when
                    // the cancel list held no matching entry.
                    if (!this.cancelJobList.contains(cancelKey)) {
                        continue;
                    }
                    candidate.setStatus(JobState.CANCELED);
                    this.removeList.add(candidate);
                    logger.debugId(cancelKey, "Found a match in cancel monitor queue, hence moved to the completed job queue, experiment {}, task {} , job {}", new Object[]{candidate.getExperimentID(), candidate.getTaskID(), candidate.getJobID()});
                    logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .", new Object[]{candidate.getExperimentID(), candidate.getTaskID(), candidate.getJobName()});
                    sendNotification(candidate);
                    logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
                    Thread.sleep(CANCEL_GRACE_MILLIS);
                    GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(this.gfac, candidate, this.publisher));
                }
                cleanup(userMonitorData);

                // Phase 2: refresh the status of every remaining job and publish it.
                List<MonitorID> monitorIDs = hostData.getMonitorIDs();
                Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorIDs);
                for (MonitorID job : monitorIDs) {
                    currentMonitorID = job;
                    String statusKey = job.getJobID() + "," + job.getJobName();
                    if (!JobState.CANCELED.equals(job.getStatus()) && !JobState.COMPLETE.equals(job.getStatus())) {
                        job.setStatus(jobStatuses.get(statusKey));
                    } else if (JobState.COMPLETE.equals(job.getStatus())) {
                        logger.debugId(job.getJobID(), "Moved job {} to completed jobs map, experiment {}, task {}", new Object[]{job.getJobID(), job.getExperimentID(), job.getTaskID()});
                        this.removeList.add(job);
                        logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", new Object[]{job.getExperimentID(), job.getTaskID(), job.getJobName()});
                        GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(this.gfac, job, this.publisher));
                    }
                    // NOTE(review): the status is applied a second time unconditionally, as in
                    // the original. MonitorID.setStatus is not visible here and may have side
                    // effects, so the duplicate call is deliberately kept - confirm before removing.
                    job.setStatus(jobStatuses.get(statusKey));
                    job.setLastMonitored(new Timestamp(new Date().getTime()));
                    sendNotification(job);
                    job.setLastMonitored(new Timestamp(new Date().getTime()));
                }
                cleanup(userMonitorData);

                // Phase 3: for jobs that failed too often, a non-empty output directory is
                // taken as evidence that the job actually completed.
                for (MonitorID job : monitorIDs) {
                    job.setLastMonitored(new Timestamp(new Date().getTime()));
                    if (job.getFailedCount() <= FAILED_COUNT) {
                        continue;
                    }
                    List<?> listing = null;
                    try {
                        listing = connection.getCluster().listDirectory(job.getJobExecutionContext().getOutputDir());
                    } catch (SSHApiException e) {
                        String message = e.getMessage();
                        if (message != null && message.contains("No such file or directory")) {
                            logger.error("We know this  job is already attempted to run out-handlers");
                        } else {
                            // Best effort: the job is retried on a later pass, but do not hide the cause.
                            logger.error("Error listing the output directory of job " + job.getJobID(), e);
                        }
                    }
                    boolean hasOutput = listing != null && !listing.isEmpty() && !((String) listing.get(0)).isEmpty();
                    if (!hasOutput) {
                        job.setFailedCount(0);
                        continue;
                    }
                    job.setStatus(JobState.COMPLETE);
                    logger.errorId(job.getJobID(), "Job monitoring failed {} times,  Experiment {} , task {}", new Object[]{Integer.valueOf(job.getFailedCount()), job.getExperimentID(), job.getTaskID()});
                    logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", new Object[]{job.getExperimentID(), job.getTaskID(), job.getJobName()});
                    sendNotification(job);
                    this.removeList.add(job);
                    GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(this.gfac, job, this.publisher));
                }
                cleanup(userMonitorData);
            }

            // Drop hosts with nothing left to monitor; re-queue the user if any host remains.
            Iterator<HostMonitorData> hosts = userMonitorData.getHostMonitorData().iterator();
            while (hosts.hasNext()) {
                HostMonitorData hostData = hosts.next();
                if (hostData.getMonitorIDs().isEmpty()) {
                    hosts.remove();
                    logger.debug("Removed host {} from monitoring queue", hostData.getComputeResourceDescription().getHostName());
                }
            }
            if (!userMonitorData.getHostMonitorData().isEmpty()) {
                this.queue.put(userMonitorData);
            }
            return true;
        } catch (InterruptedException e) {
            // Re-queue first: an interruptible put would fail once the flag is restored.
            requeueIfAbsent(userMonitorData);
            Thread.currentThread().interrupt();
            logger.error("Error handling the job with Job ID:" + (currentMonitorID == null ? "UNKNOWN" : currentMonitorID.getJobID()));
            throw new AiravataMonitorException(e);
        } catch (SSHApiException e) {
            logger.error(e.getMessage(), e);
            String message = e.getMessage() == null ? "" : e.getMessage();
            if (message.contains("Unknown Job Id Error")) {
                // Publish an UNKNOWN state; the entry is not re-queued in this branch.
                JobStatusChangeRequestEvent event = new JobStatusChangeRequestEvent();
                event.setState(JobState.UNKNOWN);
                JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
                if (currentMonitorID != null) {
                    jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
                    jobIdentifier.setTaskId(currentMonitorID.getTaskID());
                    jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
                    jobIdentifier.setJobId(currentMonitorID.getJobID());
                    jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID());
                }
                event.setJobIdentity(jobIdentifier);
                this.publisher.publish(event);
            } else if (message.contains("illegally formed job identifier")) {
                logger.error("Wrong job ID is given so dropping the job from monitoring system");
            } else {
                requeueIfAbsent(userMonitorData);
            }
            throw new AiravataMonitorException("Error retrieving the job status", e);
        } catch (Exception e) {
            requeue(userMonitorData);
            throw new AiravataMonitorException("Error retrieving the job status", e);
        }
    }

    /** Re-queues {@code data} unless it is {@code null} or already present in the queue. */
    private void requeueIfAbsent(UserMonitorData data) {
        if (data != null && !this.queue.contains(data)) {
            requeue(data);
        }
    }

    /**
     * Puts {@code data} back on the queue. A {@code null} argument (nothing was taken before
     * the failure) is ignored, because blocking queues reject null elements.
     */
    private void requeue(UserMonitorData data) {
        if (data == null) {
            return;
        }
        try {
            this.queue.put(data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted while returning monitor data to the queue", e);
        }
    }

    /** Publishes a {@link JobStatusChangeRequestEvent} carrying the job's identity and current status. */
    private void sendNotification(MonitorID monitorID) {
        JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent();
        jobStatusChangeRequestEvent.setJobIdentity(new JobIdentifier(monitorID.getJobID(), monitorID.getTaskID(), monitorID.getWorkflowNodeID(), monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID()));
        jobStatusChangeRequestEvent.setState(monitorID.getStatus());
        logger.debugId(jobStatusChangeRequestEvent.getJobIdentity().getJobId(), "Published job status change request, experiment {} , task {}", jobStatusChangeRequestEvent.getJobIdentity().getExperimentId(), jobStatusChangeRequestEvent.getJobIdentity().getTaskId());
        this.publisher.publish(jobStatusChangeRequestEvent);
    }

    /**
     * Requests the {@link #run()} loop to stop after its current iteration.
     *
     * @return always {@code true}
     */
    @Override // org.apache.airavata.gfac.monitor.core.PullMonitor
    public boolean stopPulling() {
        this.startPulling = false;
        return true;
    }

    @Override // org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor
    public MonitorPublisher getPublisher() {
        return this.publisher;
    }

    @Override // org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor
    public void setPublisher(MonitorPublisher monitorPublisher) {
        this.publisher = monitorPublisher;
    }

    public BlockingQueue<UserMonitorData> getQueue() {
        return this.queue;
    }

    public void setQueue(BlockingQueue<UserMonitorData> blockingQueue) {
        this.queue = blockingQueue;
    }

    /**
     * Not implemented by this monitor.
     *
     * @return always {@code false}
     */
    public boolean authenticate() {
        return false;
    }

    /** Returns the live map of cached connections keyed by host name (not a copy). */
    public Map<String, ResourceConnection> getConnections() {
        return this.connections;
    }

    public boolean isStartPulling() {
        return this.startPulling;
    }

    public void setConnections(Map<String, ResourceConnection> map) {
        this.connections = map;
    }

    public void setStartPulling(boolean z) {
        this.startPulling = z;
    }

    public GFac getGfac() {
        return this.gfac;
    }

    public void setGfac(GFac gFac) {
        this.gfac = gFac;
    }

    public AuthenticationInfo getAuthenticationInfo() {
        return this.authenticationInfo;
    }

    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
        this.authenticationInfo = authenticationInfo;
    }

    public LinkedBlockingQueue<String> getCancelJobList() {
        return this.cancelJobList;
    }

    public void setCancelJobList(LinkedBlockingQueue<String> linkedBlockingQueue) {
        this.cancelJobList = linkedBlockingQueue;
    }

    /**
     * Removes every monitor ID collected in the remove list from {@code userMonitorData}
     * and then empties the list. A failure to remove one entry is logged and does not stop
     * the removal of the others.
     */
    private void cleanup(UserMonitorData userMonitorData) {
        for (MonitorID finished : this.removeList) {
            try {
                CommonUtils.removeMonitorFromQueue(userMonitorData, finished);
            } catch (AiravataMonitorException e) {
                logger.error(e.getMessage(), e);
                logger.error("Error deleting the monitor data: " + finished.getJobID());
            }
        }
        this.removeList.clear();
    }
}
