package org.apache.sqoop.driver;

import java.util.Date;
import java.util.Iterator;
import org.apache.commons.lang.time.DateUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.driver.configuration.JobConfiguration;
import org.apache.sqoop.error.code.DriverError;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.job.etl.Transferable;
import org.apache.sqoop.model.ConfigUtils;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.submission.SubmissionStatus;
import org.apache.sqoop.utils.ClassUtils;

/* loaded from: input_file:WEB-INF/lib/sqoop-core-1.99.6.jar:org/apache/sqoop/driver/JobManager.class */
public class JobManager implements Reconfigurable {
    private static final Logger LOG = Logger.getLogger(JobManager.class);
    private static JobManager instance = new JobManager();
    private static final long DEFAULT_PURGE_THRESHOLD = 86400000;
    private static final long DEFAULT_PURGE_SLEEP = 86400000;
    private static final long DEFAULT_UPDATE_SLEEP = 300000;
    private SubmissionEngine submissionEngine;
    private ExecutionEngine executionEngine;
    private PurgeThread purgeThread = null;
    private UpdateThread updateThread = null;
    private boolean running = true;
    private long purgeThreshold;
    private long purgeSleep;
    private long updateSleep;
    private String notificationBaseUrl;

    /* loaded from: input_file:WEB-INF/lib/sqoop-core-1.99.6.jar:org/apache/sqoop/driver/JobManager$PurgeThread.class */
    private class PurgeThread extends Thread {
        public PurgeThread() {
            super("PurgeThread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            JobManager.LOG.info("Starting submission manager purge thread");
            while (JobManager.this.running) {
                try {
                    JobManager.LOG.info("Purging old submissions");
                    RepositoryManager.getInstance().getRepository().purgeSubmissions(new Date(new Date().getTime() - JobManager.this.purgeThreshold));
                    Thread.sleep(JobManager.this.purgeSleep);
                } catch (InterruptedException e) {
                    JobManager.LOG.debug("Purge thread interrupted", e);
                }
            }
            JobManager.LOG.info("Ending submission manager purge thread");
        }
    }

    /* loaded from: input_file:WEB-INF/lib/sqoop-core-1.99.6.jar:org/apache/sqoop/driver/JobManager$UpdateThread.class */
    private class UpdateThread extends Thread {
        public UpdateThread() {
            super("UpdateThread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            JobManager.LOG.info("Starting submission manager update thread");
            while (JobManager.this.running) {
                try {
                    JobManager.LOG.debug("Updating running submissions");
                    Iterator<MSubmission> it = RepositoryManager.getInstance().getRepository().findUnfinishedSubmissions().iterator();
                    while (it.hasNext()) {
                        JobManager.this.updateSubmission(it.next());
                    }
                    Thread.sleep(JobManager.this.updateSleep);
                } catch (InterruptedException e) {
                    JobManager.LOG.debug("Purge thread interrupted", e);
                }
            }
            JobManager.LOG.info("Ending submission manager update thread");
        }
    }

    public static JobManager getInstance() {
        return instance;
    }

    public static void setInstance(JobManager jobManager) {
        instance = jobManager;
    }

    public void setNotificationBaseUrl(String str) {
        LOG.debug("Setting notification base URL to " + str);
        this.notificationBaseUrl = str;
    }

    public String getNotificationBaseUrl() {
        return this.notificationBaseUrl;
    }

    public synchronized void destroy() {
        LOG.trace("Begin submission engine manager destroy");
        this.running = false;
        try {
            this.purgeThread.interrupt();
            this.purgeThread.join();
        } catch (InterruptedException e) {
            LOG.error("Interrupted joining purgeThread");
        }
        try {
            this.updateThread.interrupt();
            this.updateThread.join();
        } catch (InterruptedException e2) {
            LOG.error("Interrupted joining updateThread");
        }
        if (this.submissionEngine != null) {
            this.submissionEngine.destroy();
        }
        if (this.executionEngine != null) {
            this.executionEngine.destroy();
        }
    }

    public synchronized void initialize() {
        LOG.trace("Begin submission engine manager initialization");
        MapContext context = SqoopConfiguration.getInstance().getContext();
        String string = context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
        this.submissionEngine = (SubmissionEngine) ClassUtils.instantiate(string, new Object[0]);
        if (this.submissionEngine == null) {
            throw new SqoopException(DriverError.DRIVER_0001, string);
        }
        this.submissionEngine.initialize(context, DriverConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
        String string2 = context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
        this.executionEngine = (ExecutionEngine) ClassUtils.instantiate(string2, new Object[0]);
        if (this.executionEngine == null) {
            throw new SqoopException(DriverError.DRIVER_0007, string2);
        }
        if (!this.submissionEngine.isExecutionEngineSupported(this.executionEngine.getClass())) {
            throw new SqoopException(DriverError.DRIVER_0008);
        }
        this.executionEngine.initialize(context, DriverConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
        this.purgeThreshold = context.getLong(DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, DateUtils.MILLIS_PER_DAY);
        this.purgeSleep = context.getLong(DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, DateUtils.MILLIS_PER_DAY);
        this.purgeThread = new PurgeThread();
        this.purgeThread.start();
        this.updateSleep = context.getLong(DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, DEFAULT_UPDATE_SLEEP);
        this.updateThread = new UpdateThread();
        this.updateThread.start();
        SqoopConfiguration.getInstance().getProvider().registerListener(new SqoopConfiguration.CoreConfigurationListener(this));
        LOG.info("Submission manager initialized: OK");
    }

    public MSubmission start(long j, HttpEventContext httpEventContext) {
        MSubmission createJobSubmission = createJobSubmission(httpEventContext, j);
        JobRequest createJobRequest = createJobRequest(j, createJobSubmission);
        prepareJob(createJobRequest);
        synchronized (getClass()) {
            MSubmission findLastSubmissionForJob = RepositoryManager.getInstance().getRepository().findLastSubmissionForJob(j);
            if (findLastSubmissionForJob != null && findLastSubmissionForJob.getStatus().isRunning()) {
                throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + j);
            }
            if (!this.submissionEngine.submit(createJobRequest)) {
                invokeDestroyerOnJobFailure(createJobRequest);
                createJobSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
            }
            RepositoryManager.getInstance().getRepository().createSubmission(createJobSubmission);
        }
        return createJobSubmission;
    }

    private JobRequest createJobRequest(long j, MSubmission mSubmission) {
        MJob job = getJob(j);
        MLink link = getLink(job.getFromLinkId());
        MLink link2 = getLink(job.getToLinkId());
        SqoopConnector sqoopConnector = getSqoopConnector(link.getConnectorId());
        validateSupportedDirection(sqoopConnector, Direction.FROM);
        SqoopConnector sqoopConnector2 = getSqoopConnector(link2.getConnectorId());
        validateSupportedDirection(sqoopConnector2, Direction.TO);
        Object instantiate = ClassUtils.instantiate(sqoopConnector.getLinkConfigurationClass(), new Object[0]);
        ConfigUtils.fromConfigs(link.getConnectorLinkConfig().getConfigs(), instantiate);
        Object instantiate2 = ClassUtils.instantiate(sqoopConnector2.getLinkConfigurationClass(), new Object[0]);
        ConfigUtils.fromConfigs(link2.getConnectorLinkConfig().getConfigs(), instantiate2);
        Object instantiate3 = ClassUtils.instantiate(sqoopConnector.getJobConfigurationClass(Direction.FROM), new Object[0]);
        ConfigUtils.fromConfigs(job.getFromJobConfig().getConfigs(), instantiate3);
        Object instantiate4 = ClassUtils.instantiate(sqoopConnector2.getJobConfigurationClass(Direction.TO), new Object[0]);
        ConfigUtils.fromConfigs(job.getToJobConfig().getConfigs(), instantiate4);
        Object instantiate5 = ClassUtils.instantiate(Driver.getInstance().getDriverJobConfigurationClass(), new Object[0]);
        ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), instantiate5);
        JobRequest createJobRequest = this.executionEngine.createJobRequest();
        createJobRequest.setJobSubmission(mSubmission);
        createJobRequest.setConnector(Direction.FROM, sqoopConnector);
        createJobRequest.setConnector(Direction.TO, sqoopConnector2);
        createJobRequest.setConnectorLinkConfig(Direction.FROM, instantiate);
        createJobRequest.setConnectorLinkConfig(Direction.TO, instantiate2);
        createJobRequest.setJobConfig(Direction.FROM, instantiate3);
        createJobRequest.setJobConfig(Direction.TO, instantiate4);
        createJobRequest.setDriverConfig(instantiate5);
        createJobRequest.setJobName(job.getName());
        createJobRequest.setJobId(job.getPersistenceId());
        createJobRequest.setNotificationUrl(this.notificationBaseUrl + j);
        createJobRequest.setIntermediateDataFormat(sqoopConnector.getIntermediateDataFormat(), Direction.FROM);
        createJobRequest.setIntermediateDataFormat(sqoopConnector2.getIntermediateDataFormat(), Direction.TO);
        createJobRequest.setFrom(sqoopConnector.getFrom());
        createJobRequest.setTo(sqoopConnector2.getTo());
        addStandardJars(createJobRequest);
        addConnectorClass(createJobRequest, sqoopConnector);
        addConnectorClass(createJobRequest, sqoopConnector2);
        addConnectorIDFClass(createJobRequest, sqoopConnector.getIntermediateDataFormat());
        addConnectorIDFClass(createJobRequest, sqoopConnector2.getIntermediateDataFormat());
        addConnectorInitializerJars(createJobRequest, Direction.FROM);
        addConnectorInitializerJars(createJobRequest, Direction.TO);
        addIDFDependentJars(createJobRequest, Direction.FROM);
        addIDFDependentJars(createJobRequest, Direction.TO);
        initializeConnector(createJobRequest, Direction.FROM);
        initializeConnector(createJobRequest, Direction.TO);
        createJobRequest.getJobSubmission().setFromSchema(getSchemaForConnector(createJobRequest, Direction.FROM));
        createJobRequest.getJobSubmission().setToSchema(getSchemaForConnector(createJobRequest, Direction.TO));
        LOG.debug("Using entities: " + createJobRequest.getFrom() + ", " + createJobRequest.getTo());
        return createJobRequest;
    }

    private void addConnectorClass(JobRequest jobRequest, SqoopConnector sqoopConnector) {
        jobRequest.addJarForClass(sqoopConnector.getClass());
    }

    private void addConnectorIDFClass(JobRequest jobRequest, Class<? extends IntermediateDataFormat<?>> cls) {
        jobRequest.addJarForClass(cls);
    }

    private void addStandardJars(JobRequest jobRequest) {
        jobRequest.addJarForClass(MapContext.class);
        jobRequest.addJarForClass(Driver.class);
        jobRequest.addJarForClass(SqoopConnector.class);
        jobRequest.addJarForClass(this.executionEngine.getClass());
    }

    MSubmission createJobSubmission(HttpEventContext httpEventContext, long j) {
        MSubmission mSubmission = new MSubmission(j);
        mSubmission.setCreationUser(httpEventContext.getUsername());
        mSubmission.setLastUpdateUser(httpEventContext.getUsername());
        return mSubmission;
    }

    SqoopConnector getSqoopConnector(long j) {
        return ConnectorManager.getInstance().getSqoopConnector(j);
    }

    void validateSupportedDirection(SqoopConnector sqoopConnector, Direction direction) {
        if (!sqoopConnector.getSupportedDirections().contains(direction)) {
            throw new SqoopException(DriverError.DRIVER_0011, "Connector: " + sqoopConnector.getClass().getCanonicalName());
        }
    }

    MLink getLink(long j) {
        MLink findLink = RepositoryManager.getInstance().getRepository().findLink(j);
        if (findLink.getEnabled()) {
            return findLink;
        }
        throw new SqoopException(DriverError.DRIVER_0010, "Connection id: " + findLink.getPersistenceId());
    }

    MJob getJob(long j) {
        MJob findJob = RepositoryManager.getInstance().getRepository().findJob(j);
        if (findJob == null) {
            throw new SqoopException(DriverError.DRIVER_0004, "Unknown job id: " + j);
        }
        if (findJob.getEnabled()) {
            return findJob;
        }
        throw new SqoopException(DriverError.DRIVER_0009, "Job id: " + findJob.getPersistenceId());
    }

    private void initializeConnector(JobRequest jobRequest, Direction direction) {
        getConnectorInitializer(jobRequest, direction).initialize(getConnectorInitializerContext(jobRequest, direction), jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction));
    }

    private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) {
        return getConnectorInitializer(jobRequest, direction).getSchema(getConnectorInitializerContext(jobRequest, direction), jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction));
    }

    private void addIDFDependentJars(JobRequest jobRequest, Direction direction) {
        jobRequest.addJars(((IntermediateDataFormat) ClassUtils.instantiate(jobRequest.getIntermediateDataFormat(direction), new Object[0])).getJars());
    }

    private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
        jobRequest.addJars(getConnectorInitializer(jobRequest, direction).getJars(getConnectorInitializerContext(jobRequest, direction), jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)));
    }

    private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
        Class<? extends Initializer> initializer = (direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo()).getInitializer();
        Initializer initializer2 = (Initializer) ClassUtils.instantiate(initializer, new Object[0]);
        if (initializer2 == null) {
            throw new SqoopException(DriverError.DRIVER_0006, "Can't create connector initializer instance: " + initializer.getName());
        }
        return initializer2;
    }

    private InitializerContext getConnectorInitializerContext(JobRequest jobRequest, Direction direction) {
        return new InitializerContext(jobRequest.getConnectorContext(direction));
    }

    void prepareJob(JobRequest jobRequest) {
        JobConfiguration jobConfiguration = (JobConfiguration) jobRequest.getDriverConfig();
        jobRequest.setExtractors(jobConfiguration.throttlingConfig.numExtractors);
        jobRequest.setLoaders(jobConfiguration.throttlingConfig.numLoaders);
        this.executionEngine.prepareJob(jobRequest);
    }

    void invokeDestroyerOnJobFailure(JobRequest jobRequest) {
        Transferable from = jobRequest.getFrom();
        Transferable to = jobRequest.getTo();
        Class<? extends Destroyer> destroyer = from.getDestroyer();
        Class<? extends Destroyer> destroyer2 = to.getDestroyer();
        Destroyer destroyer3 = (Destroyer) ClassUtils.instantiate(destroyer, new Object[0]);
        Destroyer destroyer4 = (Destroyer) ClassUtils.instantiate(destroyer2, new Object[0]);
        if (destroyer3 == null) {
            throw new SqoopException(DriverError.DRIVER_0006, "Can't create toDestroyer instance: " + destroyer.getName());
        }
        if (destroyer4 == null) {
            throw new SqoopException(DriverError.DRIVER_0006, "Can't create toDestroyer instance: " + destroyer2.getName());
        }
        DestroyerContext destroyerContext = new DestroyerContext(jobRequest.getConnectorContext(Direction.FROM), false, jobRequest.getJobSubmission().getFromSchema());
        DestroyerContext destroyerContext2 = new DestroyerContext(jobRequest.getConnectorContext(Direction.TO), false, jobRequest.getJobSubmission().getToSchema());
        destroyer3.destroy(destroyerContext, jobRequest.getConnectorLinkConfig(Direction.FROM), jobRequest.getJobConfig(Direction.FROM));
        destroyer4.destroy(destroyerContext2, jobRequest.getConnectorLinkConfig(Direction.TO), jobRequest.getJobConfig(Direction.TO));
    }

    public MSubmission stop(long j, HttpEventContext httpEventContext) {
        MSubmission findLastSubmissionForJob = RepositoryManager.getInstance().getRepository().findLastSubmissionForJob(j);
        if (findLastSubmissionForJob == null || !findLastSubmissionForJob.getStatus().isRunning()) {
            throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + j + " is not running hence cannot stop");
        }
        this.submissionEngine.stop(findLastSubmissionForJob.getExternalJobId());
        findLastSubmissionForJob.setLastUpdateUser(httpEventContext.getUsername());
        updateSubmission(findLastSubmissionForJob);
        return findLastSubmissionForJob;
    }

    public MSubmission status(long j) {
        MSubmission findLastSubmissionForJob = RepositoryManager.getInstance().getRepository().findLastSubmissionForJob(j);
        if (findLastSubmissionForJob == null) {
            return new MSubmission(j, new Date(), SubmissionStatus.NEVER_EXECUTED);
        }
        if (findLastSubmissionForJob.getStatus().isRunning()) {
            updateSubmission(findLastSubmissionForJob);
        }
        return findLastSubmissionForJob;
    }

    public void updateSubmission(MSubmission mSubmission) {
        this.submissionEngine.update(mSubmission);
        RepositoryManager.getInstance().getRepository().updateSubmission(mSubmission);
    }

    @Override // org.apache.sqoop.core.Reconfigurable
    public synchronized void configurationChanged() {
        LOG.info("Begin submission engine manager reconfiguring");
        MapContext context = SqoopConfiguration.getInstance().getContext();
        MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
        String string = context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
        if (string == null || string.trim().length() == 0) {
            throw new SqoopException(DriverError.DRIVER_0001, string);
        }
        if (!string.equals(oldContext.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE))) {
            LOG.warn("Submission engine cannot be replaced at the runtime. You might need to restart the server.");
        }
        String string2 = context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
        if (string2 == null || string2.trim().length() == 0) {
            throw new SqoopException(DriverError.DRIVER_0007, string2);
        }
        if (!string2.equals(oldContext.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE))) {
            LOG.warn("Execution engine cannot be replaced at the runtime. You might need to restart the server.");
        }
        this.purgeThreshold = context.getLong(DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, DateUtils.MILLIS_PER_DAY);
        this.purgeSleep = context.getLong(DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP, DateUtils.MILLIS_PER_DAY);
        this.purgeThread.interrupt();
        this.updateSleep = context.getLong(DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP, DEFAULT_UPDATE_SLEEP);
        this.updateThread.interrupt();
        LOG.info("Submission engine manager reconfigured.");
    }
}
