package org.apache.falcon.workflow.engine;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.exception.DAGEngineException;
import org.apache.falcon.execution.ExecutionInstance;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.proxy.SchedulableEntityManagerProxy;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/workflow/engine/OozieDAGEngine.class */
public class OozieDAGEngine implements DAGEngine {
    /** Oozie client used for every call to the workflow server; obtained from OozieClientFactory in the constructors. */
    private final OozieClient client;
    /** Delay, in milliseconds, between successive workflow status polls (see assertStatus). */
    private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100;
    /** Runtime property key read by assertStatus for the number of status polls; "30" is used when it is unset. */
    private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count";
    /** Date-time pattern for instance times; the same pattern is used for the nominalTime and timeStamp job properties. */
    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
    /** Cluster entity this engine operates on; used for staging/log paths and reported as the instance cluster name. */
    private final Cluster cluster;
    private static final Logger LOG = LoggerFactory.getLogger(OozieDAGEngine.class);
    /** Action names that getJobDetails treats specially: they are reported only when their external status is not SUCCEEDED. */
    private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList("pre-processing", "recordsize", OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME, OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);

    public OozieDAGEngine(Cluster cluster) throws DAGEngineException {
        try {
            this.client = OozieClientFactory.get(cluster);
            this.cluster = cluster;
        } catch (Exception e) {
            throw new DAGEngineException(e);
        }
    }

    public OozieDAGEngine(String str) throws DAGEngineException {
        try {
            this.cluster = (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, str);
            this.client = OozieClientFactory.get(this.cluster);
        } catch (Exception e) {
            throw new DAGEngineException(e);
        }
    }

    /**
     * Starts a workflow for the given instance from the entity's latest staging path.
     *
     * @param executionInstance instance to run; its entity's ACL owner becomes the submitting user
     * @return the value returned by the Oozie client's run call
     * @throws DAGEngineException wrapping any FalconException or OozieClientException
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public String run(ExecutionInstance executionInstance) throws DAGEngineException {
        try {
            Properties jobProps = getRunProperties(executionInstance);
            Path appPath = EntityUtil.getLatestStagingPath(this.cluster, executionInstance.getEntity());
            String owner = executionInstance.getEntity().getACL().getOwner();
            // Authenticate as the entity owner before talking to Oozie.
            switchUserTo(owner);
            jobProps.setProperty(OozieClient.USER_NAME, owner);
            jobProps.setProperty(OozieClient.APP_PATH, appPath.toString());
            return this.client.run(jobProps);
        } catch (FalconException falconFailure) {
            LOG.error("Falcon Exception : ", falconFailure);
            throw new DAGEngineException(falconFailure);
        } catch (OozieClientException oozieFailure) {
            LOG.error("Oozie client exception:", oozieFailure);
            throw new DAGEngineException(oozieFailure);
        }
    }

    /**
     * Authenticates the given user through CurrentUser.
     *
     * @param user name passed to CurrentUser.authenticate
     */
    private void switchUserTo(String user) {
        CurrentUser.authenticate(user);
    }

    /**
     * Creates the entity's base staging directory and log directory on the cluster file system.
     *
     * @param entity entity whose staging and log paths are prepared
     * @throws FalconException if the paths cannot be resolved or an IOException occurs while creating them
     */
    private void prepareEntityBuildPath(Entity entity) throws FalconException {
        final Path stagingDir = EntityUtil.getBaseStagingPath(this.cluster, entity);
        final Path logDir = EntityUtil.getLogPath(this.cluster, entity);
        try {
            FileSystem fs = HadoopClientFactory.get()
                    .createProxiedFileSystem(ClusterHelper.getConfiguration(this.cluster));
            HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingDir);
            HadoopClientFactory.mkdirsWithDefaultPerms(fs, logDir);
        } catch (IOException ioFailure) {
            throw new FalconException("Error preparing base staging dirs: " + stagingDir, ioFailure);
        }
    }

    /**
     * Performs an Oozie dry run for the entity using the supplied job properties.
     *
     * @param properties job properties produced by the workflow builder; must not be null
     * @param path application path set as the Oozie app path
     * @param entity entity being validated; its ACL owner becomes the submitting user
     * @throws OozieClientException if the Oozie dry run call fails
     * @throws DAGEngineException if {@code properties} is null
     */
    private void dryRunInternal(Properties properties, Path path, Entity entity) throws OozieClientException, DAGEngineException {
        if (properties == null) {
            LOG.info("Entity {} is not scheduled on cluster {} with user {}", entity.getName(), this.cluster, entity.getACL().getOwner());
            throw new DAGEngineException("Properties for entity " + entity.getName() + " is empty");
        }

        final String owner = entity.getACL().getOwner();
        switchUserTo(owner);
        properties.setProperty(OozieClient.USER_NAME, owner);
        properties.setProperty(OozieClient.APP_PATH, path.toString());
        // Dry-run defaults are applied last, so they override same-named builder properties.
        properties.putAll(getDryRunProperties(entity));

        LOG.info("Dry run with properties {}", properties);
        this.client.dryrun(properties);
    }

    /** Authenticates as the user named by the system property keyed by OozieClient.USER_NAME. */
    private void switchUser() {
        final String systemUser = System.getProperty(OozieClient.USER_NAME);
        switchUserTo(systemUser);
    }

    /**
     * Reports whether the instance's Oozie job is currently in PREP or RUNNING state.
     *
     * @param executionInstance instance whose external ID identifies the Oozie job
     * @return true when the job status name is PREP or RUNNING
     * @throws DAGEngineException wrapping any OozieClientException from the status lookup
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public boolean isScheduled(ExecutionInstance executionInstance) throws DAGEngineException {
        final String currentStatus;
        try {
            currentStatus = this.client.getJobInfo(executionInstance.getExternalID()).getStatus().name();
        } catch (OozieClientException oozieFailure) {
            throw new DAGEngineException(oozieFailure);
        }
        return statusEquals(currentStatus, Job.Status.PREP, Job.Status.RUNNING);
    }

    /**
     * Builds the job properties for running the given instance; the nominal time is the
     * instance time formatted with {@link #INSTANCE_FORMAT}.
     *
     * @param executionInstance instance whose instance time becomes nominalTime/timeStamp
     * @return a new Properties object with the default workflow properties
     */
    private Properties getRunProperties(ExecutionInstance executionInstance) {
        return buildWorkflowProperties(
                DateTimeFormat.forPattern(INSTANCE_FORMAT).print(executionInstance.getInstanceTime()));
    }

    /**
     * Builds the job properties for a dry run; the nominal time is the current time
     * formatted with {@link #INSTANCE_FORMAT}.
     *
     * @param entity entity being dry-run (not consulted when building the defaults)
     * @return a new Properties object with the default workflow properties
     */
    private Properties getDryRunProperties(Entity entity) {
        return buildWorkflowProperties(DateTimeFormat.forPattern(INSTANCE_FORMAT).print(DateTime.now()));
    }

    /**
     * Creates the default workflow properties shared by real runs and dry runs.
     *
     * @param nominalTime formatted time stored under both nominalTime and timeStamp
     * @return a new Properties object; feed-related entries are set to "NONE"
     */
    private static Properties buildWorkflowProperties(String nominalTime) {
        Properties props = new Properties();
        props.put("nominalTime", nominalTime);
        props.put("timeStamp", nominalTime);
        props.put("feedNames", "NONE");
        props.put("feedInstancePaths", "NONE");
        props.put("falconInputFeeds", "NONE");
        props.put("falconInPaths", "NONE");
        props.put("userJMSNotificationEnabled", "true");
        props.put("systemJMSNotificationEnabled", "false");
        return props;
    }

    /**
     * Suspends the instance's Oozie job and waits until it reports a suspended or terminal status.
     *
     * @param executionInstance instance whose external ID identifies the Oozie job
     * @throws DAGEngineException if the Oozie call fails or none of the expected statuses is observed
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public void suspend(ExecutionInstance executionInstance) throws DAGEngineException {
        final String jobId = executionInstance.getExternalID();
        try {
            this.client.suspend(jobId);
        } catch (OozieClientException oozieFailure) {
            throw new DAGEngineException(oozieFailure);
        }
        assertStatus(jobId, Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED, Job.Status.FAILED, Job.Status.KILLED);
        LOG.info("Suspended job {} of entity {} of time {} on cluster {}", jobId, executionInstance.getEntity().getName(), executionInstance.getInstanceTime(), executionInstance.getCluster());
    }

    /**
     * Resumes the instance's Oozie job and waits until it reports an active or terminal status.
     *
     * @param executionInstance instance whose external ID identifies the Oozie job
     * @throws DAGEngineException if the Oozie call fails or none of the expected statuses is observed
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public void resume(ExecutionInstance executionInstance) throws DAGEngineException {
        final String jobId = executionInstance.getExternalID();
        try {
            this.client.resume(jobId);
        } catch (OozieClientException oozieFailure) {
            throw new DAGEngineException(oozieFailure);
        }
        assertStatus(jobId, Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED, Job.Status.FAILED, Job.Status.KILLED);
        LOG.info("Resumed job {} of entity {} of time {} on cluster {}", jobId, executionInstance.getEntity().getName(), executionInstance.getInstanceTime(), executionInstance.getCluster());
    }

    /**
     * Kills the instance's Oozie job and waits until it reports a terminal status.
     *
     * @param executionInstance instance whose external ID identifies the Oozie job
     * @throws DAGEngineException if the Oozie call fails or none of the expected statuses is observed
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public void kill(ExecutionInstance executionInstance) throws DAGEngineException {
        final String jobId = executionInstance.getExternalID();
        try {
            this.client.kill(jobId);
        } catch (OozieClientException oozieFailure) {
            throw new DAGEngineException(oozieFailure);
        }
        assertStatus(jobId, Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED);
        LOG.info("Killed job {} of entity {} of time {} on cluster {}", jobId, executionInstance.getEntity().getName(), executionInstance.getInstanceTime(), executionInstance.getCluster());
    }

    /**
     * Re-runs the instance's Oozie workflow using the job's stored configuration overlaid
     * with the caller's properties.
     *
     * <p>If the caller supplies neither the rerun-fail-nodes nor the rerun-skip-nodes property,
     * rerun-fail-nodes is set to the negation of {@code z}. If both end up present in the merged
     * configuration, the skip-nodes property is dropped. The caller's {@code properties} object
     * is never modified.
     *
     * @param executionInstance instance whose external ID identifies the Oozie job
     * @param properties optional overrides for the stored job configuration; may be null
     * @param z when true, rerun-fail-nodes defaults to "false"; when false, to "true"
     * @throws DAGEngineException if the rerun fails or the job does not reach PREP, RUNNING or SUCCEEDED
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public void reRun(ExecutionInstance executionInstance, Properties properties, boolean z) throws DAGEngineException {
        String externalID = executionInstance.getExternalID();
        try {
            WorkflowJob jobInfo = this.client.getJobInfo(externalID);

            // Start from the configuration the job last ran with, then overlay caller overrides.
            Properties rerunProps = OozieUtils.toProperties(jobInfo.getConf());
            boolean callerChoseRerunMode = false;
            if (properties != null) {
                callerChoseRerunMode = properties.containsKey(OozieClient.RERUN_FAIL_NODES)
                        || properties.containsKey(OozieClient.RERUN_SKIP_NODES);
                rerunProps.putAll(properties);
            }
            if (!callerChoseRerunMode) {
                // Written into the merged copy so the caller's Properties stays untouched.
                rerunProps.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!z));
            }

            rerunProps.remove(OozieClient.COORDINATOR_APP_PATH);
            rerunProps.remove(OozieClient.BUNDLE_APP_PATH);

            // The two rerun modes are not submitted together; fail-nodes takes precedence.
            if (rerunProps.containsKey(OozieClient.RERUN_FAIL_NODES) && rerunProps.containsKey(OozieClient.RERUN_SKIP_NODES)) {
                LOG.warn("Both {} and {} are present in workflow params; removing {}",
                        OozieClient.RERUN_SKIP_NODES, OozieClient.RERUN_FAIL_NODES, OozieClient.RERUN_SKIP_NODES);
                rerunProps.remove(OozieClient.RERUN_SKIP_NODES);
            }

            this.client.reRun(externalID, rerunProps);
            assertStatus(externalID, Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED);
            LOG.info("Rerun job {} of entity {} of time {} on cluster {}", externalID, executionInstance.getEntity().getName(), executionInstance.getInstanceTime(), executionInstance.getCluster());
        } catch (DAGEngineException e) {
            // Already the declared exception type: rethrow as-is instead of wrapping it again.
            LOG.error("Unable to rerun workflows", e);
            throw e;
        } catch (Exception e) {
            LOG.error("Unable to rerun workflows", e);
            throw new DAGEngineException(e);
        }
    }

    /**
     * Builds the entity's workflow into a new staging path and validates it with an Oozie dry run.
     *
     * @param entity entity to submit
     * @throws DAGEngineException wrapping any FalconException or OozieClientException raised on the way
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public void submit(Entity entity) throws DAGEngineException {
        try {
            OozieOrchestrationWorkflowBuilder builder =
                    OozieOrchestrationWorkflowBuilder.get(entity, this.cluster, Tag.DEFAULT);
            prepareEntityBuildPath(entity);
            Path stagingPath = EntityUtil.getNewStagingPath(this.cluster, entity);
            Properties builtProps = builder.build(this.cluster, stagingPath);
            dryRunInternal(builtProps, stagingPath, entity);
        } catch (FalconException falconFailure) {
            LOG.error("Falcon Exception : ", falconFailure);
            throw new DAGEngineException(falconFailure);
        } catch (OozieClientException oozieFailure) {
            LOG.error("Oozie client exception:", oozieFailure);
            throw new DAGEngineException(oozieFailure);
        }
    }

    /**
     * Fetches the Oozie job and converts it into an instance description.
     *
     * <p>For a job whose status is RUNNING the end time is reported as the current time;
     * otherwise the job's own end time is used.
     *
     * @param str Oozie job ID
     * @return instance populated with times, cluster name, run id, status, console URL and workflow params
     * @throws DAGEngineException wrapping any exception raised while retrieving or converting the job
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public InstancesResult.Instance info(String str) throws DAGEngineException {
        InstancesResult.Instance details = new InstancesResult.Instance();
        try {
            LOG.debug("Retrieving details for job {} ", str);
            WorkflowJob wfJob = this.client.getJobInfo(str);
            details.startTime = wfJob.getStartTime();

            boolean stillRunning = Job.Status.RUNNING.name().equals(wfJob.getStatus().name());
            details.endTime = stillRunning ? new Date() : wfJob.getEndTime();

            details.cluster = this.cluster.getName();
            details.runId = wfJob.getRun();
            details.status = InstancesResult.WorkflowStatus.valueOf(wfJob.getStatus().name());
            details.logFile = wfJob.getConsoleUrl();
            details.wfParams = getWFParams(wfJob);
            return details;
        } catch (Exception failure) {
            LOG.error("Error when attempting to get info for " + str, failure);
            throw new DAGEngineException(failure);
        }
    }

    /**
     * Parses the workflow job's configuration and returns it as key/value pairs.
     *
     * @param workflowJob job whose configuration text is parsed
     * @return one pair per entry yielded by iterating the parsed configuration
     */
    private InstancesResult.KeyValuePair[] getWFParams(WorkflowJob workflowJob) {
        Configuration parsedConf = new Configuration(false);
        // Encode explicitly so parsing does not depend on the JVM's platform charset.
        // NOTE(review): assumes the configuration XML is UTF-8 (the XML default) — confirm against Oozie.
        parsedConf.addResource(new ByteArrayInputStream(workflowJob.getConf().getBytes(StandardCharsets.UTF_8)));

        // Collect into a list so the array is exactly as long as the number of entries iterated.
        List<InstancesResult.KeyValuePair> params = new ArrayList<InstancesResult.KeyValuePair>();
        for (Map.Entry<String, String> entry : parsedConf) {
            params.add(new InstancesResult.KeyValuePair(entry.getKey(), entry.getValue()));
        }
        return params.toArray(new InstancesResult.KeyValuePair[params.size()]);
    }

    /**
     * Lists the actions of the given workflow job.
     *
     * <p>Actions whose type starts with ":" are skipped. A "sub-workflow" action with a non-empty
     * external ID is replaced by the actions of the job that ID points to. Any other action is
     * reported according to {@link #isReportable(WorkflowAction)}.
     *
     * @param str Oozie job ID
     * @return name, external status and console URL of each reported action
     * @throws DAGEngineException wrapping any OozieClientException from the job lookups
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public List<InstancesResult.InstanceAction> getJobDetails(String str) throws DAGEngineException {
        List<InstancesResult.InstanceAction> details = new ArrayList<InstancesResult.InstanceAction>();
        try {
            for (WorkflowAction action : this.client.getJobInfo(str).getActions()) {
                boolean expandableSubWorkflow = action.getType().equalsIgnoreCase("sub-workflow")
                        && StringUtils.isNotEmpty(action.getExternalId());
                if (expandableSubWorkflow) {
                    // Report the child workflow's own actions instead of the wrapper action.
                    for (WorkflowAction childAction : this.client.getJobInfo(action.getExternalId()).getActions()) {
                        if (!childAction.getType().startsWith(":")) {
                            details.add(toInstanceAction(childAction));
                        }
                    }
                } else if (!action.getType().startsWith(":") && isReportable(action)) {
                    details.add(toInstanceAction(action));
                }
            }
            return details;
        } catch (OozieClientException e) {
            throw new DAGEngineException(e);
        }
    }

    /**
     * Decides whether a top-level action is included in the job details: actions named in
     * PARENT_WF_ACTION_NAMES only when their external status is not SUCCEEDED, all other
     * actions only when their external ID is not "-".
     */
    private static boolean isReportable(WorkflowAction action) {
        if (PARENT_WF_ACTION_NAMES.contains(action.getName())) {
            return !Job.Status.SUCCEEDED.toString().equals(action.getExternalStatus());
        }
        return !StringUtils.equals(action.getExternalId(), "-");
    }

    /** Converts an Oozie action into the result type carrying its name, external status and console URL. */
    private static InstancesResult.InstanceAction toInstanceAction(WorkflowAction action) {
        return new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), action.getConsoleUrl());
    }

    /**
     * Checks whether the Oozie server reports NORMAL system mode.
     *
     * @return true only when the server's system mode is NORMAL
     * @throws DAGEngineException if the system mode cannot be retrieved
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public boolean isAlive() throws DAGEngineException {
        final OozieClient.SYSTEM_MODE mode;
        try {
            mode = this.client.getSystemMode();
        } catch (OozieClientException oozieFailure) {
            throw new DAGEngineException("Unable to reach Oozie server.", oozieFailure);
        }
        return OozieClient.SYSTEM_MODE.NORMAL == mode;
    }

    /**
     * Returns the configuration of the given Oozie job as Properties.
     *
     * @param str Oozie job ID
     * @return one property per entry yielded by iterating the parsed job configuration
     * @throws DAGEngineException wrapping any OozieClientException from the job lookup
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public Properties getConfiguration(String str) throws DAGEngineException {
        Properties properties = new Properties();
        try {
            switchUser();
            WorkflowJob jobInfo = this.client.getJobInfo(str);
            Configuration parsedConf = new Configuration(false);
            // Encode explicitly so parsing does not depend on the JVM's platform charset.
            // NOTE(review): assumes the configuration XML is UTF-8 (the XML default) — confirm against Oozie.
            parsedConf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes(StandardCharsets.UTF_8)));
            for (Map.Entry<String, String> entry : parsedConf) {
                properties.put(entry.getKey(), entry.getValue());
            }
            return properties;
        } catch (OozieClientException e) {
            throw new DAGEngineException(e);
        }
    }

    /**
     * Rebuilds the entity's workflow into a new staging path, optionally validating it first
     * with a dry run against a temporary build path under /tmp.
     *
     * @param entity entity to rebuild
     * @param bool when {@code Boolean.TRUE} the dry run is skipped; {@code null} is treated like
     *             {@code false}, so the dry run is performed
     * @throws DAGEngineException wrapping any FalconException or OozieClientException raised on the way
     */
    @Override // org.apache.falcon.workflow.engine.DAGEngine
    public void touch(Entity entity, Boolean bool) throws DAGEngineException {
        try {
            OozieOrchestrationWorkflowBuilder oozieOrchestrationWorkflowBuilder = OozieOrchestrationWorkflowBuilder.get(entity, this.cluster, Tag.DEFAULT);
            // Null-safe check: unboxing a null Boolean would throw NullPointerException.
            if (!Boolean.TRUE.equals(bool)) {
                Path dryRunPath = new Path("/tmp", SchedulableEntityManagerProxy.FALCON_TAG + entity.getName() + System.currentTimeMillis());
                dryRunInternal(oozieOrchestrationWorkflowBuilder.build(this.cluster, dryRunPath), dryRunPath, entity);
            }
            // The build result is not used here; the workflow is only materialised at the new staging path.
            oozieOrchestrationWorkflowBuilder.build(this.cluster, EntityUtil.getNewStagingPath(this.cluster, entity));
        } catch (FalconException e) {
            LOG.error("Falcon Exception : ", e);
            throw new DAGEngineException(e);
        } catch (OozieClientException e2) {
            LOG.error("Oozie client exception:", e2);
            throw new DAGEngineException(e2);
        }
    }

    /**
     * Polls the job's status until it matches one of the expected statuses or the retry
     * budget is exhausted.
     *
     * <p>The number of polls comes from the runtime property {@code workflow.status.retry.count}
     * (default 30), with {@link #WORKFLOW_STATUS_RETRY_DELAY_MS} milliseconds between polls.
     * An interrupt received while waiting does not stop the polling, but the thread's interrupt
     * flag is restored before this method returns or throws.
     *
     * @param str Oozie job ID
     * @param statusArr statuses considered acceptable
     * @throws DAGEngineException if the retry count is not an integer, the status cannot be
     *         retrieved, or no expected status is observed within the retry budget
     */
    private void assertStatus(String str, Job.Status... statusArr) throws DAGEngineException {
        final int retryCount;
        try {
            retryCount = Integer.parseInt(RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30"));
        } catch (NumberFormatException e) {
            throw new DAGEngineException("Invalid value provided for runtime property \"workflow.status.retry.count\". Please provide an integer value.", e);
        }

        String actualStatus = null;
        boolean interrupted = false;
        try {
            for (int attempt = 0; attempt < retryCount; attempt++) {
                try {
                    actualStatus = this.client.getJobInfo(str).getStatus().name();
                } catch (OozieClientException e) {
                    LOG.error("Unable to get status of workflow: " + str, e);
                    throw new DAGEngineException(e);
                }
                if (statusEquals(actualStatus, statusArr)) {
                    return;
                }
                // No point in waiting after the final attempt.
                if (attempt < retryCount - 1) {
                    try {
                        Thread.sleep(WORKFLOW_STATUS_RETRY_DELAY_MS);
                    } catch (InterruptedException ignored) {
                        // Keep polling as before, but remember the interrupt so it is not lost.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        throw new DAGEngineException("For Job " + str + ", actual statuses: " + actualStatus + ", expected statuses: " + Arrays.toString(statusArr));
    }

    /**
     * Tests whether a status name matches the name of any of the given statuses.
     *
     * @param str status name to look for
     * @param statusArr candidate statuses
     * @return true if {@code str} equals the name of at least one candidate, false otherwise
     */
    private boolean statusEquals(String str, Job.Status... statusArr) {
        boolean matched = false;
        int index = 0;
        while (!matched && index < statusArr.length) {
            matched = str.equals(statusArr[index].name());
            index++;
        }
        return matched;
    }
}
