package co.cask.cdap.internal.app.runtime.flow;

import co.cask.cdap.api.annotation.Batch;
import co.cask.cdap.api.flow.flowlet.Flowlet;
import co.cask.cdap.api.flow.flowlet.InputContext;
import co.cask.cdap.app.queue.InputDatum;
import co.cask.cdap.internal.app.runtime.flow.ProcessMethod;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.reflect.TypeToken;
import java.lang.reflect.Method;
import java.util.Iterator;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/ReflectionProcessMethod.class */
public final class ReflectionProcessMethod<T> implements ProcessMethod<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ReflectionProcessMethod.class);
    private final Flowlet flowlet;
    private final Method method;
    private final boolean hasParam;
    private final boolean batch;
    private final boolean needsIterator;
    private final boolean needContext;
    private final int maxRetries;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/ReflectionProcessMethod$ReflectionProcessResult.class */
    public static final class ReflectionProcessResult<V> implements ProcessMethod.ProcessResult<V> {
        private final V event;
        private final Throwable cause;

        private ReflectionProcessResult(V v, Throwable th) {
            this.event = v;
            this.cause = th;
        }

        @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethod.ProcessResult
        public V getEvent() {
            return this.event;
        }

        @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethod.ProcessResult
        public boolean isSuccess() {
            return this.cause == null;
        }

        @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethod.ProcessResult
        public Throwable getCause() {
            return this.cause;
        }
    }

    public static <T> ReflectionProcessMethod<T> create(Flowlet flowlet, Method method, int i) {
        return new ReflectionProcessMethod<>(flowlet, method, i);
    }

    private ReflectionProcessMethod(Flowlet flowlet, Method method, int i) {
        this.flowlet = flowlet;
        this.method = method;
        this.maxRetries = i;
        this.hasParam = method.getGenericParameterTypes().length > 0;
        this.batch = method.isAnnotationPresent(Batch.class);
        this.needsIterator = this.hasParam && TypeToken.of(method.getGenericParameterTypes()[0]).getRawType().equals(Iterator.class);
        this.needContext = method.getGenericParameterTypes().length == 2;
        if (this.method.isAccessible()) {
            return;
        }
        this.method.setAccessible(true);
    }

    @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethod
    public boolean needsInput() {
        return this.hasParam;
    }

    @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethod
    public int getMaxRetries() {
        return this.maxRetries;
    }

    @Override // co.cask.cdap.internal.app.runtime.flow.ProcessMethod
    public ProcessMethod.ProcessResult<T> invoke(InputDatum<T> inputDatum) {
        try {
            Preconditions.checkState(!this.hasParam || inputDatum.needProcess(), "Empty input provided to method that needs input.");
            InputContext inputContext = inputDatum.getInputContext();
            try {
                if (!this.hasParam) {
                    this.method.invoke(this.flowlet, new Object[0]);
                } else if (this.needsIterator) {
                    invoke(this.method, inputDatum.iterator(), inputContext);
                } else {
                    Iterator<T> it = inputDatum.iterator();
                    while (it.hasNext()) {
                        invoke(this.method, it.next(), inputContext);
                    }
                }
                return createResult(inputDatum, null);
            } catch (Throwable th) {
                return createResult(inputDatum, th.getCause());
            }
        } catch (Exception e) {
            LOG.error("Fail to process input: {}", this.method, e);
            throw Throwables.propagate(e);
        }
    }

    public String toString() {
        return this.flowlet.getClass() + "." + this.method.toString();
    }

    private void invoke(Method method, Object obj, InputContext inputContext) throws Exception {
        if (this.needContext) {
            method.invoke(this.flowlet, obj, inputContext);
        } else {
            method.invoke(this.flowlet, obj);
        }
    }

    private ProcessMethod.ProcessResult<T> createResult(InputDatum<T> inputDatum, Throwable th) {
        return new ReflectionProcessResult(this.hasParam ? this.batch ? inputDatum.iterator() : inputDatum.iterator().next() : null, th);
    }
}
