package org.apache.oodt.cas.resource.scheduler;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
import org.apache.oodt.cas.resource.jobqueue.JobQueue;
import org.apache.oodt.cas.resource.monitor.Monitor;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.structs.SparkInstance;
import org.apache.oodt.cas.resource.structs.StreamingInstance;
import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:WEB-INF/lib/cas-resource-0.8.1.jar:org/apache/oodt/cas/resource/scheduler/SparkScheduler.class */
/**
 * A {@link Scheduler} that pulls jobs off a {@link JobQueue} and executes the
 * ones whose job-instance class is a {@link SparkInstance}, handing each a
 * shared {@link SparkContext} (and a shared {@link StreamingContext} for
 * {@link StreamingInstance} jobs).
 *
 * <p>Jobs are executed synchronously on the thread that calls {@link #run()}.
 * Jobs whose instance class is not a {@code SparkInstance} are logged and
 * dropped.
 */
public class SparkScheduler implements Scheduler {
    private static final Logger LOG = Logger.getLogger(SparkScheduler.class.getName());

    /** How long {@link #run()} sleeps between polls while the queue is empty. */
    private static final long IDLE_POLL_MILLIS = 1000L;

    /** Spark context shared by every job this scheduler executes. */
    SparkContext sc;

    /** Streaming context (built on {@link #sc}) handed to streaming jobs. */
    StreamingContext ssc;

    /** Queue that {@link #run()} polls for work. */
    JobQueue queue;

    /**
     * Creates the scheduler and eagerly starts the Spark contexts.
     *
     * <p>The Spark master is read from the {@code resource.runner.spark.host}
     * system property and defaults to {@code local}.
     *
     * @param jobQueue the queue from which jobs will be taken
     */
    public SparkScheduler(JobQueue jobQueue) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster(System.getProperty("resource.runner.spark.host", "local"));
        sparkConf.setAppName("OODT Spark Job");
        // NOTE(review): the jar shipped to Spark is a fixed relative path; confirm it
        // matches the deployed artifact name.
        sparkConf.setJars(new String[]{"../lib/cas-resource-0.8-SNAPSHOT.jar"});
        this.sc = new SparkContext(sparkConf);
        // Batch interval for the streaming context (Spark's Duration is in milliseconds).
        this.ssc = new StreamingContext(this.sc, new Duration(10000L));
        this.queue = jobQueue;
    }

    /**
     * Polls the job queue and schedules Spark jobs until the running thread is
     * interrupted.
     *
     * <p>While the queue is empty the loop sleeps briefly instead of spinning.
     * Failures of an individual job are logged (with their cause) and do not
     * stop the loop.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (this.queue.isEmpty()) {
                    // Back off rather than busy-waiting on an empty queue.
                    Thread.sleep(IDLE_POLL_MILLIS);
                    continue;
                }
                JobSpec nextJob = this.queue.getNextJob();
                String className = nextJob.getJob().getJobInstanceClassName();
                // Check the type on the Class object so the job is only instantiated
                // once, inside schedule().
                if (SparkInstance.class.isAssignableFrom(Class.forName(className))) {
                    schedule(nextJob);
                } else {
                    LOG.log(Level.WARNING, "Non-Spark job found (" + nextJob.getJob().getId() + ") ignoring.");
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owner of this thread can observe the interrupt.
                Thread.currentThread().interrupt();
                LOG.log(Level.INFO, "Spark scheduler interrupted; stopping.");
                return;
            } catch (ClassNotFoundException e) {
                LOG.log(Level.WARNING, "Class not found: " + e.getMessage(), e);
            } catch (JobQueueException e) {
                LOG.log(Level.WARNING, "Could not get next job from job-queue.", e);
            } catch (SchedulerException e) {
                LOG.log(Level.WARNING, "Scheduler exception detected: " + e.getMessage(), e);
            } catch (RuntimeException e) {
                // Loop boundary: one misbehaving job must not kill the scheduler thread.
                LOG.log(Level.SEVERE, "Unexpected failure while running job: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Instantiates the job described by {@code jobSpec}, injects the Spark
     * context(s) and executes it synchronously with the spec's input.
     *
     * @param jobSpec the job to execute
     * @return always {@code false}
     *         (NOTE(review): returned even after a successful execution; confirm
     *         what callers expect before changing it)
     * @throws SchedulerException if the job instance cannot be created, is not a
     *         {@link SparkInstance}, or rejects its input
     */
    @Override
    public boolean schedule(JobSpec jobSpec) throws SchedulerException {
        String className = jobSpec.getJob().getJobInstanceClassName();
        Object instance = GenericResourceManagerObjectFactory.getJobInstanceFromClassName(className);
        if (!(instance instanceof SparkInstance)) {
            // Covers both a null result and a non-Spark job; report it through the
            // declared exception type instead of an NPE/ClassCastException.
            throw new SchedulerException(new IllegalArgumentException(
                    "Job instance class is not an instantiable SparkInstance: " + className));
        }
        SparkInstance sparkInstance = (SparkInstance) instance;
        try {
            LOG.log(Level.INFO, "Setting SparkContext");
            sparkInstance.setSparkContext(this.sc);
            if (sparkInstance instanceof StreamingInstance) {
                LOG.log(Level.INFO, "Found streaming instance, setting StreamingContext");
                ((StreamingInstance) sparkInstance).setStreamingContext(this.ssc);
            }
            sparkInstance.execute(jobSpec.getIn());
            return false;
        } catch (JobInputException e) {
            LOG.log(Level.WARNING, "Job input exception detected.", e);
            throw new SchedulerException(e);
        }
    }

    /**
     * Not implemented by this scheduler.
     *
     * @param jobSpec ignored
     * @return always {@code null}
     */
    @Override
    public ResourceNode nodeAvailable(JobSpec jobSpec) throws SchedulerException {
        return null;
    }

    /**
     * Not implemented by this scheduler.
     *
     * @return always {@code null}
     */
    @Override
    public Monitor getMonitor() {
        return null;
    }

    /**
     * Not implemented by this scheduler.
     *
     * @return always {@code null}
     */
    @Override
    public Batchmgr getBatchmgr() {
        return null;
    }

    /**
     * @return the job queue this scheduler polls
     */
    @Override
    public JobQueue getJobQueue() {
        return this.queue;
    }

    /**
     * Not implemented by this scheduler.
     *
     * @return always {@code null}
     */
    @Override
    public QueueManager getQueueManager() {
        return null;
    }
}
