package org.apache.falcon.rerun.handler;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Properties;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.handler.LateRerunHandler;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.workflow.LateDataHandler;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.internal.AroundClosure;
import org.eclipse.jetty.util.component.AbstractLifeCycle;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/falcon-prism-0.8-classes.jar:org/apache/falcon/rerun/handler/LateRerunConsumer.class
 */
/* loaded from: input_file:WEB-INF/lib/falcon-rerun-0.8.jar:org/apache/falcon/rerun/handler/LateRerunConsumer.class */
public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEvent>>> extends AbstractRerunConsumer<LaterunEvent, T> {

    /* loaded from: input_file:WEB-INF/lib/falcon-prism-0.8-classes.jar:org/apache/falcon/rerun/handler/LateRerunConsumer$AjcClosure1.class */
    public class AjcClosure1 extends AroundClosure {
        public AjcClosure1(Object[] objArr) {
            super(objArr);
        }

        @Override // org.aspectj.runtime.internal.AroundClosure
        public Object run(Object[] objArr) {
            Object[] objArr2 = this.state;
            return LateRerunConsumer.alertLateRerunFailed_aroundBody0((LateRerunConsumer) objArr2[0], (String) objArr2[1], (String) objArr2[2], (String) objArr2[3], (String) objArr2[4], (String) objArr2[5], (String) objArr2[6], (String) objArr2[7], (JoinPoint) objArr2[8]);
        }
    }

    public LateRerunConsumer(T t) {
        super(t);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.falcon.rerun.handler.AbstractRerunConsumer
    public void handleRerun(String str, String str2, LaterunEvent laterunEvent) {
        try {
            if (str2.equals(AbstractLifeCycle.RUNNING) || str2.equals("PREP") || str2.equals("SUSPENDED")) {
                LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as job status is running: {}", laterunEvent.getWfId());
                laterunEvent.setMsgInsertTime(System.currentTimeMillis());
                ((LateRerunHandler) this.handler).offerToQueue(laterunEvent);
                return;
            }
            String detectLate = detectLate(laterunEvent);
            if (detectLate.equals("")) {
                LOG.debug("No Late Data Detected, scheduling next late rerun for wf-id: {} at {}", laterunEvent.getWfId(), SchemaHelper.formatDateUTC(new Date()));
                ((LateRerunHandler) this.handler).handleRerun(str, laterunEvent.getEntityType(), laterunEvent.getEntityName(), laterunEvent.getInstance(), Integer.toString(laterunEvent.getRunId()), laterunEvent.getWfId(), laterunEvent.getWorkflowUser(), System.currentTimeMillis());
            } else {
                LOG.info("Late changes detected in the following feeds: {}", detectLate);
                ((LateRerunHandler) this.handler).getWfEngine().reRun(laterunEvent.getClusterName(), laterunEvent.getWfId(), null, true);
                LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}", laterunEvent.getWfId(), laterunEvent.getClusterName());
            }
        } catch (Exception e) {
            LOG.warn("Late Re-run failed for instance {}:{} after {}", laterunEvent.getEntityName(), laterunEvent.getInstance(), Long.valueOf(laterunEvent.getDelayInMilliSec()), e);
            GenericAlert.alertLateRerunFailed(laterunEvent.getEntityType(), laterunEvent.getEntityName(), laterunEvent.getInstance(), laterunEvent.getWfId(), laterunEvent.getWorkflowUser(), Integer.toString(laterunEvent.getRunId()), e.getMessage());
        }
    }

    public String detectLate(LaterunEvent laterunEvent) throws Exception {
        LateDataHandler lateDataHandler = new LateDataHandler();
        Properties workflowProperties = ((LateRerunHandler) this.handler).getWfEngine().getWorkflowProperties(laterunEvent.getClusterName(), laterunEvent.getWfId());
        String property = workflowProperties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
        String property2 = workflowProperties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
        String property3 = workflowProperties.getProperty(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName());
        Path lateLogPath = ((LateRerunHandler) this.handler).getLateLogPath(workflowProperties.getProperty(WorkflowExecutionArgs.LOG_DIR.getName()), workflowProperties.getProperty(WorkflowExecutionArgs.NOMINAL_TIME.getName()), workflowProperties.getProperty("srcClusterName"));
        Configuration configuration = LateRerunHandler.getConfiguration(workflowProperties.getProperty(AbstractWorkflowEngine.NAME_NODE));
        if (!HadoopClientFactory.get().createProxiedFileSystem(lateLogPath.toUri(), configuration).exists(lateLogPath)) {
            LOG.warn("Late log file: {} not found", lateLogPath);
            return "";
        }
        String[] split = property2.split("#");
        String[] split2 = property.split("#");
        String[] split3 = property3.split("#");
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Entity entity = EntityUtil.getEntity(laterunEvent.getEntityType(), laterunEvent.getEntityName());
        if (EntityUtil.getLateProcess(entity) != null) {
            ArrayList arrayList = new ArrayList();
            Iterator<LateInput> it = EntityUtil.getLateProcess(entity).getLateInputs().iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().getInput());
            }
            for (int i = 0; i < split.length; i++) {
                if (arrayList.contains(split2[i])) {
                    linkedHashMap.put(split2[i], Long.valueOf(lateDataHandler.computeStorageMetric(split[i], split3[i], configuration)));
                }
            }
        } else {
            LOG.warn("Late process is not configured for entity: {} ({})", laterunEvent.getEntityType(), laterunEvent.getEntityName());
        }
        return lateDataHandler.detectChanges(lateLogPath, linkedHashMap, configuration);
    }
}
