package org.apache.falcon.rerun.handler;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.rerun.event.RerunEvent;
import org.apache.falcon.rerun.handler.AbstractRerunHandler;
import org.apache.falcon.rerun.policy.ExpBackoffPolicy;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.security.CurrentUser;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.internal.AroundClosure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/falcon-prism-0.8-classes.jar:org/apache/falcon/rerun/handler/AbstractRerunConsumer.class
 */
/* loaded from: input_file:WEB-INF/lib/falcon-rerun-0.8.jar:org/apache/falcon/rerun/handler/AbstractRerunConsumer.class */
/**
 * Base class for background consumers that repeatedly take rerun events from a
 * handler's queue and dispatch each one to {@link #handleRerun}.
 *
 * <p>The consumer loop runs until the executing thread is interrupted. Failures
 * reported as {@link FalconException} trigger an alert, a backoff sleep and a
 * {@code reconnect()} on the handler; any other failure is logged and the loop
 * continues with the next event.
 *
 * @param <T> concrete rerun event type consumed from the queue
 * @param <M> handler type that owns the queue and the workflow engine
 */
public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends AbstractRerunHandler<T, DelayedQueue<T>>> implements Runnable {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractRerunConsumer.class);

    /** Handler that supplies queued events, the workflow engine and reconnect logic. */
    protected M handler;

    /*
     * AspectJ-generated closure, kept as emitted by the decompiler.
     * NOTE(review): it calls alertRerunConsumerFailed_aroundBody0, which is not
     * defined anywhere in the visible class (the decompiler header says a
     * same-named class from another jar was omitted) -- confirm whether this
     * inner class belongs here at all.
     */
    /* loaded from: input_file:WEB-INF/lib/falcon-prism-0.8-classes.jar:org/apache/falcon/rerun/handler/AbstractRerunConsumer$AjcClosure1.class */
    public class AjcClosure1 extends AroundClosure {
        public AjcClosure1(Object[] objArr) {
            super(objArr);
        }

        @Override // org.aspectj.runtime.internal.AroundClosure
        public Object run(Object[] objArr) {
            Object[] objArr2 = this.state;
            return AbstractRerunConsumer.alertRerunConsumerFailed_aroundBody0((AbstractRerunConsumer) objArr2[0], (String) objArr2[1], (Exception) objArr2[2], (JoinPoint) objArr2[3]);
        }
    }

    /**
     * Creates a consumer bound to the given handler.
     *
     * @param handler handler whose queue this consumer drains
     */
    public AbstractRerunConsumer(M handler) {
        this.handler = handler;
    }

    /**
     * Consumes events until the current thread is interrupted.
     *
     * <p>For each event: authenticates as the event's workflow user, looks up the
     * workflow status through the handler's workflow engine, and delegates to
     * {@link #handleRerun}. On a {@link FalconException} whose root cause is an
     * {@link InterruptedException} the method returns; on any other
     * {@link FalconException} it raises an alert, sleeps for a backoff delay that
     * depends on the number of consecutive failures, and asks the handler to
     * reconnect. An interrupt received during the backoff sleep also ends the loop.
     */
    @Override // java.lang.Runnable
    public void run() {
        // Consecutive-failure counter fed to the backoff policy; reset after every successful take.
        int attempt = 1;
        ExpBackoffPolicy backoffPolicy = new ExpBackoffPolicy();
        Frequency baseFrequency = new Frequency("minutes(1)");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                try {
                    // Declared as T (not RerunEvent) so it can be handed to handleRerun(String, String, T).
                    T event = this.handler.takeFromQueue();
                    attempt = 1;
                    CurrentUser.authenticate(event.getWorkflowUser());
                    String clusterName = event.getClusterName();
                    String jobStatus = this.handler.getWfEngine().getWorkflowStatus(clusterName, event.getWfId());
                    handleRerun(clusterName, jobStatus, event);
                } catch (FalconException e) {
                    if (ExceptionUtils.getRootCause(e) instanceof InterruptedException) {
                        LOG.info("Rerun handler daemon has been interrupted");
                        return;
                    }
                    LOG.error("Error while reading message from the queue", e);
                    GenericAlert.alertRerunConsumerFailed("Error while reading message from the queue: ", e);
                    Thread.sleep(backoffPolicy.getDelay(baseFrequency, attempt));
                    this.handler.reconnect();
                    attempt++;
                }
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt flag when it throws; without this
                // branch the generic catch below would swallow the interrupt and the
                // loop condition would never observe it. Restore the flag and stop.
                Thread.currentThread().interrupt();
                LOG.info("Rerun handler daemon has been interrupted");
                return;
            } catch (Throwable th) {
                // Keep the daemon alive on unexpected failures of a single event.
                LOG.error("Error in rerun consumer", th);
            }
        }
    }

    /**
     * Processes a single rerun event.
     *
     * @param clusterName cluster name taken from the event
     * @param jobStatus   workflow status returned by the handler's workflow engine
     *                    for the event's cluster and workflow id
     * @param event       the event taken from the queue
     */
    protected abstract void handleRerun(String clusterName, String jobStatus, T event);
}
