package org.apache.falcon.workflow.engine;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityGraph;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.OozieBundleBuilder;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.CONFIGURATION;
import org.apache.falcon.oozie.bundle.COORDINATOR;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.JMSConnectionInfo;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/falcon/workflow/engine/OozieWorkflowEngine.class */
public class OozieWorkflowEngine extends AbstractWorkflowEngine {
    // Property key passed to getIncludedClusters(...) to restrict an instance action to specific clusters.
    private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
    // NOTE(review): presumably the source-cluster counterpart of the key above; not referenced in the visible code.
    private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
    // Runtime property read by dryRunInternal; when it parses to true the dry run is skipped.
    private static final String FALCON_SKIP_DRYRUN = "falcon.skip.dryrun";
    // NOTE(review): retry delay / retry-count property key; their consumers are outside the visible code.
    private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100;
    private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count";
    private static final Logger LOG = LoggerFactory.getLogger(OozieWorkflowEngine.class);
    // Sentinel returned by findLatestBundle when a cluster has no bundle; always compared by identity (==).
    private static final BundleJob MISSING = new NullBundleJob();
    // NOTE(review): names suggest these are the workflow / coordinator-action states from which each
    // instance operation is allowed; the code that consults them is outside the visible section.
    private static final List<WorkflowJob.Status> WF_KILL_PRECOND = Arrays.asList(WorkflowJob.Status.PREP, WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED, WorkflowJob.Status.FAILED);
    private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING);
    private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED);
    private static final List<WorkflowJob.Status> WF_RERUN_PRECOND = Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
    private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND = Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
    // Bundle job statuses that isBundleInState accepts for each BundleStatus value.
    private static final List<Job.Status> BUNDLE_ACTIVE_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED, Job.Status.RUNNINGWITHERROR, Job.Status.PAUSED, Job.Status.PREPPAUSED, Job.Status.PAUSEDWITHERROR);
    private static final List<Job.Status> BUNDLE_SUSPENDED_STATUS = Arrays.asList(Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUSPENDEDWITHERROR);
    private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.RUNNINGWITHERROR);
    private static final List<Job.Status> BUNDLE_SUCCEEDED_STATUS = Arrays.asList(Job.Status.SUCCEEDED);
    private static final List<Job.Status> BUNDLE_FAILED_STATUS = Arrays.asList(Job.Status.FAILED, Job.Status.DONEWITHERROR);
    private static final List<Job.Status> BUNDLE_KILLED_STATUS = Arrays.asList(Job.Status.KILLED);
    // Bundle statuses from which doBundleAction will issue a suspend / resume call.
    private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
    private static final List<Job.Status> BUNDLE_RESUME_PRECOND = Arrays.asList(Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
    // Action names that populateInstanceActions reports only when their external status is not SUCCEEDED.
    private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList("pre-processing", "recordsize", "succeeded-post-processing", "failed-post-processing");
    // NOTE(review): looks like property-name patterns that may be changed on a live bundle; consumer not visible here.
    private static final String[] BUNDLE_UPDATEABLE_PROPS = {"parallel", "clusters.clusters[\\d+].validity.end"};
    // Shared handle on the configuration store, used in this class to look up Cluster entities by name.
    public static final ConfigurationStore STORE = ConfigurationStore.get();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/falcon/workflow/engine/OozieWorkflowEngine$BundleAction.class */
    /**
     * Entity-level operations that doBundleAction applies to the bundles of an entity
     * and that beforeAction/afterAction translate into listener callbacks.
     */
    public enum BundleAction {
        SUSPEND,
        RESUME,
        KILL
    }

    /* loaded from: input_file:org/apache/falcon/workflow/engine/OozieWorkflowEngine$BundleStatus.class */
    /**
     * Coarse bundle states that isBundleInState can test for; each value is matched
     * against the corresponding BUNDLE_*_STATUS list of job statuses.
     */
    private enum BundleStatus {
        ACTIVE,
        RUNNING,
        SUSPENDED,
        FAILED,
        KILLED,
        SUCCEEDED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/falcon/workflow/engine/OozieWorkflowEngine$JobAction.class */
    /**
     * Instance-level operations handed to doJobAction / performAction by the public
     * instance methods (kill, suspend, resume, rerun, status, params); SUMMARY is used
     * as the message of the summary result.
     */
    public enum JobAction {
        KILL,
        SUSPEND,
        RESUME,
        RERUN,
        STATUS,
        SUMMARY,
        PARAMS
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/falcon/workflow/engine/OozieWorkflowEngine$OozieTimeUnit.class */
    /**
     * Coordinator time units paired with the Falcon {@link Frequency.TimeUnit} they translate to.
     * Units constructed with {@code null} have no Falcon counterpart and are rejected by
     * {@link #getFalconTimeUnit()}.
     */
    public enum OozieTimeUnit {
        MINUTE(Frequency.TimeUnit.minutes),
        HOUR(Frequency.TimeUnit.hours),
        DAY(Frequency.TimeUnit.days),
        WEEK(null),
        MONTH(Frequency.TimeUnit.months),
        END_OF_DAY(null),
        END_OF_MONTH(null),
        NONE(null);

        /** Falcon equivalent of this unit, or {@code null} when none exists; fixed at construction. */
        private final Frequency.TimeUnit falconTimeUnit;

        OozieTimeUnit(Frequency.TimeUnit timeUnit) {
            this.falconTimeUnit = timeUnit;
        }

        /**
         * Returns the Falcon time unit mapped to this constant.
         *
         * @return the non-null Falcon time unit
         * @throws IllegalStateException if this unit has no Falcon equivalent
         */
        public Frequency.TimeUnit getFalconTimeUnit() {
            if (this.falconTimeUnit == null) {
                throw new IllegalStateException("Invalid coord frequency: " + name());
            }
            return this.falconTimeUnit;
        }
    }

    /** Creates the engine and registers a new {@code OozieHouseKeepingService} through {@code registerListener}. */
    public OozieWorkflowEngine() {
        registerListener(new OozieHouseKeepingService());
    }

    /**
     * Reports whether the Oozie server behind the given cluster is in NORMAL system mode.
     *
     * @param cluster cluster whose Oozie client is queried
     * @return {@code true} only when the reported system mode is {@code NORMAL}
     * @throws FalconException if the Oozie client call fails
     */
    public boolean isAlive(Cluster cluster) throws FalconException {
        final OozieClient.SYSTEM_MODE currentMode;
        try {
            currentMode = OozieClientFactory.get(cluster).getSystemMode();
        } catch (OozieClientException e) {
            throw new FalconException("Unable to reach Oozie server.", e);
        }
        return currentMode == OozieClient.SYSTEM_MODE.NORMAL;
    }

    /**
     * Schedules the entity on every cluster where it has no bundle yet.
     *
     * <p>Clusters whose latest bundle is the {@code MISSING} sentinel are collected; for each of them
     * the staging/log directories are prepared, the entity is built into a new staging path, a dry run
     * is attempted and the entity is submitted through {@code scheduleEntity}. Clusters for which the
     * builder returns {@code null} are skipped.
     *
     * @param entity entity to schedule
     * @param bool   when non-null and true, the dry run is skipped
     * @param map    not used by this implementation
     * @throws FalconException if any lookup, build, dry-run or scheduling step fails
     */
    public void schedule(Entity entity, Boolean bool, Map<String, String> map) throws FalconException {
        List<String> unscheduledClusters = new ArrayList<String>();
        for (Map.Entry<String, BundleJob> latest : findLatestBundle(entity).entrySet()) {
            String clusterName = latest.getKey();
            if (latest.getValue() == MISSING) {
                unscheduledClusters.add(clusterName);
            } else {
                LOG.debug("Entity {} is already scheduled on cluster {}", entity.getName(), clusterName);
            }
        }
        if (unscheduledClusters.isEmpty()) {
            return;
        }

        OozieEntityBuilder oozieEntityBuilder = OozieEntityBuilder.get(entity);
        for (String clusterName : unscheduledClusters) {
            Cluster cluster = (Cluster) STORE.get(EntityType.CLUSTER, clusterName);
            prepareEntityBuildPath(entity, cluster);
            Properties built = oozieEntityBuilder.build(cluster, EntityUtil.getNewStagingPath(cluster, entity));
            if (built == null) {
                // Log the cluster name (as the "already scheduled" message does) rather than the Cluster object.
                LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), clusterName);
                continue;
            }
            // The dry run happens before submission so that an invalid definition is rejected up front.
            dryRunInternal(cluster, new Path(built.getProperty(OozieEntityBuilder.ENTITY_PATH)), bool);
            scheduleEntity(clusterName, built, entity);
        }
    }

    /**
     * Creates the entity's base staging directory and log directory on the cluster's file system,
     * using {@code HadoopClientFactory.mkdirsWithDefaultPerms} on a proxied file system.
     *
     * @param entity  entity whose directories are prepared
     * @param cluster cluster providing the file-system configuration
     * @throws FalconException if creating either directory fails with an {@link IOException}
     */
    private void prepareEntityBuildPath(Entity entity, Cluster cluster) throws FalconException {
        final Path stagingDir = EntityUtil.getBaseStagingPath(cluster, entity);
        final Path logDir = EntityUtil.getLogPath(cluster, entity);
        try {
            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
            // Staging directory first, then the log directory.
            for (Path dir : new Path[] {stagingDir, logDir}) {
                HadoopClientFactory.mkdirsWithDefaultPerms(fs, dir);
            }
        } catch (IOException e) {
            throw new FalconException("Error preparing base staging dirs: " + stagingDir, e);
        }
    }

    /**
     * Builds the entity for the named cluster under a temporary path below {@code /tmp}
     * and runs a dry run on the result. Nothing happens if the builder returns {@code null}.
     *
     * @param entity entity to validate
     * @param str    name of the cluster to build against
     * @param bool   when non-null and true, the dry run itself is skipped
     * @throws FalconException if building or the dry run fails
     */
    public void dryRun(Entity entity, String str, Boolean bool) throws FalconException {
        OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
        String tempDirName = "falcon" + entity.getName() + System.currentTimeMillis();
        Path tempBuildPath = new Path("/tmp", tempDirName);
        Cluster cluster = (Cluster) STORE.get(EntityType.CLUSTER, str);

        Properties built = builder.build(cluster, tempBuildPath);
        if (built == null) {
            return;
        }
        Path entityPath = new Path(built.getProperty(OozieEntityBuilder.ENTITY_PATH));
        dryRunInternal(cluster, entityPath, bool);
    }

    /**
     * Performs a dry run of every coordinator in the bundle found at {@code path}.
     *
     * <p>The dry run is skipped when {@code bool} is true or when the runtime property
     * {@code falcon.skip.dryrun} parses to true. Otherwise the bundle is unmarshalled and, for each
     * coordinator, a property set made of the coordinator application path plus the coordinator's own
     * configuration properties is passed to {@code OozieClient.dryrun}.
     *
     * @param cluster cluster whose Oozie client is used
     * @param path    location of the built bundle
     * @param bool    request-level skip flag; may be {@code null}
     * @throws FalconException if the Oozie client rejects a coordinator
     */
    private void dryRunInternal(Cluster cluster, Path path, Boolean bool) throws FalconException {
        if (bool != null && bool.booleanValue()) {
            LOG.info("Skipping dryrun as directed by param in cli/RestApi.");
            return;
        }
        // parseBoolean is already case-insensitive, so no locale-dependent lowercasing or boxing is needed.
        String skipFlag = RuntimeProperties.get().getProperty(FALCON_SKIP_DRYRUN, "false");
        if (Boolean.parseBoolean(skipFlag)) {
            LOG.info("Skipping dryrun as directed by Runtime properties.");
            return;
        }

        BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, path);
        OozieClient oozieClient = OozieClientFactory.get(cluster.getName());
        for (COORDINATOR coordinator : bundle.getCoordinator()) {
            Properties coordProps = new Properties();
            coordProps.setProperty("oozie.coord.application.path", coordinator.getAppPath());
            for (CONFIGURATION.Property property : coordinator.getConfiguration().getProperty()) {
                coordProps.setProperty(property.getName(), property.getValue());
            }
            try {
                LOG.info("dryRun with properties {}", coordProps);
                oozieClient.dryrun(coordProps);
            } catch (OozieClientException e) {
                throw new FalconException(e);
            }
        }
    }

    /**
     * Returns whether the latest bundle on every cluster that has one is in one of the
     * BUNDLE_ACTIVE_STATUS states; {@code false} when no cluster has a bundle.
     */
    public boolean isActive(Entity entity) throws FalconException {
        return isBundleInState(findLatestBundle(entity), BundleStatus.ACTIVE);
    }

    /**
     * Returns whether the latest bundle on every cluster that has one is in one of the
     * BUNDLE_SUSPENDED_STATUS states; {@code false} when no cluster has a bundle.
     */
    public boolean isSuspended(Entity entity) throws FalconException {
        return isBundleInState(findLatestBundle(entity), BundleStatus.SUSPENDED);
    }

    /**
     * Returns whether the entity's latest bundles are uniformly succeeded, uniformly failed
     * or uniformly killed. The three checks run in that order and stop at the first match.
     *
     * @param entity entity to inspect
     * @return {@code true} if any of the three terminal states holds for all bundles
     * @throws FalconException if the bundles cannot be looked up
     */
    public boolean isCompleted(Entity entity) throws FalconException {
        Map<String, BundleJob> latestBundles = findLatestBundle(entity);
        if (isBundleInState(latestBundles, BundleStatus.SUCCEEDED)) {
            return true;
        }
        if (isBundleInState(latestBundles, BundleStatus.FAILED)) {
            return true;
        }
        return isBundleInState(latestBundles, BundleStatus.KILLED);
    }

    /**
     * Tests whether every bundle in the map is in the requested coarse state.
     *
     * <p>Entries whose value is the {@code MISSING} sentinel are removed from the supplied map
     * first (the map is modified in place). If nothing remains the result is {@code false}.
     *
     * @param map          latest bundle per cluster; mutated to drop missing bundles
     * @param bundleStatus coarse state to test for
     * @return {@code true} if all remaining bundles have a status accepted for {@code bundleStatus}
     * @throws FalconException declared for callers; not thrown by the visible logic
     */
    private boolean isBundleInState(Map<String, BundleJob> map, BundleStatus bundleStatus) throws FalconException {
        // Discard clusters on which no bundle exists (identity comparison against the sentinel).
        for (Iterator<Map.Entry<String, BundleJob>> entries = map.entrySet().iterator(); entries.hasNext();) {
            if (entries.next().getValue() == MISSING) {
                entries.remove();
            }
        }
        if (map.isEmpty()) {
            return false;
        }

        // Pick the list of job statuses that count as the requested state.
        final List<Job.Status> accepted;
        switch (bundleStatus) {
            case ACTIVE:
                accepted = BUNDLE_ACTIVE_STATUS;
                break;
            case RUNNING:
                accepted = BUNDLE_RUNNING_STATUS;
                break;
            case SUSPENDED:
                accepted = BUNDLE_SUSPENDED_STATUS;
                break;
            case FAILED:
                accepted = BUNDLE_FAILED_STATUS;
                break;
            case KILLED:
                accepted = BUNDLE_KILLED_STATUS;
                break;
            case SUCCEEDED:
                accepted = BUNDLE_SUCCEEDED_STATUS;
                break;
            default:
                accepted = null;
                break;
        }

        for (BundleJob bundle : map.values()) {
            if (accepted != null && !accepted.contains(bundle.getStatus())) {
                return false;
            }
            LOG.debug("Bundle {} is in state {}", bundle.getAppName(), bundleStatus.name());
        }
        return true;
    }

    /**
     * Finds the entity's bundles on one cluster.
     *
     * <p>Queries up to 256 bundle jobs whose name equals the entity's workflow name and keeps those
     * whose application path (URI path component only) is a staging path of the entity. Each kept
     * bundle is re-fetched through {@code getBundleInfo}.
     *
     * @param entity entity whose bundles are wanted
     * @param str    cluster name
     * @return the matching bundles; empty when none match or the query returns {@code null}
     * @throws FalconException if the Oozie client call fails
     */
    private List<BundleJob> findBundles(Entity entity, String str) throws FalconException {
        Cluster cluster = STORE.get(EntityType.CLUSTER, str);
        List<BundleJob> matches = new ArrayList<BundleJob>();
        try {
            OozieClient client = OozieClientFactory.get(cluster.getName());
            String nameFilter = "name=" + EntityUtil.getWorkflowName(entity) + ";";
            List<BundleJob> candidates = client.getBundleJobsInfo(nameFilter, 0, 256);
            if (candidates == null) {
                return matches;
            }
            for (BundleJob candidate : candidates) {
                // Strip scheme/authority so only the path part is compared against staging paths.
                Path appPath = new Path(new Path(candidate.getAppPath()).toUri().getPath());
                if (!EntityUtil.isStagingPath(cluster, entity, appPath)) {
                    continue;
                }
                matches.add(getBundleInfo(str, candidate.getId()));
                LOG.debug("Found bundle {} with app path {} and status {}", new Object[]{candidate.getId(), candidate.getAppPath(), candidate.getStatus()});
            }
            return matches;
        } catch (OozieClientException e) {
            throw new FalconException(e);
        }
    }

    /**
     * Collects the entity's bundles for each cluster returned by
     * {@code EntityUtil.getClustersDefinedInColos}.
     *
     * @param entity entity whose bundles are wanted
     * @return map from cluster name to that cluster's bundles
     * @throws FalconException if a per-cluster lookup fails
     */
    private Map<String, List<BundleJob>> findBundles(Entity entity) throws FalconException {
        Map<String, List<BundleJob>> bundlesByCluster = new HashMap<String, List<BundleJob>>();
        for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) {
            List<BundleJob> clusterBundles = findBundles(entity, clusterName);
            bundlesByCluster.put(clusterName, clusterBundles);
        }
        return bundlesByCluster;
    }

    /**
     * Finds the most recently created bundle of the entity on each cluster returned by
     * {@code EntityUtil.getClustersDefinedInColos}.
     *
     * @param entity entity whose bundles are wanted
     * @return map from cluster name to its latest bundle, or to {@code MISSING} when it has none
     * @throws FalconException if a per-cluster lookup fails
     */
    private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
        Map<String, BundleJob> latestByCluster = new HashMap<String, BundleJob>();
        for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) {
            BundleJob latest = findLatestBundle(entity, clusterName);
            latestByCluster.put(clusterName, latest);
        }
        return latestByCluster;
    }

    /**
     * Returns the entity's bundle with the greatest created time on the given cluster.
     *
     * @param entity entity whose bundles are wanted
     * @param str    cluster name
     * @return the latest bundle, or the {@code MISSING} sentinel when the cluster has none
     * @throws FalconException if the bundle lookup fails
     */
    private BundleJob findLatestBundle(Entity entity, String str) throws FalconException {
        List<BundleJob> bundles = findBundles(entity, str);
        if (bundles == null || bundles.isEmpty()) {
            return MISSING;
        }
        Comparator<BundleJob> byCreatedTime = new Comparator<BundleJob>() {
            @Override
            public int compare(BundleJob left, BundleJob right) {
                return left.getCreatedTime().compareTo(right.getCreatedTime());
            }
        };
        return Collections.max(bundles, byCreatedTime);
    }

    /**
     * Applies {@code BundleAction.SUSPEND} to the entity's bundles on all of its clusters.
     *
     * @return the result of the last cluster processed, or {@code null} if there were no clusters
     */
    public String suspend(Entity entity) throws FalconException {
        return doBundleAction(entity, BundleAction.SUSPEND);
    }

    /**
     * Applies {@code BundleAction.RESUME} to the entity's bundles on all of its clusters.
     *
     * @return the result of the last cluster processed, or {@code null} if there were no clusters
     */
    public String resume(Entity entity) throws FalconException {
        return doBundleAction(entity, BundleAction.RESUME);
    }

    /**
     * Applies {@code BundleAction.KILL} to the entity's bundles on all of its clusters.
     *
     * @return the result of the last cluster processed, or {@code null} if there were no clusters
     */
    public String delete(Entity entity) throws FalconException {
        return doBundleAction(entity, BundleAction.KILL);
    }

    /**
     * Applies {@code BundleAction.KILL} to the entity's bundles on a single cluster.
     *
     * @param str name of the cluster to act on
     * @return the value returned by the per-cluster bundle action
     */
    public String delete(Entity entity, String str) throws FalconException {
        return doBundleAction(entity, BundleAction.KILL, str);
    }

    /**
     * Runs the bundle action on each cluster returned by {@code EntityUtil.getClustersDefinedInColos}.
     *
     * @param entity       entity to act on
     * @param bundleAction action to apply
     * @return the result of the last cluster processed, or {@code null} if there were no clusters
     * @throws FalconException if the action fails on any cluster
     */
    private String doBundleAction(Entity entity, BundleAction bundleAction) throws FalconException {
        String lastResult = null;
        for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) {
            lastResult = doBundleAction(entity, bundleAction, clusterName);
        }
        return lastResult;
    }

    /**
     * Applies the bundle action to every bundle of the entity on one cluster, with the
     * listeners' before/after callbacks invoked around the whole batch.
     *
     * <p>SUSPEND and RESUME only act on bundles that are not already in the target state and whose
     * status is in the matching precondition list; KILL is applied to every bundle.
     *
     * @param entity       entity to act on
     * @param bundleAction action to apply
     * @param str          cluster name
     * @return the literal {@code "SUCCESS"}
     * @throws FalconException if a lookup, listener or Oozie call fails
     */
    private String doBundleAction(Entity entity, BundleAction bundleAction, String str) throws FalconException {
        List<BundleJob> bundles = findBundles(entity, str);
        beforeAction(entity, bundleAction, str);
        for (BundleJob bundle : bundles) {
            switch (bundleAction) {
                case SUSPEND: {
                    boolean alreadySuspended = BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus());
                    if (!alreadySuspended && BUNDLE_SUSPEND_PRECOND.contains(bundle.getStatus())) {
                        suspend(str, bundle.getId());
                    }
                    break;
                }
                case RESUME: {
                    boolean alreadyRunning = BUNDLE_RUNNING_STATUS.contains(bundle.getStatus());
                    if (!alreadyRunning && BUNDLE_RESUME_PRECOND.contains(bundle.getStatus())) {
                        resume(str, bundle.getId());
                    }
                    break;
                }
                case KILL:
                    killBundle(str, bundle);
                    break;
                default:
                    break;
            }
        }
        afterAction(entity, bundleAction, str);
        return "SUCCESS";
    }

    /**
     * Kills each coordinator of the bundle and then the bundle itself on the given cluster.
     *
     * @param str       cluster name used to obtain the Oozie client
     * @param bundleJob bundle to kill
     * @throws FalconException if any kill call fails
     */
    private void killBundle(String str, BundleJob bundleJob) throws FalconException {
        final OozieClient client = OozieClientFactory.get(str);
        try {
            // Coordinators first, the owning bundle last.
            for (CoordinatorJob coord : bundleJob.getCoordinators()) {
                client.kill(coord.getId());
                LOG.debug("Killed coord {} on cluster {}", coord.getId(), str);
            }
            client.kill(bundleJob.getId());
            LOG.debug("Killed bundle {} on cluster {}", bundleJob.getId(), str);
        } catch (OozieClientException oozieFailure) {
            throw new FalconException(oozieFailure);
        }
    }

    /**
     * Notifies every registered listener that a bundle action is about to run on a cluster,
     * using the {@code beforeSuspend}/{@code beforeResume}/{@code beforeDelete} callback
     * that corresponds to the action.
     *
     * @param entity       entity being acted on
     * @param bundleAction action about to be applied
     * @param str          cluster name
     * @throws FalconException if a listener callback fails
     */
    private void beforeAction(Entity entity, BundleAction bundleAction, String str) throws FalconException {
        for (WorkflowEngineActionListener listener : this.listeners) {
            switch (bundleAction) {
                case KILL:
                    listener.beforeDelete(entity, str);
                    break;
                case RESUME:
                    listener.beforeResume(entity, str);
                    break;
                case SUSPEND:
                    listener.beforeSuspend(entity, str);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Notifies every registered listener that a bundle action has run on a cluster,
     * using the {@code afterSuspend}/{@code afterResume}/{@code afterDelete} callback
     * that corresponds to the action.
     *
     * @param entity       entity that was acted on
     * @param bundleAction action that was applied
     * @param str          cluster name
     * @throws FalconException if a listener callback fails
     */
    private void afterAction(Entity entity, BundleAction bundleAction, String str) throws FalconException {
        for (WorkflowEngineActionListener listener : this.listeners) {
            switch (bundleAction) {
                case KILL:
                    listener.afterDelete(entity, str);
                    break;
                case RESUME:
                    listener.afterResume(entity, str);
                    break;
                case SUSPEND:
                    listener.afterSuspend(entity, str);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Lists the entity's currently running instances across its clusters.
     *
     * <p>For each running workflow returned by {@code getRunningWorkflows}, the full job info is
     * fetched; workflows without a parent id are ignored. The parent coordinator action supplies
     * the nominal time, and for feeds the source cluster is resolved as well.
     *
     * @param entity entity whose running instances are wanted
     * @param list   not used by this implementation
     * @return a SUCCEEDED result carrying one RUNNING instance per matching workflow
     * @throws FalconException if an Oozie client call fails
     */
    public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> list) throws FalconException {
        try {
            List<InstancesResult.Instance> running = new ArrayList<InstancesResult.Instance>();
            for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) {
                OozieClient client = OozieClientFactory.get(clusterName);
                List<WorkflowJob> workflows = getRunningWorkflows(clusterName, EntityUtil.getWorkflowNames(entity));
                if (workflows == null) {
                    continue;
                }
                for (WorkflowJob workflow : workflows) {
                    WorkflowJob details = client.getJobInfo(workflow.getId());
                    if (StringUtils.isEmpty(details.getParentId())) {
                        continue;
                    }
                    CoordinatorAction parentAction = client.getCoordActionInfo(details.getParentId());
                    String nominalTime = SchemaHelper.formatDateUTC(parentAction.getNominalTime());
                    InstancesResult.Instance instance = new InstancesResult.Instance(clusterName, nominalTime, InstancesResult.WorkflowStatus.RUNNING);
                    instance.startTime = details.getStartTime();
                    if (entity.getEntityType() == EntityType.FEED) {
                        instance.sourceCluster = getSourceCluster(clusterName, parentAction, entity);
                    }
                    running.add(instance);
                }
            }
            InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances");
            result.setInstances(running.toArray(new InstancesResult.Instance[running.size()]));
            return result;
        } catch (OozieClientException e) {
            throw new FalconException(e);
        }
    }

    /**
     * Delegates to {@code doJobAction} with {@code JobAction.KILL}, passing all arguments through.
     * NOTE(review): {@code date}/{@code date2} are presumably the start and end of the instance range - confirm.
     */
    public InstancesResult killInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.KILL, entity, date, date2, properties, list);
    }

    /**
     * Delegates to {@code doJobAction} with {@code JobAction.RERUN}.
     *
     * @param bool flag forwarded as the final boolean argument; {@code null} is treated as {@code false}
     * @return the result produced by {@code doJobAction}
     * @throws FalconException if the underlying job action fails
     */
    public InstancesResult reRunInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list, Boolean bool) throws FalconException {
        final boolean flag = bool != null && bool.booleanValue();
        return doJobAction(JobAction.RERUN, entity, date, date2, properties, list, flag);
    }

    /**
     * Delegates to {@code doJobAction} with {@code JobAction.SUSPEND}, passing all arguments through.
     * NOTE(review): {@code date}/{@code date2} are presumably the start and end of the instance range - confirm.
     */
    public InstancesResult suspendInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.SUSPEND, entity, date, date2, properties, list);
    }

    /**
     * Delegates to {@code doJobAction} with {@code JobAction.RESUME}, passing all arguments through.
     * NOTE(review): {@code date}/{@code date2} are presumably the start and end of the instance range - confirm.
     */
    public InstancesResult resumeInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.RESUME, entity, date, date2, properties, list);
    }

    /**
     * Delegates to {@code doJobAction} with {@code JobAction.STATUS} and {@code null} properties.
     * NOTE(review): {@code date}/{@code date2} are presumably the start and end of the instance range - confirm.
     */
    public InstancesResult getStatus(Entity entity, Date date, Date date2, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.STATUS, entity, date, date2, null, list);
    }

    /**
     * Delegates to {@code doSummaryJobAction} with {@code null} properties, so no cluster
     * filter is supplied by this call; {@code date} and {@code date2} bound the summarised range.
     */
    public InstancesSummaryResult getSummary(Entity entity, Date date, Date date2, List<LifeCycle> list) throws FalconException {
        return doSummaryJobAction(entity, date, date2, null, list);
    }

    /**
     * Delegates to {@code doJobAction} with {@code JobAction.PARAMS} and {@code null} properties.
     * NOTE(review): {@code date}/{@code date2} are presumably the start and end of the instance range - confirm.
     */
    public InstancesResult getInstanceParams(Entity entity, Date date, Date date2, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.PARAMS, entity, date, date2, null, list);
    }

    /**
     * Checks whether the Oozie server on the given cluster publishes the given job's JMS
     * notifications to the topic configured under the startup property {@code entity.topic}
     * (default {@code FALCON.ENTITY.TOPIC}).
     *
     * @param str  cluster name used to obtain the Oozie client
     * @param str2 job id whose JMS topic name is queried
     * @return {@code false} when no JMS connection info / JNDI properties are available, when the
     *         topic differs, or when the Oozie client call fails (the failure is logged at debug)
     * @throws FalconException if the Oozie client cannot be obtained
     */
    public boolean isNotificationEnabled(String str, String str2) throws FalconException {
        final OozieClient client = OozieClientFactory.get(str);
        try {
            JMSConnectionInfo jmsInfo = client.getJMSConnectionInfo();
            boolean jmsConfigured = jmsInfo != null && !jmsInfo.getJNDIProperties().isEmpty();
            if (!jmsConfigured) {
                return false;
            }
            String expectedTopic = StartupProperties.get().getProperty("entity.topic", "FALCON.ENTITY.TOPIC");
            return expectedTopic.equals(client.getJMSTopicName(str2));
        } catch (OozieClientException e) {
            // Best effort: an unreachable or misconfigured server is reported as "not enabled".
            LOG.debug("Error while retrieving JMS connection info", e);
            return false;
        }
    }

    /**
     * Fetches the job info of a workflow from the Oozie server of the given cluster.
     *
     * @param str  cluster name used to obtain the Oozie client
     * @param str2 workflow job id
     * @return the job info returned by the client
     * @throws FalconException wrapping any {@link OozieClientException}
     */
    private WorkflowJob getWorkflowInfo(String str, String str2) throws FalconException {
        final WorkflowJob workflow;
        try {
            OozieClient client = OozieClientFactory.get(str);
            workflow = client.getJobInfo(str2);
        } catch (OozieClientException oozieFailure) {
            throw new FalconException(oozieFailure);
        }
        return workflow;
    }

    /* JADX WARN: Can't wrap try/catch for region: R(9:12|(2:14|(2:28|24))(1:30)|19|20|21|22|23|24|10) */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x010c, code lost:
    
        r31 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x010e, code lost:
    
        org.apache.falcon.workflow.engine.OozieWorkflowEngine.LOG.warn("Unable to perform action {} on cluster", r9, r31);
        r0.status = org.apache.falcon.resource.InstancesResult.WorkflowStatus.ERROR;
        r19 = org.apache.falcon.resource.APIResult.Status.PARTIAL;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler placeholder: the real body of this method was not recovered, so as written it
     * always throws {@link UnsupportedOperationException}. The original implementation must be
     * restored from the upstream sources before this class can be used.
     *
     * NOTE(review): the only recovered fragment (see the decompiler warnings above) shows a catch
     * path that logs "Unable to perform action {} on cluster", marks the instance status ERROR and
     * the overall status PARTIAL; the rest of the behaviour cannot be determined from here.
     */
    private org.apache.falcon.resource.InstancesResult doJobAction(org.apache.falcon.workflow.engine.OozieWorkflowEngine.JobAction r9, org.apache.falcon.entity.v0.Entity r10, java.util.Date r11, java.util.Date r12, java.util.Properties r13, java.util.List<org.apache.falcon.LifeCycle> r14, boolean r15) throws org.apache.falcon.FalconException {
        /*
            Method dump skipped, instructions count: 386
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.falcon.workflow.engine.OozieWorkflowEngine.doJobAction(org.apache.falcon.workflow.engine.OozieWorkflowEngine$JobAction, org.apache.falcon.entity.v0.Entity, java.util.Date, java.util.Date, java.util.Properties, java.util.List, boolean):org.apache.falcon.resource.InstancesResult");
    }

    /** Convenience overload: calls the seven-argument {@code doJobAction} with the final boolean set to {@code false}. */
    private InstancesResult doJobAction(JobAction jobAction, Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(jobAction, entity, date, date2, properties, list, false);
    }

    /**
     * Builds, per cluster, a count of coordinator actions by status for the window
     * {@code date} .. {@code date2}.
     *
     * <p>For every cluster that passes the optional cluster filter, the applicable coordinators are
     * resolved and, for each, the slice of actions between the first instance at/after {@code date}
     * and the effective end is fetched from Oozie and tallied by status name. An extra
     * {@code "UNSCHEDULED"} entry is added when the first applicable coordinator's effective end
     * falls short of {@code date2}.
     *
     * @param entity     entity to summarise
     * @param date       start of the window
     * @param date2      end of the window
     * @param properties passed to {@code getIncludedClusters} to obtain the cluster filter
     * @param list       life cycles forwarded to {@code getApplicableCoords}
     * @return a SUCCEEDED result with one summary per included cluster
     * @throws FalconException if fetching coordinator details from Oozie fails
     */
    private InstancesSummaryResult doSummaryJobAction(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        Map<String, List<BundleJob>> findBundles = findBundles(entity);
        ArrayList arrayList = new ArrayList();
        List<String> includedClusters = getIncludedClusters(properties, FALCON_INSTANCE_ACTION_CLUSTERS);
        for (Map.Entry<String, List<BundleJob>> entry : findBundles.entrySet()) {
            // Status name -> number of coordinator actions in that status, for this cluster.
            HashMap hashMap = new HashMap();
            String key = entry.getKey();
            // An empty filter means "all clusters".
            if (includedClusters.size() == 0 || includedClusters.contains(key)) {
                List<BundleJob> value = entry.getValue();
                OozieClient oozieClient = OozieClientFactory.get(key);
                List<CoordinatorJob> applicableCoords = getApplicableCoords(oozieClient, date, date2, value, list);
                // Number of instances counted as UNSCHEDULED (only ever set from the first coordinator).
                long j = 0;
                for (int i = 0; i < applicableCoords.size(); i++) {
                    CoordinatorJob coordinatorJob = applicableCoords.get(i);
                    Frequency createFrequency = createFrequency(String.valueOf(coordinatorJob.getFrequency()), coordinatorJob.getTimeUnit());
                    TimeZone timeZone = EntityUtil.getTimeZone(coordinatorJob.getTimeZone());
                    Date nextStartTime = EntityUtil.getNextStartTime(coordinatorJob.getStartTime(), createFrequency, timeZone, date);
                    // Effective end: the coordinator's last action time when it is set and before date2, else date2.
                    Date lastActionTime = (coordinatorJob.getLastActionTime() == null || !coordinatorJob.getLastActionTime().before(date2)) ? date2 : coordinatorJob.getLastActionTime();
                    boolean z = i == 0;
                    // Sequence numbers of: the window start, the effective end, and the requested end.
                    // NOTE(review): presumably 1-based action ordinals within the coordinator - confirm against EntityUtil.
                    int instanceSequence = EntityUtil.getInstanceSequence(coordinatorJob.getStartTime(), createFrequency, timeZone, nextStartTime);
                    int instanceSequence2 = EntityUtil.getInstanceSequence(coordinatorJob.getStartTime(), createFrequency, timeZone, lastActionTime);
                    int instanceSequence3 = EntityUtil.getInstanceSequence(coordinatorJob.getStartTime(), createFrequency, timeZone, date2);
                    if (instanceSequence2 >= instanceSequence) {
                        if (z && instanceSequence3 != instanceSequence2) {
                            j = instanceSequence3 - instanceSequence2;
                        }
                        try {
                            // Fetch the actions starting at the window start, with length (effective end - start).
                            CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(coordinatorJob.getId(), (String) null, instanceSequence, instanceSequence2 - instanceSequence);
                            if (coordJobInfo != null) {
                                updateInstanceSummary(coordJobInfo, hashMap);
                            }
                        } catch (OozieClientException e) {
                            LOG.debug("Unable to get details for coordinator {}", coordinatorJob.getId(), e);
                            throw new FalconException(e);
                        }
                    }
                }
                if (j > 0) {
                    hashMap.put("UNSCHEDULED", Long.valueOf(j));
                }
                arrayList.add(new InstancesSummaryResult.InstanceSummary(key, hashMap));
            }
        }
        InstancesSummaryResult instancesSummaryResult = new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name());
        instancesSummaryResult.setInstancesSummary((InstancesSummaryResult.InstanceSummary[]) arrayList.toArray(new InstancesSummaryResult.InstanceSummary[arrayList.size()]));
        return instancesSummaryResult;
    }

    /**
     * Fills {@code instance.actions} with the reportable actions of a workflow.
     *
     * <p>Rules, applied per action of {@code workflowJob}:
     * <ul>
     *   <li>a {@code sub-workflow} action with a non-empty external id is expanded: the child
     *       workflow is fetched and each of its actions whose type does not start with ":" is added;</li>
     *   <li>otherwise, actions whose type starts with ":" are ignored;</li>
     *   <li>an action named in PARENT_WF_ACTION_NAMES is added only if its external status is not
     *       SUCCEEDED;</li>
     *   <li>any other action is added unless its external id is "-".</li>
     * </ul>
     *
     * @param str         cluster name used to fetch child workflows
     * @param workflowJob workflow whose actions are inspected
     * @param instance    instance whose {@code actions} field is overwritten
     * @throws FalconException if a child workflow cannot be fetched
     */
    private void populateInstanceActions(String str, WorkflowJob workflowJob, InstancesResult.Instance instance) throws FalconException {
        List<InstancesResult.InstanceAction> collected = new ArrayList<InstancesResult.InstanceAction>();
        for (WorkflowAction action : workflowJob.getActions()) {
            boolean expandable = action.getType().equalsIgnoreCase("sub-workflow")
                    && StringUtils.isNotEmpty(action.getExternalId());
            if (expandable) {
                WorkflowJob child = getWorkflowInfo(str, action.getExternalId());
                for (WorkflowAction childAction : child.getActions()) {
                    if (childAction.getType().startsWith(":")) {
                        continue;
                    }
                    collected.add(new InstancesResult.InstanceAction(childAction.getName(), childAction.getExternalStatus(), childAction.getConsoleUrl()));
                }
                continue;
            }
            if (action.getType().startsWith(":")) {
                continue;
            }
            boolean parentStep = PARENT_WF_ACTION_NAMES.contains(action.getName());
            boolean reportable = parentStep
                    ? !Job.Status.SUCCEEDED.toString().equals(action.getExternalStatus())
                    : !StringUtils.equals(action.getExternalId(), "-");
            if (reportable) {
                collected.add(new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(), action.getConsoleUrl()));
            }
        }
        instance.actions = collected.toArray(new InstancesResult.InstanceAction[collected.size()]);
    }

    /**
     * Parses the workflow's configuration and returns its entries as key/value pairs.
     *
     * <p>The configuration string is encoded explicitly as UTF-8 before being handed to the parser;
     * relying on the platform default charset could corrupt non-ASCII values on hosts whose default
     * is not UTF-8. NOTE(review): assumes the Oozie job configuration is UTF-8 XML - confirm.
     *
     * @param workflowJob workflow whose {@code getConf()} string is parsed
     * @return one pair per configuration entry, in iteration order; the array is sized by
     *         {@code Configuration.size()}
     */
    private InstancesResult.KeyValuePair[] getWFParams(WorkflowJob workflowJob) {
        Configuration configuration = new Configuration(false);
        byte[] confBytes = workflowJob.getConf().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        configuration.addResource(new ByteArrayInputStream(confBytes));

        InstancesResult.KeyValuePair[] keyValuePairArr = new InstancesResult.KeyValuePair[configuration.size()];
        int index = 0;
        for (Map.Entry<String, String> entry : configuration) {
            keyValuePairArr[index++] = new InstancesResult.KeyValuePair(entry.getKey(), entry.getValue());
        }
        return keyValuePairArr;
    }

    /**
     * Tallies the coordinator's actions by status name into {@code map}.
     *
     * @param coordinatorJob coordinator whose actions are counted
     * @param map            running totals keyed by status name; incremented in place, with missing
     *                       keys starting at 1
     */
    private void updateInstanceSummary(CoordinatorJob coordinatorJob, Map<String, Long> map) {
        for (CoordinatorAction action : coordinatorJob.getActions()) {
            String statusName = action.getStatus().name();
            long updatedCount = map.containsKey(statusName) ? map.get(statusName).longValue() + 1 : 1L;
            map.put(statusName, Long.valueOf(updatedCount));
        }
    }

    /**
     * Applies {@code jobAction} to one coordinator action and records the outcome on {@code instance}.
     *
     * <p>When the coordinator action has an external (workflow) id, the workflow is looked up and its
     * start time, end time and console URL are copied to {@code instance}; its status then replaces the
     * coordinator action status as the reported status. KILL/SUSPEND/RESUME/RERUN are only issued when
     * the corresponding precondition set contains the current status.
     *
     * <p>The resulting status name is mapped through {@code mapActionStatus} and converted to a
     * {@code WorkflowStatus}. A name with no matching constant is logged and reported as
     * {@code UNDEFINED} for every action; previously only KILL had this fallback and the other actions
     * propagated {@code IllegalArgumentException}.
     *
     * @param str               cluster on which the action is performed
     * @param jobAction         operation to perform
     * @param coordinatorAction coordinator action being operated on
     * @param properties        rerun properties, forwarded to {@code reRun}
     * @param instance          result object that is populated
     * @param z                 forwarded to {@code reRun} as its last argument
     * @throws FalconException          if an underlying helper fails
     * @throws IllegalArgumentException if {@code jobAction} is not one of the handled actions
     */
    private void performAction(String str, JobAction jobAction, CoordinatorAction coordinatorAction, Properties properties, InstancesResult.Instance instance, boolean z) throws FalconException {
        WorkflowJob workflowJob = null;
        String name = coordinatorAction.getStatus().name();
        if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
            workflowJob = getWorkflowInfo(str, coordinatorAction.getExternalId());
            name = workflowJob.getStatus().name();
            instance.startTime = workflowJob.getStartTime();
            instance.endTime = workflowJob.getEndTime();
            instance.logFile = workflowJob.getConsoleUrl();
        }

        switch (jobAction) {
            case KILL:
                if (workflowJob != null && WF_KILL_PRECOND.contains(workflowJob.getStatus())) {
                    kill(str, workflowJob.getId());
                    name = Job.Status.KILLED.name();
                }
                break;
            case SUSPEND:
                if (workflowJob != null && WF_SUSPEND_PRECOND.contains(workflowJob.getStatus())) {
                    suspend(str, workflowJob.getId());
                    name = Job.Status.SUSPENDED.name();
                }
                break;
            case RESUME:
                if (workflowJob != null && WF_RESUME_PRECOND.contains(workflowJob.getStatus())) {
                    resume(str, workflowJob.getId());
                    name = Job.Status.RUNNING.name();
                }
                break;
            case RERUN:
                if (workflowJob == null && COORD_RERUN_PRECOND.contains(coordinatorAction.getStatus())) {
                    // No workflow was ever launched: rerun at the coordinator-action level.
                    reRunCoordAction(str, coordinatorAction);
                    name = Job.Status.RUNNING.name();
                } else if (workflowJob != null && WF_RERUN_PRECOND.contains(workflowJob.getStatus())) {
                    reRun(str, workflowJob.getId(), properties, z);
                    name = Job.Status.RUNNING.name();
                }
                break;
            case STATUS:
                // workflowJob is non-null exactly when the coordinator action has an external id.
                if (workflowJob != null) {
                    populateInstanceActions(str, workflowJob, instance);
                }
                break;
            case PARAMS:
                if (workflowJob != null) {
                    instance.wfParams = getWFParams(workflowJob);
                }
                break;
            default:
                throw new IllegalArgumentException("Unhandled action " + jobAction);
        }

        try {
            instance.status = InstancesResult.WorkflowStatus.valueOf(mapActionStatus(name));
        } catch (IllegalArgumentException e) {
            LOG.error("Job status not defined in Instance status: {}", name);
            instance.status = InstancesResult.WorkflowStatus.UNDEFINED;
        }
    }

    /**
     * Reruns a single coordinator action and waits for it to reach RUNNING, WAITING or READY.
     *
     * @param str               cluster on which the coordinator lives
     * @param coordinatorAction action to rerun, addressed by its job id and action number
     * @throws FalconException wrapping any failure of the rerun or of the status check
     */
    private void reRunCoordAction(String str, CoordinatorAction coordinatorAction) throws FalconException {
        try {
            OozieClient client = OozieClientFactory.get(str);
            String coordJobId = coordinatorAction.getJobId();
            String actionNumber = Integer.toString(coordinatorAction.getActionNumber());
            client.reRunCoord(coordJobId, "action", actionNumber, true, true);

            String actionId = coordinatorAction.getId();
            assertCoordActionStatus(str, actionId, CoordinatorAction.Status.RUNNING, CoordinatorAction.Status.WAITING, CoordinatorAction.Status.READY);
            LOG.info("Rerun job {} on cluster {}", coordinatorAction.getId(), str);
        } catch (Exception failure) {
            LOG.error("Unable to rerun workflows", failure);
            throw new FalconException(failure);
        }
    }

    /**
     * Polls a coordinator action until its status is one of {@code statusArr}.
     *
     * <p>The status is checked up to four times (the initial fetch plus one fetch after each of three
     * 5-second waits). Previously the status fetched after the last wait was never examined. If the
     * thread is interrupted while waiting, the interrupt flag is restored and polling stops.
     *
     * @param str       cluster whose Oozie client is used
     * @param str2      coordinator action id
     * @param statusArr acceptable statuses
     * @throws FalconException      if none of the expected statuses was observed
     * @throws OozieClientException if the action info cannot be fetched
     */
    private void assertCoordActionStatus(String str, String str2, CoordinatorAction.Status... statusArr) throws FalconException, OozieClientException {
        OozieClient oozieClient = OozieClientFactory.get(str);
        List<CoordinatorAction.Status> expected = Arrays.asList(statusArr);
        CoordinatorAction coordActionInfo = oozieClient.getCoordActionInfo(str2);
        for (int attempt = 0; attempt < 3; attempt++) {
            if (expected.contains(coordActionInfo.getStatus())) {
                return;
            }
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            coordActionInfo = oozieClient.getCoordActionInfo(str2);
        }
        // Examine the most recently fetched state before giving up.
        if (expected.contains(coordActionInfo.getStatus())) {
            return;
        }
        throw new FalconException("For Job" + str2 + ", actual statuses: " + coordActionInfo + ", expected statuses: " + Arrays.toString(statusArr));
    }

    /**
     * Derives a name from the coordinator's application name via
     * {@code EntityUtil.getWorkflowNameSuffix}.
     *
     * @param str               cluster whose Oozie client is queried
     * @param coordinatorAction action whose parent coordinator job is looked up
     * @param entity            entity passed to {@code getWorkflowNameSuffix}
     * @return the workflow name suffix of the coordinator's app name
     * @throws FalconException if the coordinator job cannot be fetched; the Oozie exception is kept
     *                         as the cause
     */
    private String getSourceCluster(String str, CoordinatorAction coordinatorAction, Entity entity) throws FalconException {
        try {
            CoordinatorJob coordJob = OozieClientFactory.get(str).getCoordJobInfo(coordinatorAction.getJobId());
            return EntityUtil.getWorkflowNameSuffix(coordJob.getAppName(), entity);
        } catch (OozieClientException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new FalconException("Unable to get oozie job id:" + e, e);
        }
    }

    /**
     * Reads a comma-separated property and returns its non-blank, trimmed tokens.
     *
     * <p>Tokens are tested for blankness rather than emptiness, so input such as {@code "a, ,b"} no
     * longer yields an empty-string entry.
     *
     * @param properties source properties; {@code null} is treated as having no value
     * @param str        property key to read
     * @return the trimmed tokens in order; empty when the property is absent or blank
     */
    private List<String> getIncludedClusters(Properties properties, String str) {
        String property = properties == null ? "" : properties.getProperty(str, "");
        List<String> clusters = new ArrayList<String>();
        for (String token : property.split(",")) {
            if (StringUtils.isNotBlank(token)) {
                clusters.add(token.trim());
            }
        }
        return clusters;
    }

    /**
     * Translates an Oozie status name into the name reported for an instance.
     *
     * <p>READY stays READY; WAITING and SUBMITTED become WAITING; IGNORED becomes KILLED; TIMEDOUT
     * becomes FAILED; workflow PREP becomes RUNNING. Any other name is returned unchanged.
     *
     * @param str Oozie coordinator-action or workflow status name
     * @return the mapped status name
     */
    private String mapActionStatus(String str) {
        if (CoordinatorAction.Status.READY.toString().equals(str)) {
            return InstancesResult.WorkflowStatus.READY.name();
        }
        if (CoordinatorAction.Status.WAITING.toString().equals(str) || CoordinatorAction.Status.SUBMITTED.toString().equals(str)) {
            return InstancesResult.WorkflowStatus.WAITING.name();
        }
        if (CoordinatorAction.Status.IGNORED.toString().equals(str)) {
            return InstancesResult.WorkflowStatus.KILLED.name();
        }
        if (CoordinatorAction.Status.TIMEDOUT.toString().equals(str)) {
            return InstancesResult.WorkflowStatus.FAILED.name();
        }
        if (WorkflowJob.Status.PREP.toString().equals(str)) {
            return InstancesResult.WorkflowStatus.RUNNING.name();
        }
        return str;
    }

    /**
     * Collects, per cluster, the coordinator actions of {@code entity} whose nominal time falls in
     * the requested window.
     *
     * <p>For every applicable coordinator that has materialized something, instance times are walked
     * backwards one frequency step at a time, starting one step before the next start time computed
     * from the window end (or from the next materialized time when that is earlier or the coordinator
     * is a retention coordinator), until the time drops below {@code date}. Retention coordinators
     * contribute at most {@code retention.instances.displaycount} actions per cluster (default 2).
     *
     * @param entity entity whose bundles are searched
     * @param date   window start (inclusive lower bound for instance times)
     * @param date2  window end
     * @param list   life cycles used to select coordinators
     * @return actions found, keyed by the key of the {@code findBundles} result
     * @throws FalconException if bundles or coordinators cannot be resolved
     */
    protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date date, Date date2, List<LifeCycle> list) throws FalconException {
        Map<String, List<BundleJob>> bundlesByCluster = findBundles(entity);
        Map<String, List<CoordinatorAction>> actionsByCluster = new HashMap<String, List<CoordinatorAction>>();
        for (Map.Entry<String, List<BundleJob>> clusterBundles : bundlesByCluster.entrySet()) {
            String clusterName = clusterBundles.getKey();
            OozieClient client = OozieClientFactory.get(clusterName);
            List<CoordinatorJob> coords = getApplicableCoords(client, date, date2, clusterBundles.getValue(), list);
            List<CoordinatorAction> actions = new ArrayList<CoordinatorAction>();
            int retentionLimit = Integer.parseInt(RuntimeProperties.get().getProperty("retention.instances.displaycount", "2"));
            // Shared across all retention coordinators of this cluster.
            int retentionShown = 0;

            for (CoordinatorJob coord : coords) {
                Date nextMaterialized = coord.getNextMaterializedTime();
                if (nextMaterialized == null) {
                    continue;
                }
                boolean retention = isRetentionCoord(coord);
                Frequency frequency = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit());
                TimeZone zone = EntityUtil.getTimeZone(coord.getTimeZone());

                Date upperBound = date2;
                if (nextMaterialized.before(date2) || retention) {
                    upperBound = nextMaterialized;
                }

                Calendar cursor = Calendar.getInstance(zone);
                cursor.setTime(EntityUtil.getNextStartTime(coord.getStartTime(), frequency, zone, upperBound));
                int calendarField = frequency.getTimeUnit().getCalendarUnit();
                int backwardStep = -Integer.parseInt(coord.getFrequency());
                cursor.add(calendarField, backwardStep);

                while (date.compareTo(cursor.getTime()) <= 0) {
                    if (retention) {
                        if (retentionShown >= retentionLimit) {
                            break;
                        }
                        retentionShown++;
                    }
                    int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), frequency, zone, cursor.getTime());
                    addCoordAction(client, actions, coord.getId() + "@" + sequence);
                    cursor.add(calendarField, backwardStep);
                }
            }
            actionsByCluster.put(clusterName, actions);
        }
        return actionsByCluster;
    }

    /**
     * Tells whether the coordinator's application name contains the EVICTION life-cycle tag name.
     *
     * @param coordinatorJob coordinator to inspect
     * @return {@code true} if the app name contains the eviction tag
     */
    private boolean isRetentionCoord(CoordinatorJob coordinatorJob) {
        String appName = coordinatorJob.getAppName();
        String evictionTag = LifeCycle.EVICTION.getTag().name();
        return appName.contains(evictionTag);
    }

    /**
     * Looks up a coordinator action and appends it to {@code list} when found.
     *
     * <p>A failed lookup is best-effort: it is logged at debug level and nothing is added.
     *
     * @param oozieClient client used for the lookup
     * @param list        list that receives the action
     * @param str         coordinator action id
     */
    private void addCoordAction(OozieClient oozieClient, List<CoordinatorAction> list, String str) {
        try {
            CoordinatorAction found = oozieClient.getCoordActionInfo(str);
            if (found != null) {
                list.add(found);
            }
        } catch (OozieClientException lookupFailure) {
            LOG.debug("Unable to get action for " + str + " " + lookupFailure.getMessage());
        }
    }

    /**
     * Builds a Falcon {@code Frequency} from an Oozie frequency value and time unit.
     *
     * @param str      frequency value
     * @param timeunit Oozie time unit; resolved by name to an {@code OozieTimeUnit} constant
     * @return the equivalent Falcon frequency
     */
    private Frequency createFrequency(String str, CoordinatorJob.Timeunit timeunit) {
        OozieTimeUnit oozieUnit = OozieTimeUnit.valueOf(timeunit.name());
        return new Frequency(str, oozieUnit.getFalconTimeUnit());
    }

    /**
     * Selects the coordinators of the given bundles that overlap the requested time window.
     *
     * <p>A coordinator qualifies when it is not in PREP, its app name matches one of the life cycles,
     * it starts before {@code date2} and ends after {@code date}. The result is sorted by start time,
     * newest first.
     *
     * @param oozieClient client used to fetch bundle details
     * @param date        window start
     * @param date2       window end
     * @param list        bundles to inspect
     * @param list2       life cycles used for matching
     * @return qualifying coordinators in descending start-time order
     * @throws FalconException wrapping any Oozie client failure
     */
    private List<CoordinatorJob> getApplicableCoords(OozieClient oozieClient, Date date, Date date2, List<BundleJob> list, List<LifeCycle> list2) throws FalconException {
        List<CoordinatorJob> applicable = new ArrayList<CoordinatorJob>();
        try {
            for (BundleJob bundle : list) {
                BundleJob bundleDetails = oozieClient.getBundleJobInfo(bundle.getId());
                for (CoordinatorJob coord : bundleDetails.getCoordinators()) {
                    if (coord.getStatus() == Job.Status.PREP || !isCoordApplicable(coord.getAppName(), list2)) {
                        continue;
                    }
                    if (date2.compareTo(coord.getStartTime()) > 0 && date.compareTo(coord.getEndTime()) < 0) {
                        applicable.add(coord);
                    }
                }
            }
        } catch (OozieClientException clientFailure) {
            throw new FalconException(clientFailure);
        }
        sortDescByStartTime(applicable);
        return applicable;
    }

    /**
     * Tells whether a coordinator app name contains the tag name of any given life cycle.
     *
     * @param str  coordinator application name
     * @param list life cycles to test
     * @return {@code true} on the first matching life cycle, {@code false} if none matches
     */
    private boolean isCoordApplicable(String str, List<LifeCycle> list) {
        for (LifeCycle lifeCycle : list) {
            String tagName = lifeCycle.getTag().name();
            if (str.contains(tagName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sorts coordinators in place by start time, latest first.
     *
     * @param list coordinators to reorder
     */
    protected void sortDescByStartTime(List<CoordinatorJob> list) {
        Comparator<CoordinatorJob> earliestFirst = new Comparator<CoordinatorJob>() {
            @Override
            public int compare(CoordinatorJob left, CoordinatorJob right) {
                return left.getStartTime().compareTo(right.getStartTime());
            }
        };
        // Reversing the ascending comparator yields the descending order.
        Collections.sort(list, Collections.reverseOrder(earliestFirst));
    }

    /**
     * Compares two entities via {@code EntityUtil.equals}, passing {@code BUNDLE_UPDATEABLE_PROPS}.
     *
     * @param entity  existing entity definition
     * @param entity2 new entity definition
     * @return the result of {@code EntityUtil.equals}; a {@code true} result lets {@code update}
     *         change the running coordinators in place
     * @throws FalconException if the comparison fails
     */
    private boolean canUpdateBundle(Entity entity, Entity entity2) throws FalconException {
        boolean inPlaceChangeSuffices = EntityUtil.equals(entity, entity2, BUNDLE_UPDATEABLE_PROPS);
        return inPlaceChangeSuffices;
    }

    /**
     * Updates a scheduled entity on one cluster and then its dependents.
     *
     * <p>Nothing is done to the entity itself when it has no bundle or {@code UpdateHelper} reports no
     * change. If {@code canUpdateBundle} accepts the change, the existing coordinators are changed in
     * place and the method returns immediately, without touching dependents. Otherwise the bundle is
     * replaced through {@code updateInternal}.
     *
     * @param entity  current entity definition
     * @param entity2 new entity definition
     * @param str     cluster name
     * @param bool    flag forwarded to {@code updateInternal} and {@code updateDependents}
     * @return a description of the updates performed
     * @throws FalconException if the new end time is already in the past or any step fails
     */
    public String update(Entity entity, Entity entity2, String str, Boolean bool) throws FalconException {
        BundleJob latestBundle = findLatestBundle(entity, str);
        boolean entityUpdated = latestBundle != MISSING
                && UpdateHelper.isEntityUpdated(entity, entity2, str, new Path(latestBundle.getAppPath()));
        Cluster cluster = (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, str);
        StringBuilder result = new StringBuilder();

        if (entityUpdated) {
            LOG.info("Updating entity through Workflow Engine {}", entity2.toShortString());
            Date newEndTime = EntityUtil.getEndTime(entity2, str);
            if (newEndTime.before(DateUtil.now())) {
                throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(newEndTime) + " is before current time. Entity can't be updated. Use remove and add");
            }
            LOG.debug("Updating for cluster: {}, bundle: {}", str, latestBundle.getId());

            if (canUpdateBundle(entity, entity2)) {
                // A coordinator change is enough; no new bundle is created.
                LOG.info("Change operation is adequate! : {}, bundle: {}", str, latestBundle.getId());
                updateCoords(str, latestBundle, EntityUtil.getParallel(entity2), EntityUtil.getEndTime(entity2, str), entity2);
                return getUpdateString(entity2, new Date(), latestBundle, latestBundle);
            }

            LOG.debug("Going to update! : {} for cluster {}, bundle: {}", new Object[]{entity2.toShortString(), str, latestBundle.getId()});
            String updateSummary = updateInternal(entity, entity2, cluster, latestBundle, CurrentUser.getUser(), bool);
            result.append(updateSummary).append("\n");
            LOG.info("Entity update complete: {} for cluster {}, bundle: {}", new Object[]{entity2.toShortString(), str, latestBundle.getId()});
        }

        result.append(updateDependents(cluster, entity, entity2, bool));
        return result.toString();
    }

    /**
     * Re-applies the current entity definition on a cluster by running {@code updateInternal} with
     * the entity as both old and new definition.
     *
     * @param entity entity to refresh
     * @param str    cluster name
     * @param bool   flag forwarded to {@code updateInternal}
     * @return the update description followed by a newline, or an empty string when the entity has
     *         no bundle on the cluster
     * @throws FalconException if the update fails
     */
    public String touch(Entity entity, String str, Boolean bool) throws FalconException {
        BundleJob latestBundle = findLatestBundle(entity, str);
        Cluster cluster = (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, str);
        StringBuilder result = new StringBuilder();
        if (latestBundle == MISSING) {
            return result.toString();
        }

        LOG.info("Updating entity {} for cluster: {}, bundle: {}", new Object[]{entity.toShortString(), str, latestBundle.getId()});
        String updateSummary = updateInternal(entity, entity, cluster, latestBundle, CurrentUser.getUser(), bool);
        result.append(updateSummary).append("\n");
        LOG.info("Entity update complete: {} for cluster {}, bundle: {}", new Object[]{entity.toShortString(), str, latestBundle.getId()});
        return result.toString();
    }

    /**
     * Builds the human-readable summary of an update.
     *
     * <p>The summary contains the entity's short string, the effective time, the old bundle id (when
     * non-empty) and the old coordinator ids. When a new bundle is given, its coordinator ids are
     * appended, or its bundle id if it has no coordinators yet.
     *
     * @param entity     entity that was updated
     * @param date       effective time of the update
     * @param bundleJob  bundle in effect before the update
     * @param bundleJob2 bundle in effect after the update; may be {@code null}
     * @return the summary text
     */
    private String getUpdateString(Entity entity, Date date, BundleJob bundleJob, BundleJob bundleJob2) {
        StringBuilder summary = new StringBuilder();
        summary.append(entity.toShortString());
        summary.append("/Effective Time: ");
        summary.append(SchemaHelper.formatDateUTC(date));

        String oldBundleId = bundleJob.getId();
        if (StringUtils.isNotEmpty(oldBundleId)) {
            summary.append(". Old bundle id: ").append(oldBundleId);
        }

        summary.append(". Old coordinator id: ");
        List<String> oldCoordIds = new ArrayList<String>();
        for (CoordinatorJob oldCoord : bundleJob.getCoordinators()) {
            oldCoordIds.add(oldCoord.getId());
        }
        summary.append(StringUtils.join(oldCoordIds, ','));

        if (bundleJob2 == null) {
            return summary.toString();
        }

        List<String> newCoordIds = new ArrayList<String>();
        for (CoordinatorJob newCoord : bundleJob2.getCoordinators()) {
            newCoordIds.add(newCoord.getId());
        }
        if (newCoordIds.isEmpty()) {
            summary.append(". New bundle id: ").append(bundleJob2.getId());
        } else {
            summary.append(". New coordinator id: ").append(StringUtils.join(newCoordIds, ','));
        }
        return summary.toString();
    }

    /**
     * Re-applies every dependent process of {@code entity} that is affected by the change and is
     * scheduled on the cluster.
     *
     * <p>Only dependents of type PROCESS are considered. Each is updated via {@code updateInternal}
     * as the user owning its latest bundle.
     *
     * @param cluster cluster on which dependents are updated
     * @param entity  old definition of the changed entity
     * @param entity2 new definition of the changed entity
     * @param bool    flag forwarded to {@code updateInternal}
     * @return newline-terminated update descriptions, one per updated dependent
     * @throws FalconException if any dependent update fails
     */
    private String updateDependents(Cluster cluster, Entity entity, Entity entity2, Boolean bool) throws FalconException {
        StringBuilder result = new StringBuilder();
        Set<Entity> dependents = EntityGraph.get().getDependents(entity);
        for (Entity dependent : dependents) {
            if (dependent.getEntityType() != EntityType.PROCESS) {
                continue;
            }
            LOG.info("Dependent entity {} need to be updated", dependent.toShortString());
            if (!UpdateHelper.shouldUpdate(entity, entity2, dependent, cluster.getName())) {
                continue;
            }

            BundleJob dependentBundle = findLatestBundle(dependent, cluster.getName());
            if (dependentBundle == MISSING) {
                LOG.info("Dependent entity {} is not scheduled", dependent.getName());
                continue;
            }

            LOG.info("Triggering update for {}, {}", cluster, dependentBundle.getId());
            String updateSummary = updateInternal(dependent, dependent, cluster, dependentBundle, dependentBundle.getUser(), bool);
            result.append(updateSummary).append("\n");
            LOG.info("Entity update complete: {} for cluster {}, bundle: {}", new Object[]{dependent.toShortString(), cluster, dependentBundle.getId()});
        }
        LOG.info("All dependent entities updated for: {}", entity.toShortString());
        return result.toString();
    }

    /**
     * Computes the coordinator's last action time moved back by one frequency interval.
     *
     * @param coordinatorJob coordinator to inspect
     * @return {@code getLastActionTime()} minus one frequency step in the coordinator's time zone, or
     *         {@code null} when the coordinator has no next materialized time
     */
    private Date getCoordLastActionTime(CoordinatorJob coordinatorJob) {
        if (coordinatorJob.getNextMaterializedTime() != null) {
            TimeZone zone = EntityUtil.getTimeZone(coordinatorJob.getTimeZone());
            Calendar lastAction = Calendar.getInstance(zone);
            lastAction.setTime(coordinatorJob.getLastActionTime());

            Frequency frequency = createFrequency(String.valueOf(coordinatorJob.getFrequency()), coordinatorJob.getTimeUnit());
            lastAction.add(frequency.getTimeUnit().getCalendarUnit(), -frequency.getFrequencyAsInt());
            return lastAction.getTime();
        }
        return null;
    }

    /**
     * Changes concurrency and end time of every coordinator in a bundle.
     *
     * <p>For feed coordinators with a configured delay (see {@code getDelay}) the end time is moved to
     * the next start time computed from that delay. The adjustment is computed per coordinator from
     * the requested end time; previously the adjusted value overwrote the {@code date} parameter and
     * leaked into the coordinators processed afterwards.
     *
     * <p>If nothing has materialized, the end time is set to the larger of the coordinator start time
     * and the end time. Otherwise, when the end time is not after the last action time, a pause time
     * one minute before the end time is set first, and then the end time is applied.
     *
     * @param str       cluster name
     * @param bundleJob bundle whose coordinators are changed
     * @param i         concurrency to set
     * @param date      requested end time; must be in the future
     * @param entity    entity being updated, used for delay and time-zone lookup
     * @throws FalconException if the end time is not in the future, the bundle has no coordinators
     *                         yet, or a change fails
     */
    private void updateCoords(String str, BundleJob bundleJob, int i, Date date, Entity entity) throws FalconException {
        if (date.compareTo(DateUtil.now()) <= 0) {
            throw new FalconException("End time " + SchemaHelper.formatDateUTC(date) + " can't be in the past");
        }
        if (bundleJob.getCoordinators() == null || bundleJob.getCoordinators().isEmpty()) {
            throw new FalconException("Invalid state. Oozie coords are still not created. Try again later");
        }
        for (CoordinatorJob coordinatorJob : bundleJob.getCoordinators()) {
            Frequency delay = entity.getEntityType().equals(EntityType.FEED) ? getDelay((Feed) entity, coordinatorJob) : null;
            // Per-coordinator end time: never carry one coordinator's delay adjustment to the next.
            Date coordEndTime = delay == null
                    ? date
                    : EntityUtil.getNextStartTime(coordinatorJob.getStartTime(), delay, EntityUtil.getTimeZone(entity), date);
            LOG.debug("Updating endtime of coord {} to {} on cluster {}", new Object[]{coordinatorJob.getId(), SchemaHelper.formatDateUTC(coordEndTime), str});

            Date coordLastActionTime = getCoordLastActionTime(coordinatorJob);
            if (coordLastActionTime == null) {
                LOG.info("Nothing is materialized for this coord: {}", coordinatorJob.getId());
                if (coordEndTime.compareTo(coordinatorJob.getStartTime()) <= 0) {
                    LOG.info("Setting end time to START TIME {}", SchemaHelper.formatDateUTC(coordinatorJob.getStartTime()));
                    change(str, coordinatorJob.getId(), i, coordinatorJob.getStartTime(), null);
                } else {
                    // This branch applies the requested end time, not the start time.
                    LOG.info("Setting end time to {}", SchemaHelper.formatDateUTC(coordEndTime));
                    change(str, coordinatorJob.getId(), i, coordEndTime, null);
                }
            } else {
                LOG.info("Actions have materialized for this coord: {}, last action {}", coordinatorJob.getId(), SchemaHelper.formatDateUTC(coordLastActionTime));
                if (!coordEndTime.after(coordLastActionTime)) {
                    Date offsetTime = DateUtil.offsetTime(coordEndTime, -60);
                    LOG.info("Setting pause time on coord: {} to {}", coordinatorJob.getId(), SchemaHelper.formatDateUTC(offsetTime));
                    change(str, coordinatorJob.getId(), i, null, SchemaHelper.formatDateUTC(offsetTime));
                }
                change(str, coordinatorJob.getId(), i, coordEndTime, null);
            }
        }
    }

    /**
     * Finds the delay configured for the feed cluster that a replication coordinator belongs to.
     *
     * @param feed           feed whose clusters are searched
     * @param coordinatorJob coordinator whose app name is matched against the cluster name and the
     *                       literal "REPLICATION"
     * @return the delay of the first matching cluster that defines one, or {@code null} if none does
     */
    private Frequency getDelay(Feed feed, CoordinatorJob coordinatorJob) {
        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
            if (!coordinatorJob.getAppName().contains(feedCluster.getName())) {
                continue;
            }
            if (!coordinatorJob.getAppName().contains("REPLICATION")) {
                continue;
            }
            if (feedCluster.getDelay() != null) {
                return feedCluster.getDelay();
            }
        }
        return null;
    }

    /**
     * Replaces the running bundle of an entity with one built from the new definition.
     *
     * <p>Steps, in order: compute the effective time, dry-run the new definition, end the old
     * coordinators at the effective time, schedule the new definition, and suspend the entity again if
     * the old bundle's status was in {@code BUNDLE_SUSPENDED_STATUS}.
     *
     * @param entity    old entity definition (its parallelism is applied to the old coordinators)
     * @param entity2   new entity definition
     * @param cluster   target cluster
     * @param bundleJob currently running bundle
     * @param str       user under which the new bundle is scheduled
     * @param bool      flag forwarded to the dry run
     * @return the update summary produced by {@code getUpdateString}
     * @throws FalconException if any step fails
     */
    private String updateInternal(Entity entity, Entity entity2, Cluster cluster, BundleJob bundleJob, String str, Boolean bool) throws FalconException {
        String clusterName = cluster.getName();
        Date effectiveTime = getEffectiveTime(cluster, entity2);
        LOG.info("Effective time " + effectiveTime);

        dryRunForUpdate(cluster, entity2, effectiveTime, bool);

        // Capture the suspended state before the old coordinators are changed.
        boolean wasSuspended = BUNDLE_SUSPENDED_STATUS.contains(bundleJob.getStatus());
        updateCoords(clusterName, bundleJob, EntityUtil.getParallel(entity), effectiveTime, entity2);

        String newBundleId = scheduleForUpdate(entity2, cluster, effectiveTime, str);
        BundleJob newBundle = newBundleId == null ? null : getBundleInfo(clusterName, newBundleId);

        if (wasSuspended) {
            doBundleAction(entity2, BundleAction.SUSPEND, cluster.getName());
        }
        return getUpdateString(entity2, effectiveTime, bundleJob, newBundle);
    }

    /**
     * Computes when an update takes effect: the entity's next start time on the cluster, evaluated
     * against the current time offset by 180 via {@code DateUtil.offsetTime}.
     *
     * @param cluster cluster on which the entity runs
     * @param entity  entity being updated
     * @return the effective time
     */
    private Date getEffectiveTime(Cluster cluster, Entity entity) {
        Date earliestAllowed = DateUtil.offsetTime(DateUtil.now(), 180);
        return EntityUtil.getNextStartTime(entity, cluster, earliestAllowed);
    }

    /**
     * Dry-runs a copy of the entity with its start date moved to the given time.
     *
     * @param cluster cluster to validate against
     * @param entity  new entity definition; a copy is modified, not the entity itself
     * @param date    start date applied to the copy
     * @param bool    flag forwarded to {@code dryRun}
     * @throws FalconException if the dry run fails; the original failure is kept as the cause
     */
    private void dryRunForUpdate(Cluster cluster, Entity entity, Date date, Boolean bool) throws FalconException {
        String clusterName = cluster.getName();
        Entity candidate = entity.copy();
        EntityUtil.setStartDate(candidate, clusterName, date);
        try {
            dryRun(candidate, cluster.getName(), bool);
        } catch (FalconException dryRunFailure) {
            throw new FalconException("The new entity " + entity.toShortString() + " can't be scheduled", dryRunFailure);
        }
    }

    /**
     * Schedules a copy of the entity, starting at the given time, as the given user.
     *
     * <p>The current user is switched to {@code str} for the duration of the call and always switched
     * back afterwards, whether scheduling succeeds or fails.
     *
     * @param entity  entity to schedule
     * @param cluster target cluster
     * @param date    start date applied to the scheduled copy
     * @param str     user to schedule as
     * @return the value returned by {@code scheduleEntity}, or {@code null} when the builder
     *         produced no properties
     * @throws FalconException if building or scheduling fails
     */
    private String scheduleForUpdate(Entity entity, Cluster cluster, Date date, String str) throws FalconException {
        Entity scheduledCopy = entity.copy();
        String originalUser = CurrentUser.getUser();
        switchUser(str);
        try {
            EntityUtil.setStartDate(scheduledCopy, cluster.getName(), date);
            Properties bundleProps = OozieEntityBuilder.get(scheduledCopy).build(cluster, EntityUtil.getNewStagingPath(cluster, scheduledCopy));
            if (bundleProps == null) {
                LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
                return null;
            }
            LOG.info("Scheduling {} on cluster {} with props {}", new Object[]{entity.toShortString(), cluster.getName(), bundleProps});
            return scheduleEntity(cluster.getName(), bundleProps, entity);
        } finally {
            // Restore the caller's identity on every exit path.
            switchUser(originalUser);
        }
    }

    /**
     * Authenticates as {@code str} unless that is already the current user.
     *
     * @param str user to switch to
     */
    private void switchUser(String str) {
        boolean alreadyCurrent = CurrentUser.getUser().equals(str);
        if (!alreadyCurrent) {
            CurrentUser.authenticate(str);
        }
    }

    /**
     * Fetches bundle details from Oozie.
     *
     * @param str  cluster whose Oozie client is used
     * @param str2 bundle job id
     * @return the bundle job
     * @throws FalconException wrapping any Oozie client failure
     */
    private BundleJob getBundleInfo(String str, String str2) throws FalconException {
        OozieClient client = OozieClientFactory.get(str);
        try {
            BundleJob bundle = client.getBundleJobInfo(str2);
            return bundle;
        } catch (OozieClientException clientFailure) {
            throw new FalconException(clientFailure);
        }
    }

    /**
     * Queries Oozie for RUNNING workflows matching any of the given names.
     *
     * <p>The filter has the form {@code status=RUNNING;name=a;name=b}; the first 1000 jobs are
     * requested.
     *
     * @param str  cluster whose Oozie client is used
     * @param list workflow names to include in the filter
     * @return the matching workflow jobs
     * @throws FalconException wrapping any Oozie client failure
     */
    private List<WorkflowJob> getRunningWorkflows(String str, List<String> list) throws FalconException {
        StringBuilder filter = new StringBuilder();
        filter.append("status").append('=').append(Job.Status.RUNNING.name());
        for (String workflowName : list) {
            filter.append(';').append("name").append('=').append(workflowName);
        }
        String filterText = filter.toString();
        try {
            return OozieClientFactory.get(str).getJobsInfo(filterText, 1, 1000);
        } catch (OozieClientException clientFailure) {
            throw new FalconException(clientFailure);
        }
    }

    /**
     * Reruns a workflow job and waits for it to reach RUNNING.
     *
     * <p>The rerun configuration is the job's own configuration overlaid with {@code properties}.
     * Unless the caller supplies {@code oozie.wf.rerun.failnodes} or {@code oozie.wf.rerun.skip.nodes},
     * {@code oozie.wf.rerun.failnodes} is set to {@code !z}. The coordinator and bundle application
     * paths are removed, and if both rerun keys end up present the skip-nodes key is dropped.
     *
     * <p>The caller's {@code properties} object is no longer modified; defaults are applied to a
     * private copy.
     *
     * @param str        cluster whose Oozie client is used
     * @param str2       workflow job id
     * @param properties caller-supplied overrides; may be {@code null}
     * @param z          when {@code true}, {@code oozie.wf.rerun.failnodes} defaults to "false"
     * @throws FalconException wrapping any failure of the rerun or of the status check
     */
    public void reRun(String str, String str2, Properties properties, boolean z) throws FalconException {
        OozieClient oozieClient = OozieClientFactory.get(str);
        try {
            WorkflowJob jobInfo = oozieClient.getJobInfo(str2);

            // Work on a copy so the same Properties object can safely be reused by the caller.
            Properties overrides = new Properties();
            if (properties != null) {
                overrides.putAll(properties);
            }
            if (!overrides.containsKey("oozie.wf.rerun.failnodes") && !overrides.containsKey("oozie.wf.rerun.skip.nodes")) {
                overrides.put("oozie.wf.rerun.failnodes", String.valueOf(!z));
            }

            Properties jobProps = OozieUtils.toProperties(jobInfo.getConf());
            jobProps.putAll(overrides);
            jobProps.remove("oozie.coord.application.path");
            jobProps.remove("oozie.bundle.application.path");
            if (jobProps.containsKey("oozie.wf.rerun.failnodes") && jobProps.containsKey("oozie.wf.rerun.skip.nodes")) {
                LOG.warn("Both oozie.wf.rerun.skip.nodes and oozie.wf.rerun.failnodes are present in workflow params removingoozie.wf.rerun.skip.nodes");
                jobProps.remove("oozie.wf.rerun.skip.nodes");
            }

            oozieClient.reRun(str2, jobProps);
            assertStatus(str, str2, Job.Status.RUNNING);
            LOG.info("Rerun job {} on cluster {}", str2, str);
        } catch (Exception e) {
            LOG.error("Unable to rerun workflows", e);
            throw new FalconException(e);
        }
    }

    /**
     * Polls a job until its status is one of {@code statusArr}.
     *
     * <p>The number of attempts comes from the runtime property named by
     * {@code WORKFLOW_STATUS_RETRY_COUNT} (default 30), with a 100 ms pause between attempts. If the
     * thread is interrupted while pausing, the interrupt flag is restored and polling stops instead
     * of the interruption being silently discarded.
     *
     * @param str       cluster whose Oozie client is used
     * @param str2      job id
     * @param statusArr acceptable statuses
     * @throws FalconException if the retry count is not an integer (the parse failure is kept as the
     *                         cause) or no expected status was observed
     */
    private void assertStatus(String str, String str2, Job.Status... statusArr) throws FalconException {
        int retryCount;
        try {
            retryCount = Integer.parseInt(RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30"));
        } catch (NumberFormatException e) {
            throw new FalconException("Invalid value provided for runtime property \"workflow.status.retry.count\". Please provide an integer value.", e);
        }

        String actualStatus = null;
        for (int attempt = 0; attempt < retryCount; attempt++) {
            actualStatus = getWorkflowStatus(str, str2);
            if (statusEquals(actualStatus, statusArr)) {
                return;
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new FalconException("For Job" + str2 + ", actual statuses: " + actualStatus + ", expected statuses: " + Arrays.toString(statusArr));
    }

    /**
     * Tells whether a status name equals the name of any of the given statuses.
     *
     * @param str       status name to test
     * @param statusArr candidate statuses
     * @return {@code true} if a candidate's name equals {@code str}
     */
    private boolean statusEquals(String str, Job.Status... statusArr) {
        int index = 0;
        while (index < statusArr.length) {
            if (str.equals(statusArr[index].name())) {
                return true;
            }
            index++;
        }
        return false;
    }

    /**
     * Returns the status name of a workflow, coordinator or bundle job.
     *
     * <p>The job kind is taken from the id suffix: {@code -W} workflow, {@code -C} coordinator,
     * {@code -B} bundle.
     *
     * @param str  cluster whose Oozie client is used
     * @param str2 job id
     * @return the job's status name
     * @throws FalconException wrapping any failure, including an id with an unrecognised suffix
     */
    public String getWorkflowStatus(String str, String str2) throws FalconException {
        OozieClient client = OozieClientFactory.get(str);
        try {
            String statusName;
            if (str2.endsWith("-W")) {
                statusName = client.getJobInfo(str2).getStatus().name();
            } else if (str2.endsWith("-C")) {
                statusName = client.getCoordJobInfo(str2).getStatus().name();
            } else if (str2.endsWith("-B")) {
                statusName = client.getBundleJobInfo(str2).getStatus().name();
            } else {
                throw new IllegalArgumentException("Unhandled jobs id: " + str2);
            }
            return statusName;
        } catch (Exception failure) {
            LOG.error("Unable to get status of workflows", failure);
            throw new FalconException(failure);
        }
    }

    /**
     * Submits the given properties to the cluster, notifying the registered listeners before and
     * after the submission.
     *
     * @param str        cluster name
     * @param properties job properties handed to {@code run}
     * @param entity     entity being scheduled, passed to the listeners
     * @return the value returned by {@code run}
     * @throws FalconException if a listener or the submission fails
     */
    private String scheduleEntity(String str, Properties properties, Entity entity) throws FalconException {
        for (Iterator<?> before = this.listeners.iterator(); before.hasNext();) {
            WorkflowEngineActionListener listener = (WorkflowEngineActionListener) before.next();
            listener.beforeSchedule(entity, str);
        }

        String jobId = run(str, properties);

        for (Iterator<?> after = this.listeners.iterator(); after.hasNext();) {
            WorkflowEngineActionListener listener = (WorkflowEngineActionListener) after.next();
            listener.afterSchedule(entity, str);
        }
        return jobId;
    }

    /**
     * Submits and starts a job on the cluster as the current user.
     *
     * @param str        cluster name
     * @param properties job properties; {@code user.name} is set on this object before submission
     * @return the value returned by the Oozie client's {@code run}
     * @throws FalconException if Oozie rejects the submission; the Oozie exception is kept as cause
     */
    private String run(String str, Properties properties) throws FalconException {
        try {
            LOG.info("Scheduling on cluster {} with properties {}", str, properties);
            String submitter = CurrentUser.getUser();
            properties.setProperty("user.name", submitter);
            OozieClient client = OozieClientFactory.get(str);
            String jobId = client.run(properties);
            LOG.info("Submitted {} on cluster {}", jobId, str);
            return jobId;
        } catch (OozieClientException clientFailure) {
            LOG.error("Unable to schedule workflows", clientFailure);
            throw new FalconException("Unable to schedule workflows", clientFailure);
        }
    }

    /**
     * Suspends a job and waits until it is suspended or already finished.
     *
     * @param str  cluster name
     * @param str2 job id
     * @throws FalconException if the Oozie call fails or no accepted status is reached
     */
    private void suspend(String str, String str2) throws FalconException {
        try {
            OozieClient client = OozieClientFactory.get(str);
            client.suspend(str2);
        } catch (OozieClientException clientFailure) {
            throw new FalconException(clientFailure);
        }
        // Terminal states are accepted as well as the suspended ones.
        assertStatus(str, str2, Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED, Job.Status.FAILED, Job.Status.KILLED);
        LOG.info("Suspended job {} on cluster {}", str2, str);
    }

    /**
     * Resumes a job and waits until it is active again or already finished.
     *
     * @param str  cluster name
     * @param str2 job id
     * @throws FalconException if the Oozie call fails or no accepted status is reached
     */
    private void resume(String str, String str2) throws FalconException {
        try {
            OozieClient client = OozieClientFactory.get(str);
            client.resume(str2);
        } catch (OozieClientException clientFailure) {
            throw new FalconException(clientFailure);
        }
        // Terminal states are accepted as well as PREP and RUNNING.
        assertStatus(str, str2, Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED, Job.Status.FAILED, Job.Status.KILLED);
        LOG.info("Resumed job {} on cluster {}", str2, str);
    }

    /**
     * Kills a job and waits until it reaches KILLED, SUCCEEDED or FAILED.
     *
     * @param str  cluster name
     * @param str2 job id
     * @throws FalconException if the Oozie call fails or no accepted status is reached
     */
    private void kill(String str, String str2) throws FalconException {
        try {
            OozieClient client = OozieClientFactory.get(str);
            client.kill(str2);
        } catch (OozieClientException clientFailure) {
            throw new FalconException(clientFailure);
        }
        assertStatus(str, str2, Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED);
        LOG.info("Killed job {} on cluster {}", str2, str);
    }

    /**
     * Forwards a raw change request for a bundle or coordinator to Oozie.
     *
     * @param str  name of the cluster the job lives on
     * @param str2 id of the bundle/coordinator to change
     * @param str3 change value passed verbatim to the Oozie client's {@code change} call
     * @throws FalconException if the Oozie client reports an error
     */
    private void change(String str, String str2, String str3) throws FalconException {
        try {
            OozieClientFactory.get(str).change(str2, str3);
            // Explicit array: the logger overload in use takes the arguments as Object[].
            final Object[] logArgs = new Object[]{str2, str3, str};
            LOG.info("Changed bundle/coord {}: {} on cluster {}", logArgs);
        } catch (OozieClientException changeFailure) {
            throw new FalconException(changeFailure);
        }
    }

    /**
     * Changes concurrency, end time and pause time of a coordinator and verifies
     * that the new values are visible on the coordinator afterwards.
     *
     * <p>The change value is assembled as {@code concurrency=<i>[;endtime=<date>][;pausetime=<str3>]}
     * and sent through the three-argument {@code change} overload. The coordinator is
     * then checked up to three times, sleeping 100 ms after each failed check.
     *
     * @param str  name of the cluster the coordinator lives on
     * @param str2 id of the coordinator to change
     * @param i    concurrency to set
     * @param date end time to set; omitted from the request and from verification when {@code null}
     * @param str3 pause time string; omitted from the request when {@code null}, and
     *             excluded from verification when {@code null} or empty
     * @throws FalconException if the Oozie client reports an error, or if the coordinator
     *                         still does not show the requested values after the last check
     */
    private void change(String str, String str2, int i, Date date, String str3) throws FalconException {
        StringBuilder sb = new StringBuilder();
        sb.append("concurrency").append("=").append(i).append(";");
        if (date != null) {
            sb.append("endtime").append("=").append(SchemaHelper.formatDateUTC(date)).append(";");
        }
        if (str3 != null) {
            sb.append("pausetime").append("=").append(str3);
        }
        String sb2 = sb.toString();
        // Drop the trailing separator left behind when no pause time was appended.
        if (sb2.endsWith(";")) {
            sb2 = sb2.substring(0, sb2.length() - 1);
        }
        change(str, str2, sb2);
        try {
            OozieClient oozieClient = OozieClientFactory.get(str);
            CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(str2);
            // The expected pause time does not change between polls, so parse it once.
            Date expectedPauseTime = StringUtils.isEmpty(str3) ? null : SchemaHelper.parseDateUTC(str3);
            for (int i2 = 0; i2 < 3; i2++) {
                boolean concurrencyApplied = coordJobInfo.getConcurrency() == i;
                if (concurrencyApplied
                        && (date == null || coordJobInfo.getEndTime().equals(date))
                        && (expectedPauseTime == null || expectedPauseTime.equals(coordJobInfo.getPauseTime()))) {
                    return;
                }
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Keep polling (best effort), but restore the interrupt status so the
                    // caller can still observe that this thread was interrupted.
                    Thread.currentThread().interrupt();
                }
                coordJobInfo = oozieClient.getCoordJobInfo(str2);
            }
            LOG.error("Failed to change coordinator. Current value {}, {}, {}", new Object[]{Integer.valueOf(coordJobInfo.getConcurrency()), SchemaHelper.formatDateUTC(coordJobInfo.getEndTime()), SchemaHelper.formatDateUTC(coordJobInfo.getPauseTime())});
            throw new FalconException("Failed to change coordinator " + str2 + " with change value " + sb2);
        } catch (OozieClientException e2) {
            throw new FalconException(e2);
        }
    }

    /**
     * Fetches the configuration of a job from Oozie and converts it to
     * {@link Properties} via {@code OozieUtils.toProperties}.
     *
     * @param str  name of the cluster the job lives on
     * @param str2 id of the job whose configuration is read
     * @return the job configuration as properties
     * @throws FalconException wrapping any exception raised while fetching or converting
     */
    public Properties getWorkflowProperties(String str, String str2) throws FalconException {
        try {
            final String jobConf = OozieClientFactory.get(str).getJobInfo(str2).getConf();
            final Properties converted = OozieUtils.toProperties(jobConf);
            return converted;
        } catch (Exception failure) {
            throw new FalconException(failure);
        }
    }

    /**
     * Builds a single-instance {@code InstancesResult} describing a workflow job.
     *
     * <p>The instance carries the job's start time, end time, cluster, run id, status
     * and workflow parameters. For a job whose status is {@code RUNNING} the end time
     * is set to the current time instead of the job's own end time.
     *
     * @param str  name of the cluster the job lives on
     * @param str2 id of the workflow job
     * @return a {@code SUCCEEDED} result holding exactly one instance
     * @throws FalconException wrapping any exception raised while reading the job
     */
    public InstancesResult getJobDetails(String str, String str2) throws FalconException {
        InstancesResult.Instance details = new InstancesResult.Instance();
        try {
            WorkflowJob workflow = OozieClientFactory.get(str).getJobInfo(str2);
            details.startTime = workflow.getStartTime();

            String statusName = workflow.getStatus().name();
            boolean stillRunning = statusName.equals(Job.Status.RUNNING.name());
            details.endTime = stillRunning ? new Date() : workflow.getEndTime();

            details.cluster = str;
            details.runId = workflow.getRun();
            details.status = InstancesResult.WorkflowStatus.valueOf(statusName);
            details.wfParams = getWFParams(workflow);

            InstancesResult result =
                    new InstancesResult(APIResult.Status.SUCCEEDED, "Instance for workflow id:" + str2);
            result.setInstances(new InstancesResult.Instance[]{details});
            return result;
        } catch (Exception failure) {
            throw new FalconException(failure);
        }
    }

    /**
     * Reports whether a workflow shows no action-level error codes.
     *
     * <p>Returns {@code false} as soon as any action of the workflow has a non-empty
     * error code. Otherwise, if the workflow has an action named {@code "user-action"}
     * with a non-blank external id, the actions of the job with that id are checked
     * the same way. Returns {@code true} when no error code is found.
     *
     * @param str  name of the cluster the workflow lives on
     * @param str2 id of the workflow job
     * @return {@code false} if any inspected action carries an error code, else {@code true}
     * @throws FalconException wrapping any exception raised while querying Oozie
     */
    public Boolean isWorkflowKilledByUser(String str, String str2) throws FalconException {
        try {
            OozieClient client = OozieClientFactory.get(str);
            List<WorkflowAction> parentActions = client.getJobInfo(str2).getActions();
            for (WorkflowAction parentAction : parentActions) {
                if (StringUtils.isNotEmpty(parentAction.getErrorCode())) {
                    return Boolean.FALSE;
                }
            }

            String userWorkflowId = getUserWorkflowAction(parentActions);
            if (!StringUtils.isNotBlank(userWorkflowId)) {
                return Boolean.TRUE;
            }

            List<WorkflowAction> userActions = client.getJobInfo(userWorkflowId).getActions();
            for (WorkflowAction userAction : userActions) {
                if (StringUtils.isNotEmpty(userAction.getErrorCode())) {
                    return Boolean.FALSE;
                }
            }
            return Boolean.TRUE;
        } catch (Exception failure) {
            throw new FalconException(failure);
        }
    }

    /**
     * Returns the fixed name of this workflow engine.
     *
     * @return the string {@code "oozie"}
     */
    public String getName() {
        final String engineName = "oozie";
        return engineName;
    }

    /**
     * Finds the first action named {@code "user-action"} and returns its external id.
     *
     * @param list workflow actions to search, in order
     * @return the external id of the first matching action, or {@code null} if none matches
     */
    private String getUserWorkflowAction(List<WorkflowAction> list) {
        final String userActionName = "user-action";
        for (WorkflowAction candidate : list) {
            // Literal-first equals is null-safe for a missing action name.
            if (!userActionName.equals(candidate.getName())) {
                continue;
            }
            return candidate.getExternalId();
        }
        return null;
    }
}
