package co.cask.cdap.template.etl.batch;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.template.etl.api.Transform;
import co.cask.cdap.template.etl.api.TransformContext;
import co.cask.cdap.template.etl.api.Transformation;
import co.cask.cdap.template.etl.api.batch.BatchSink;
import co.cask.cdap.template.etl.api.batch.BatchSinkContext;
import co.cask.cdap.template.etl.api.batch.BatchSource;
import co.cask.cdap.template.etl.api.batch.BatchSourceContext;
import co.cask.cdap.template.etl.batch.config.ETLBatchConfig;
import co.cask.cdap.template.etl.common.Constants;
import co.cask.cdap.template.etl.common.Destroyables;
import co.cask.cdap.template.etl.common.ETLStage;
import co.cask.cdap.template.etl.common.StageMetrics;
import co.cask.cdap.template.etl.common.TransformExecutor;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/template/etl/batch/ETLMapReduce.class */
public class ETLMapReduce extends AbstractMapReduce {
    // Shared logger; the nested ETLMapper also logs through it via ETLMapReduce.LOG.
    private static final Logger LOG = LoggerFactory.getLogger(ETLMapReduce.class);
    // Used in beforeSubmit to deserialize the ETLBatchConfig JSON carried in the runtime arguments.
    private static final Gson GSON = new Gson();
    // Source plugin instance; created in prepareSource and reused in onRunFinishSource.
    private BatchSource batchSource;
    // Sink plugin instance; created in prepareSink and reused in onRunFinishSink.
    private BatchSink batchSink;
    // Plugin id of the source, read from the runtime arguments in prepareSource.
    private String sourcePluginId;
    // Plugin id of the sink, read from the runtime arguments in prepareSink.
    private String sinkPluginId;
    // Never assigned in this class -- presumably injected by the CDAP runtime; TODO confirm.
    private Metrics mrMetrics;

    /* loaded from: input_file:co/cask/cdap/template/etl/batch/ETLMapReduce$ETLMapper.class */
    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceContext> {
        // Mapper-local Gson; used in initialize to parse the config and the transform plugin id list.
        private static final Gson GSON = new Gson();
        // Reflective type for List<String>, needed so Gson can deserialize the JSON array of transform plugin ids.
        private static final Type STRING_LIST_TYPE = new TypeToken<List<String>>() { // from class: co.cask.cdap.template.etl.batch.ETLMapReduce.ETLMapper.1
        }.getType();
        // Transform instances created in addTransforms; only written to within the visible code.
        private List<Transform> transforms;
        // Executes source -> transforms -> sink for each input record; built at the end of initialize.
        private TransformExecutor<KeyValue, KeyValue> transformExecutor;
        // Never assigned in this class -- presumably injected by the CDAP runtime; TODO confirm.
        private Metrics mapperMetrics;

        /**
         * Assembles the per-mapper pipeline from the runtime arguments.
         *
         * <p>Reads the ETL config, the source/sink plugin ids and the JSON list of transform
         * plugin ids, then instantiates and initializes the stages in order: source, each
         * transform, sink. Every stage gets a matching {@link StageMetrics} entry, and the
         * resulting lists are handed to a new {@link TransformExecutor}.
         *
         * @param mapReduceContext context used to read runtime arguments and create plugin instances
         * @throws Exception if a plugin cannot be instantiated or initialized
         */
        public void initialize(MapReduceContext mapReduceContext) throws Exception {
            Map<?, ?> args = mapReduceContext.getRuntimeArguments();
            ETLBatchConfig config = GSON.fromJson((String) args.get(Constants.CONFIG_KEY), ETLBatchConfig.class);
            String sourceId = (String) args.get(Constants.Source.PLUGINID);
            String sinkId = (String) args.get(Constants.Sink.PLUGINID);
            List<String> transformIds = GSON.fromJson((String) args.get(Constants.Transform.PLUGINIDS), STRING_LIST_TYPE);

            List<ETLStage> stages = config.getTransforms();
            ETLMapReduce.LOG.info("Transform Stages : {}", stages);

            // Source and sink bracket the transforms, hence the two extra slots.
            int pipelineLength = stages.size() + 2;
            List<Transformation> pipeline = Lists.newArrayListWithCapacity(pipelineLength);
            List<StageMetrics> stageMetrics = Lists.newArrayListWithCapacity(pipelineLength);
            this.transforms = Lists.newArrayListWithCapacity(stages.size());

            // Head of the pipeline: the batch source.
            BatchSource source = (BatchSource) mapReduceContext.newPluginInstance(sourceId);
            BatchSourceContext sourceContext = new MapReduceSourceContext(mapReduceContext, this.mapperMetrics, sourceId);
            source.initialize(sourceContext);
            pipeline.add(source);
            stageMetrics.add(new StageMetrics(this.mapperMetrics, StageMetrics.Type.SOURCE, config.getSource().getName()));

            // Middle of the pipeline: the configured transforms, in order.
            addTransforms(stages, pipeline, stageMetrics, transformIds, mapReduceContext);

            // Tail of the pipeline: the batch sink.
            BatchSink sink = (BatchSink) mapReduceContext.newPluginInstance(sinkId);
            BatchSinkContext sinkContext = new MapReduceSinkContext(mapReduceContext, this.mapperMetrics, sinkId);
            sink.initialize(sinkContext);
            pipeline.add(sink);
            stageMetrics.add(new StageMetrics(this.mapperMetrics, StageMetrics.Type.SINK, config.getSink().getName()));

            this.transformExecutor = new TransformExecutor<>(pipeline, stageMetrics);
        }

        /**
         * Instantiates and initializes one transform per configured stage and appends it to the pipeline.
         *
         * <p>Stage {@code i} is paired with plugin id {@code i}, so both lists must be the same size.
         *
         * @param list             configured transform stages, in pipeline order
         * @param list2            pipeline under construction; each new transform is appended
         * @param list3            per-stage metrics under construction; one entry appended per transform
         * @param list4            plugin ids, positionally matching {@code list}
         * @param mapReduceContext context used to create the plugin instances
         * @throws IllegalArgumentException if the stage and plugin id lists differ in size
         * @throws Exception                if a transform cannot be instantiated or initialized
         */
        private void addTransforms(List<ETLStage> list, List<Transformation> list2, List<StageMetrics> list3, List<String> list4, MapReduceContext mapReduceContext) throws Exception {
            Preconditions.checkArgument(list.size() == list4.size());
            int position = 0;
            for (ETLStage stage : list) {
                String pluginId = list4.get(position);
                position++;

                Transform transform = (Transform) mapReduceContext.newPluginInstance(pluginId);
                TransformContext transformContext = new BatchTransformContext(mapReduceContext, this.mapperMetrics, pluginId);
                transform.initialize(transformContext);

                list2.add(transform);
                this.transforms.add(transform);
                list3.add(new StageMetrics(this.mapperMetrics, StageMetrics.Type.TRANSFORM, stage.getName()));
            }
        }

        /**
         * Runs one input record through the source/transform/sink pipeline and writes every
         * resulting key/value pair to the Hadoop output.
         *
         * <p>Any failure is logged with its stack trace and rethrown: runtime exceptions and
         * errors as-is, checked exceptions wrapped in a {@link RuntimeException}.
         *
         * @param obj     input key
         * @param obj2    input value
         * @param context Hadoop mapper context that receives the pipeline output
         * @throws IOException          declared by the Hadoop mapper contract
         * @throws InterruptedException declared by the Hadoop mapper contract
         */
        public void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            try {
                for (KeyValue keyValue : this.transformExecutor.runOneIteration(new KeyValue(obj, obj2))) {
                    context.write(keyValue.getKey(), keyValue.getValue());
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // The exception is about to be wrapped; keep the interrupt visible to the caller's thread.
                    Thread.currentThread().interrupt();
                }
                // Pass the exception as the trailing throwable argument (no "{}" placeholder) so the
                // logger records the full stack trace instead of only e.toString().
                ETLMapReduce.LOG.error("Exception thrown in BatchDriver Mapper", e);
                // propagate() always throws; the explicit "throw" makes that clear to reader and compiler.
                throw Throwables.propagate(e);
            }
        }

        /**
         * Releases the pipeline by handing the transform executor to
         * {@code Destroyables.destroyQuietly}.
         */
        public void destroy() {
            TransformExecutor<KeyValue, KeyValue> executor = this.transformExecutor;
            Destroyables.destroyQuietly(executor);
        }
    }

    /**
     * Registers this program's name (the simple class name) and its description.
     */
    public void configure() {
        String programName = ETLMapReduce.class.getSimpleName();
        setName(programName);
        setDescription("MapReduce driver for Batch ETL Adapters");
    }

    /**
     * Validates the runtime arguments, prepares the source and sink plugins, and configures
     * the Hadoop job as a map-only job that runs {@link ETLMapper}.
     *
     * @param mapReduceContext context supplying the Hadoop job and the runtime arguments
     * @throws IllegalArgumentException if a required runtime argument is missing
     * @throws Exception                if preparing the source or sink fails
     */
    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        Job job = (Job) mapReduceContext.getHadoopJob();
        Map<?, ?> args = mapReduceContext.getRuntimeArguments();

        // Every key below must be present; they are checked in this order.
        Object[] requiredKeys = {
            Constants.ADAPTER_NAME,
            Constants.CONFIG_KEY,
            Constants.Source.PLUGINID,
            Constants.Sink.PLUGINID,
            Constants.Transform.PLUGINIDS
        };
        for (Object requiredKey : requiredKeys) {
            Preconditions.checkArgument(args.containsKey(requiredKey));
        }

        String configJson = (String) args.get(Constants.CONFIG_KEY);
        ETLBatchConfig config = GSON.fromJson(configJson, ETLBatchConfig.class);

        prepareSource(mapReduceContext, config.getSource());
        prepareSink(mapReduceContext, config.getSink());

        // Mapper resources are optional in the config.
        if (config.getResources() != null) {
            mapReduceContext.setMapperResources(config.getResources());
        }

        job.setMapperClass(ETLMapper.class);
        // Map-only job: the sink is driven from the mapper, so no reducers are scheduled.
        job.setNumReduceTasks(0);
    }

    /**
     * Looks up the source plugin id, instantiates the source and lets it prepare the run.
     *
     * <p>The plugin id and instance are stored on this object so that
     * {@code onRunFinishSource} can reuse them.
     *
     * @param mapReduceContext context supplying runtime arguments and plugin instances
     * @param eTLStage         source stage from the config; only logged here
     * @throws Exception if the plugin cannot be instantiated or its {@code prepareRun} fails
     */
    private void prepareSource(MapReduceContext mapReduceContext, ETLStage eTLStage) throws Exception {
        String pluginId = (String) mapReduceContext.getRuntimeArguments().get(Constants.Source.PLUGINID);
        this.sourcePluginId = pluginId;

        BatchSource source = (BatchSource) mapReduceContext.newPluginInstance(pluginId);
        this.batchSource = source;

        MapReduceSourceContext sourceContext = new MapReduceSourceContext(mapReduceContext, this.mrMetrics, pluginId);
        LOG.info("Source Stage : {}", eTLStage);
        LOG.info("Source Class : {}", source.getClass().getName());
        source.prepareRun(sourceContext);
    }

    /**
     * Looks up the sink plugin id, instantiates the sink and lets it prepare the run.
     *
     * <p>The plugin id and instance are stored on this object so that
     * {@code onRunFinishSink} can reuse them.
     *
     * @param mapReduceContext context supplying runtime arguments and plugin instances
     * @param eTLStage         sink stage from the config; only logged here
     * @throws Exception if the plugin cannot be instantiated or its {@code prepareRun} fails
     */
    private void prepareSink(MapReduceContext mapReduceContext, ETLStage eTLStage) throws Exception {
        String pluginId = (String) mapReduceContext.getRuntimeArguments().get(Constants.Sink.PLUGINID);
        this.sinkPluginId = pluginId;

        BatchSink sink = (BatchSink) mapReduceContext.newPluginInstance(pluginId);
        this.batchSink = sink;

        MapReduceSinkContext sinkContext = new MapReduceSinkContext(mapReduceContext, this.mrMetrics, pluginId);
        LOG.info("Sink Stage : {}", eTLStage);
        LOG.info("Sink Class : {}", sink.getClass().getName());
        sink.prepareRun(sinkContext);
    }

    /**
     * Notifies the source and then the sink that the run has finished, and logs the outcome.
     *
     * @param z                whether the run succeeded
     * @param mapReduceContext context supplying the runtime arguments
     * @throws Exception declared by the lifecycle contract
     */
    public void onFinish(boolean z, MapReduceContext mapReduceContext) throws Exception {
        onRunFinishSource(mapReduceContext, z);
        onRunFinishSink(mapReduceContext, z);

        Object adapterName = mapReduceContext.getRuntimeArguments().get(Constants.ADAPTER_NAME);
        LOG.info("Batch Run for Adapter {} : {}", adapterName, z);
    }

    /**
     * Invokes {@code onRunFinish} on the source prepared earlier.
     *
     * <p>Anything thrown by the source's callback is logged as a warning and not rethrown.
     *
     * @param mapReduceContext context wrapped into the source context passed to the plugin
     * @param z                whether the run succeeded
     */
    private void onRunFinishSource(MapReduceContext mapReduceContext, boolean z) {
        BatchSource source = this.batchSource;
        MapReduceSourceContext finishContext = new MapReduceSourceContext(mapReduceContext, this.mrMetrics, this.sourcePluginId);
        LOG.info("On RunFinish Source : {}", source.getClass().getName());
        try {
            source.onRunFinish(z, finishContext);
        } catch (Throwable failure) {
            // Best effort: a failing callback is only reported.
            LOG.warn("Exception when calling onRunFinish on {}", source, failure);
        }
    }

    /**
     * Invokes {@code onRunFinish} on the sink prepared earlier.
     *
     * <p>Anything thrown by the sink's callback is logged as a warning and not rethrown.
     *
     * @param mapReduceContext context wrapped into the sink context passed to the plugin
     * @param z                whether the run succeeded
     */
    private void onRunFinishSink(MapReduceContext mapReduceContext, boolean z) {
        BatchSink sink = this.batchSink;
        MapReduceSinkContext finishContext = new MapReduceSinkContext(mapReduceContext, this.mrMetrics, this.sinkPluginId);
        LOG.info("On RunFinish Sink : {}", sink.getClass().getName());
        try {
            sink.onRunFinish(z, finishContext);
        } catch (Throwable failure) {
            // Best effort: a failing callback is only reported.
            LOG.warn("Exception when calling onRunFinish on {}", sink, failure);
        }
    }
}
