package co.cask.wrangler.executor;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.wrangler.api.DirectiveExecutionException;
import co.cask.wrangler.api.DirectiveLoadException;
import co.cask.wrangler.api.DirectiveNotFoundException;
import co.cask.wrangler.api.DirectiveParseException;
import co.cask.wrangler.api.ErrorRecord;
import co.cask.wrangler.api.ErrorRowException;
import co.cask.wrangler.api.Executor;
import co.cask.wrangler.api.ExecutorContext;
import co.cask.wrangler.api.RecipeException;
import co.cask.wrangler.api.RecipeParser;
import co.cask.wrangler.api.RecipePipeline;
import co.cask.wrangler.api.ReportErrorAndProceed;
import co.cask.wrangler.api.Row;
import co.cask.wrangler.utils.RecordConvertor;
import co.cask.wrangler.utils.RecordConvertorException;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/wrangler-core-3.2.0.jar:co/cask/wrangler/executor/RecipePipelineExecutor.class */
public final class RecipePipelineExecutor implements RecipePipeline<Row, StructuredRecord, ErrorRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(RecipePipelineExecutor.class);
    private ExecutorContext context;
    private List<Executor> directives;
    private final ErrorRecordCollector collector = new ErrorRecordCollector();
    private RecordConvertor convertor = new RecordConvertor();

    @Override // co.cask.wrangler.api.RecipePipeline
    public void initialize(RecipeParser recipeParser, ExecutorContext executorContext) throws RecipeException {
        this.context = executorContext;
        try {
            this.directives = recipeParser.parse();
        } catch (DirectiveLoadException | DirectiveNotFoundException e) {
            throw new RecipeException(e.getMessage(), e);
        } catch (DirectiveParseException e2) {
            throw new RecipeException(e2.getMessage());
        }
    }

    @Override // co.cask.wrangler.api.RecipePipeline
    public void destroy() {
        Iterator<Executor> it = this.directives.iterator();
        while (it.hasNext()) {
            try {
                it.next().destroy();
            } catch (Exception e) {
                LOG.warn(e.getMessage());
            } catch (Throwable th) {
                LOG.warn(th.getMessage());
            }
        }
    }

    @Override // co.cask.wrangler.api.RecipePipeline
    public List<StructuredRecord> execute(List<Row> list, Schema schema) throws RecipeException {
        try {
            return this.convertor.toStructureRecord(execute(list), schema);
        } catch (RecordConvertorException e) {
            throw new RecipeException("Problem converting into output record. Reason : " + e.getMessage());
        }
    }

    @Override // co.cask.wrangler.api.RecipePipeline
    public List<Row> execute(List<Row> list) throws RecipeException {
        ArrayList newArrayList = Lists.newArrayList();
        try {
            this.collector.reset();
            for (int i = 0; i < list.size(); i++) {
                List<Row> subList = list.subList(i, i + 1);
                try {
                    Iterator<Executor> it = this.directives.iterator();
                    while (it.hasNext()) {
                        try {
                            subList = (List) it.next().execute(subList, this.context);
                        } catch (ReportErrorAndProceed e) {
                            this.collector.add(new ErrorRecord(subList.get(0), e.getMessage(), e.getCode()));
                        }
                        if (subList.size() < 1) {
                            break;
                        }
                    }
                    if (subList.size() > 0) {
                        newArrayList.addAll(subList);
                    }
                } catch (ErrorRowException e2) {
                    this.collector.add(new ErrorRecord(subList.get(0), e2.getMessage(), e2.getCode()));
                }
            }
            return newArrayList;
        } catch (DirectiveExecutionException e3) {
            throw new RecipeException(e3.getMessage(), e3);
        }
    }

    @Override // co.cask.wrangler.api.RecipePipeline
    public List<ErrorRecord> errors() {
        return this.collector.get();
    }
}
