package co.cask.wrangler.executor;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.wrangler.api.DirectiveParseException;
import co.cask.wrangler.api.Directives;
import co.cask.wrangler.api.ErrorRecord;
import co.cask.wrangler.api.ErrorRecordCollector;
import co.cask.wrangler.api.ErrorRecordException;
import co.cask.wrangler.api.Pipeline;
import co.cask.wrangler.api.PipelineContext;
import co.cask.wrangler.api.PipelineException;
import co.cask.wrangler.api.Record;
import co.cask.wrangler.api.Step;
import co.cask.wrangler.api.StepException;
import co.cask.wrangler.utils.RecordConvertor;
import co.cask.wrangler.utils.RecordConvertorException;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:co/cask/wrangler/executor/PipelineExecutor.class */
/**
 * {@link Pipeline} implementation that runs a configured list of {@link Step}s over
 * {@link Record}s, one input record at a time.
 *
 * <p>{@link #configure(Directives, PipelineContext)} must be called before either
 * {@code execute} method, because it is the only place the step list and context are assigned.
 * Records for which a step throws {@link ErrorRecordException} are collected and exposed through
 * {@link #errors()}; the collector is reset at the start of every {@link #execute(List)} call.
 */
public final class PipelineExecutor implements Pipeline<Record, StructuredRecord, ErrorRecord> {
    /** Directives passed to the most recent {@link #configure} call. */
    private Directives directives;
    /** Context handed to every step invocation. */
    private PipelineContext context;
    /** Steps obtained from {@link #directives}; applied in iteration order. */
    private List<Step> steps;
    /** Accumulates the records rejected during the most recent {@link #execute(List)} call. */
    private final ErrorRecordCollector collector = new ErrorRecordCollector();
    /** Converts executed records into {@link StructuredRecord}s. */
    private final RecordConvertor convertor = new RecordConvertor();

    /**
     * Stores the directives and context, and resolves the directives into executable steps.
     *
     * @param directives source of the steps to execute
     * @param pipelineContext context passed to each step during execution
     * @throws PipelineException if the directives cannot be parsed into steps
     */
    @Override // co.cask.wrangler.api.Pipeline
    public void configure(Directives directives, PipelineContext pipelineContext) throws PipelineException {
        this.directives = directives;
        this.context = pipelineContext;
        try {
            this.steps = directives.getSteps();
        } catch (DirectiveParseException e) {
            // Pass the message through verbatim. It must not be used as a format string:
            // a '%' in the parse error would make String.format throw and mask the real problem.
            throw new PipelineException(e.getMessage());
        }
    }

    /**
     * Executes the steps over the given records and converts the surviving records to
     * {@link StructuredRecord}s using the supplied schema.
     *
     * @param list input records
     * @param schema schema handed to the record convertor
     * @return the converted output records
     * @throws PipelineException if a step fails or the conversion fails
     */
    @Override // co.cask.wrangler.api.Pipeline
    public List<StructuredRecord> execute(List<Record> list, Schema schema) throws PipelineException {
        try {
            return this.convertor.toStructureRecord(execute(list), schema);
        } catch (RecordConvertorException e) {
            throw new PipelineException("Problem converting into output record. Reason : " + e.getMessage());
        }
    }

    /**
     * Runs every input record, individually, through all configured steps.
     *
     * <p>Each record starts as a single-element view of {@code list}; the output of one step is
     * the input of the next. Processing of a record stops early once a step returns no records.
     * If a step throws {@link ErrorRecordException}, the first record of that step's input is
     * added to the error collector and processing continues with the next input record.
     *
     * @param list input records
     * @return all records that came out of the last step applied to each input record
     * @throws PipelineException wrapping any {@link StepException}; this aborts the whole call
     */
    @Override // co.cask.wrangler.api.Pipeline
    public List<Record> execute(List<Record> list) throws PipelineException {
        List<Record> results = Lists.newArrayList();
        try {
            this.collector.reset();
            for (int i = 0; i < list.size(); i++) {
                // Feed the steps one record at a time so that a failure is attributable to it.
                List<Record> current = list.subList(i, i + 1);
                try {
                    for (Step step : this.steps) {
                        current = step.execute(current, this.context);
                        if (current.isEmpty()) {
                            // Nothing left for the remaining steps to work on.
                            break;
                        }
                    }
                    if (!current.isEmpty()) {
                        results.addAll(current);
                    }
                } catch (ErrorRecordException e) {
                    // 'current' still holds the input of the step that failed, because the
                    // assignment above never completed for that step.
                    this.collector.add(new ErrorRecord(current.get(0), e.getMessage(), e.getCode()));
                }
            }
            return results;
        } catch (StepException e) {
            throw new PipelineException(e);
        }
    }

    /**
     * Returns the error records held by the collector.
     *
     * @return error records gathered since the collector was last reset
     * @throws PipelineException declared by the {@link Pipeline} contract
     */
    @Override // co.cask.wrangler.api.Pipeline
    public List<ErrorRecord> errors() throws PipelineException {
        return this.collector.get();
    }

    /**
     * Builds one {@link StructuredRecord} per input record by copying, for every field of the
     * schema, the record value with the same name. Fields whose value is {@code null} are left
     * unset on the builder.
     *
     * <p>Not invoked by any other method of this class.
     *
     * @param list records to convert
     * @param schema schema whose fields drive the copy
     * @return the converted records, in input order
     */
    private List<StructuredRecord> toStructuredRecord(List<Record> list, Schema schema) {
        List<StructuredRecord> converted = new ArrayList<StructuredRecord>();
        for (Record record : list) {
            StructuredRecord.Builder builder = StructuredRecord.builder(schema);
            for (Schema.Field field : schema.getFields()) {
                String name = field.getName();
                Object value = record.getValue(name);
                if (value != null) {
                    builder.set(name, value);
                }
            }
            converted.add(builder.build());
        }
        return converted;
    }
}
