package co.cask.directives.row;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.wrangler.api.Arguments;
import co.cask.wrangler.api.Directive;
import co.cask.wrangler.api.DirectiveExecutionException;
import co.cask.wrangler.api.DirectiveParseException;
import co.cask.wrangler.api.ExecutorContext;
import co.cask.wrangler.api.ReportErrorAndProceed;
import co.cask.wrangler.api.Row;
import co.cask.wrangler.api.annotations.Categories;
import co.cask.wrangler.api.parser.Expression;
import co.cask.wrangler.api.parser.Identifier;
import co.cask.wrangler.api.parser.Text;
import co.cask.wrangler.api.parser.TokenType;
import co.cask.wrangler.api.parser.UsageDefinition;
import co.cask.wrangler.expression.EL;
import co.cask.wrangler.expression.ELContext;
import co.cask.wrangler.expression.ELException;
import java.util.ArrayList;
import java.util.List;

/**
 * Directive that evaluates a boolean expression against each row and, when the
 * expression is true, reports the row as an error via {@link ReportErrorAndProceed}.
 * Rows for which the expression is false are passed through unchanged.
 *
 * <p>Usage arguments: a required {@code condition} expression, an optional
 * {@code metric} identifier and an optional {@code message} text.
 */
@Name(SendToErrorAndContinue.NAME)
@Plugin(type = Directive.Type)
@Categories(categories = {"row", "data-quality"})
@Description("Send records that match condition to the error collector and continues processing.")
public class SendToErrorAndContinue implements Directive {
    public static final String NAME = "send-to-error-and-continue";

    /** Expression engine holding the compiled {@link #condition}. */
    private final EL el = new EL(new EL.DefaultFunctions());

    /** Source text of the condition expression, as supplied to the directive. */
    private String condition;

    /** Name of the metric counted on every match; {@code null} when not supplied. */
    private String metric = null;

    /** Message reported on a match; falls back to {@link #condition} when not supplied. */
    private String message = null;

    /**
     * Declares the directive's arguments: {@code condition} (expression),
     * and the optional {@code metric} (identifier) and {@code message} (text).
     *
     * @return the usage definition for this directive
     */
    @Override
    public UsageDefinition define() {
        UsageDefinition.Builder builder = UsageDefinition.builder(NAME);
        builder.define("condition", TokenType.EXPRESSION);
        builder.define("metric", TokenType.IDENTIFIER, true);
        builder.define("message", TokenType.TEXT, true);
        return builder.build();
    }

    /**
     * Reads the directive arguments and compiles the condition expression.
     *
     * @param arguments parsed directive arguments
     * @throws DirectiveParseException if the condition cannot be compiled
     */
    @Override
    public void initialize(Arguments arguments) throws DirectiveParseException {
        this.condition = ((Expression) arguments.value("condition")).value();
        try {
            this.el.compile(this.condition);
        } catch (ELException e) {
            throw new DirectiveParseException(String.format("Invalid condition '%s'.", this.condition));
        }
        if (arguments.contains("metric")) {
            this.metric = ((Identifier) arguments.value("metric")).value();
        }
        if (arguments.contains("message")) {
            this.message = ((Text) arguments.value("message")).value();
        }
        // Resolve the default message once here so execute() never has to
        // modify the directive's configuration while processing rows.
        if (this.message == null) {
            this.message = this.condition;
        }
    }

    /** No resources are held by this directive, so there is nothing to release. */
    @Override
    public void destroy() {
    }

    /**
     * Evaluates the condition against every row in {@code list}.
     *
     * <p>When an executor context is available, the transient-store counter
     * {@code dq_total} is incremented once per call and {@code dq_failure}
     * once for a matching row; the configured metric (if any) is also counted.
     * The first matching row aborts the call by throwing
     * {@link ReportErrorAndProceed}; rows collected before it are not returned.
     *
     * @param list rows to evaluate
     * @param executorContext execution context, may be {@code null}
     * @return the rows for which the condition evaluated to false
     * @throws DirectiveExecutionException if the expression fails to evaluate
     * @throws ReportErrorAndProceed if a row satisfies the condition
     */
    @Override
    public List<Row> execute(List<Row> list, ExecutorContext executorContext) throws DirectiveExecutionException, ReportErrorAndProceed {
        if (executorContext != null) {
            executorContext.getTransientStore().increment("dq_total", 1L);
        }
        List<Row> results = new ArrayList<>();
        for (Row row : list) {
            ELContext elContext = new ELContext(executorContext);
            elContext.set("this", row);
            // Bind every variable referenced by the expression to the row's value.
            for (String variable : this.el.variables()) {
                elContext.set(variable, row.getValue(variable));
            }
            // Transient-store variables are bound last, so they take precedence
            // over row values that share the same name.
            if (executorContext != null) {
                for (String name : executorContext.getTransientStore().getVariables()) {
                    elContext.set(name, executorContext.getTransientStore().get(name));
                }
            }
            try {
                if (this.el.execute(elContext).getBoolean().booleanValue()) {
                    if (this.metric != null && executorContext != null) {
                        executorContext.getMetrics().count(this.metric, 1);
                    }
                    if (executorContext != null) {
                        executorContext.getTransientStore().increment("dq_failure", 1L);
                    }
                    // Fall back to the condition text if initialize() did not set a message.
                    String errorMessage = this.message != null ? this.message : this.condition;
                    throw new ReportErrorAndProceed(errorMessage, 1);
                }
                results.add(row);
            } catch (ELException e) {
                throw new DirectiveExecutionException(e.getMessage());
            }
        }
        return results;
    }
}
