package co.cask.wrangler.executor;

import co.cask.cdap.api.annotation.Beta;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.wrangler.api.Directives;
import co.cask.wrangler.api.ErrorRecord;
import co.cask.wrangler.api.Pipeline;
import co.cask.wrangler.api.PipelineContext;
import co.cask.wrangler.api.PipelineException;
import co.cask.wrangler.api.Record;
import co.cask.wrangler.api.Step;
import co.cask.wrangler.utils.RecordConvertor;
import co.cask.wrangler.utils.RecordConvertorException;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Beta
/* loaded from: input_file:co/cask/wrangler/executor/ParallelPipelineExecutor.class */
public final class ParallelPipelineExecutor implements Pipeline<Record, StructuredRecord, ErrorRecord> {
    private Directives directives;
    private PipelineContext context;
    private RecordConvertor convertor = new RecordConvertor();
    private int threads = Runtime.getRuntime().availableProcessors();
    private ExecutorService executor = Executors.newFixedThreadPool(this.threads, new ThreadFactoryBuilder().setDaemon(true).build());

    @Override // co.cask.wrangler.api.Pipeline
    public void configure(Directives directives, PipelineContext pipelineContext) {
        this.directives = directives;
        this.context = pipelineContext;
    }

    @Override // co.cask.wrangler.api.Pipeline
    public List<StructuredRecord> execute(List<Record> list, Schema schema) throws PipelineException {
        try {
            return this.convertor.toStructureRecord(execute(list), schema);
        } catch (RecordConvertorException e) {
            throw new PipelineException("Problem converting into output record. Reason : " + e.getMessage());
        }
    }

    @Override // co.cask.wrangler.api.Pipeline
    public List<Record> execute(List<Record> list) throws PipelineException {
        try {
            int size = list.size() / this.threads;
            if (size == 0) {
                size = list.size();
            }
            int i = 0;
            TreeMap treeMap = new TreeMap();
            final List<Step> steps = this.directives.getSteps();
            while (i < list.size()) {
                int i2 = i + size;
                if (i2 > list.size()) {
                    i2 = list.size();
                }
                final List<Record> subList = list.subList(i, i2);
                treeMap.put(Integer.valueOf(i), this.executor.submit(new Callable<List<Record>>() { // from class: co.cask.wrangler.executor.ParallelPipelineExecutor.1
                    private List<Record> records;

                    {
                        this.records = subList;
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public List<Record> call() throws Exception {
                        for (Step step : steps) {
                            if (this.records.size() < 1) {
                                break;
                            }
                            this.records = step.execute(this.records, ParallelPipelineExecutor.this.context);
                        }
                        return this.records;
                    }
                }));
                i = i2;
            }
            ArrayList arrayList = new ArrayList(this.threads);
            Iterator it = treeMap.values().iterator();
            while (it.hasNext()) {
                arrayList.add(((Future) it.next()).get());
            }
            return Lists.newArrayList(Iterables.concat(arrayList));
        } catch (Exception e) {
            throw new PipelineException(e);
        }
    }

    @Override // co.cask.wrangler.api.Pipeline
    public List<ErrorRecord> errors() throws PipelineException {
        return new ArrayList();
    }

    private List<StructuredRecord> toStructuredRecord(List<Record> list, Schema schema) {
        ArrayList arrayList = new ArrayList();
        for (Record record : list) {
            StructuredRecord.Builder builder = StructuredRecord.builder(schema);
            Iterator it = schema.getFields().iterator();
            while (it.hasNext()) {
                String name = ((Schema.Field) it.next()).getName();
                Object value = record.getValue(name);
                if (value != null) {
                    builder.set(name, value);
                }
            }
            arrayList.add(builder.build());
        }
        return arrayList;
    }
}
