package co.cask.cdap.etl.batch.mapreduce;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchAggregatorContext;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.CompositeFinisher;
import co.cask.cdap.etl.batch.Finisher;
import co.cask.cdap.etl.batch.LoggedBatchConfigurable;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.batch.conversion.WritableConversion;
import co.cask.cdap.etl.batch.conversion.WritableConversions;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.common.TypeChecker;
import co.cask.cdap.etl.log.LogStageInjector;
import co.cask.cdap.etl.planner.StageInfo;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/cdap-etl-batch-3.4.0.jar:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce.class */
public class ETLMapReduce extends AbstractMapReduce {
    /** Hadoop configuration key under which {@code beforeSubmit} stores the per-stage runtime arguments as JSON. */
    static final String RUNTIME_ARGS_KEY = "cdap.etl.runtime.args";
    /** Hadoop configuration key under which {@code beforeSubmit} stores the sink-name to {@code SinkOutput} map as JSON. */
    static final String SINK_OUTPUTS_KEY = "cdap.etl.sink.outputs";
    /** Hadoop configuration key holding the aggregator's group key class name (recorded before any Writable conversion). */
    static final String GROUP_KEY_CLASS = "cdap.etl.aggregator.group.key.class";
    /** Hadoop configuration key holding the aggregator's group value class name (recorded before any Writable conversion). */
    static final String GROUP_VAL_CLASS = "cdap.etl.aggregator.group.val.class";
    /** Finisher assembled in {@code beforeSubmit} and invoked from {@code onFinish}; null until {@code beforeSubmit} builds it. */
    private Finisher finisher;
    // Never assigned in this class; presumably injected by the CDAP runtime -- TODO confirm.
    private Metrics mrMetrics;
    /** Phase specification supplied at construction time; read by {@code configure}. */
    private final BatchPhaseSpec phaseSpec;
    /** Simple class name of this program. Note that {@code configure} names the program after the phase instead. */
    public static final String NAME = ETLMapReduce.class.getSimpleName();
    /**
     * Gson type for {@code Map<String, Map<String, String>>}. Not referenced elsewhere in this class;
     * presumably used by task-side code to read the value stored under RUNTIME_ARGS_KEY -- TODO confirm.
     */
    static final Type RUNTIME_ARGS_TYPE = new TypeToken<Map<String, Map<String, String>>>() {
    }.getType();
    /**
     * Gson type for {@code Map<String, SinkOutput>}. Not referenced elsewhere in this class;
     * presumably used by task-side code to read the value stored under SINK_OUTPUTS_KEY -- TODO confirm.
     */
    static final Type SINK_OUTPUTS_TYPE = new TypeToken<Map<String, SinkOutput>>() {
    }.getType();
    private static final Logger LOG = LoggerFactory.getLogger(ETLMapReduce.class);
    /** Shared Gson instance; a SetMultimapCodec is registered for SetMultimap values. */
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(SetMultimap.class, new SetMultimapCodec()).create();

    /* loaded from: input_file:lib/cdap-etl-batch-3.4.0.jar:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$ETLMapper.class */
    /**
     * Mapper that hands every input key/value pair to a {@code TransformRunner}.
     *
     * <p>The runner is created in {@link #initialize} and released in {@link #destroy}.
     */
    public static class ETLMapper extends Mapper implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
        private TransformRunner<Object, Object> transformRunner;
        // Never assigned in this class; presumably injected by the CDAP runtime -- TODO confirm.
        private Metrics mapperMetrics;

        /**
         * Starts stage logging when the program specification enables it and creates the transform runner.
         *
         * @param mapReduceTaskContext task context used to read the specification and build the runner
         * @throws Exception if the transform runner cannot be created
         */
        public void initialize(MapReduceTaskContext<Object, Object> mapReduceTaskContext) throws Exception {
            String stageLogging = (String) mapReduceTaskContext.getSpecification().getProperties().get(Constants.STAGE_LOGGING_ENABLED);
            // parseBoolean treats a missing (null) property as false, same as Boolean.valueOf did.
            if (Boolean.parseBoolean(stageLogging)) {
                LogStageInjector.start();
            }
            this.transformRunner = new TransformRunner<>(mapReduceTaskContext, this.mapperMetrics);
        }

        /**
         * Passes one input record to the transform runner.
         *
         * @param obj input key
         * @param obj2 input value
         * @param context Hadoop mapper context (unused here)
         * @throws IOException if the runner fails with an I/O error
         * @throws InterruptedException if the runner is interrupted
         */
        public void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            try {
                this.transformRunner.transform(obj, obj2);
            } catch (Exception e) {
                // Rethrow the declared checked exceptions (and unchecked ones) unwrapped so the
                // framework sees the real failure type; only other checked exceptions get wrapped.
                Throwables.propagateIfPossible(e, IOException.class, InterruptedException.class);
                throw Throwables.propagate(e);
            }
        }

        /**
         * Releases the transform runner. Safe to call even if {@link #initialize} failed before
         * the runner was created.
         */
        public void destroy() {
            if (this.transformRunner != null) {
                this.transformRunner.destroy();
            }
        }
    }

    /* loaded from: input_file:lib/cdap-etl-batch-3.4.0.jar:co/cask/cdap/etl/batch/mapreduce/ETLMapReduce$ETLReducer.class */
    /**
     * Reducer that hands every key and its grouped values to a {@code TransformRunner}.
     *
     * <p>The runner is created in {@link #initialize} and released in {@link #destroy}.
     */
    public static class ETLReducer extends Reducer implements ProgramLifecycle<MapReduceTaskContext<Object, Object>> {
        // Never assigned in this class; presumably injected by the CDAP runtime -- TODO confirm.
        private Metrics reducerMetrics;
        private TransformRunner<Object, Iterator> transformRunner;

        /**
         * Starts stage logging when the program specification enables it and creates the transform runner.
         *
         * @param mapReduceTaskContext task context used to read the specification and build the runner
         * @throws Exception if the transform runner cannot be created
         */
        public void initialize(MapReduceTaskContext<Object, Object> mapReduceTaskContext) throws Exception {
            String stageLogging = (String) mapReduceTaskContext.getSpecification().getProperties().get(Constants.STAGE_LOGGING_ENABLED);
            // parseBoolean treats a missing (null) property as false, same as Boolean.valueOf did.
            if (Boolean.parseBoolean(stageLogging)) {
                LogStageInjector.start();
            }
            this.transformRunner = new TransformRunner<>(mapReduceTaskContext, this.reducerMetrics);
        }

        /**
         * Passes one group (key plus an iterator over its values) to the transform runner.
         *
         * @param obj group key
         * @param iterable values grouped under the key
         * @param context Hadoop reducer context (unused here)
         * @throws IOException if the runner fails with an I/O error
         * @throws InterruptedException if the runner is interrupted
         */
        protected void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            try {
                this.transformRunner.transform(obj, iterable.iterator());
            } catch (Exception e) {
                // Rethrow the declared checked exceptions (and unchecked ones) unwrapped so the
                // framework sees the real failure type; only other checked exceptions get wrapped.
                Throwables.propagateIfPossible(e, IOException.class, InterruptedException.class);
                throw Throwables.propagate(e);
            }
        }

        /**
         * Releases the transform runner. Safe to call even if {@link #initialize} failed before
         * the runner was created.
         */
        public void destroy() {
            if (this.transformRunner != null) {
                this.transformRunner.destroy();
            }
        }
    }

    /**
     * Creates the MapReduce program for one pipeline phase.
     *
     * <p>NOTE(review): {@code beforeSubmit} does not read this field; it deserializes the spec
     * that {@code configure} stored in the program properties.
     *
     * @param batchPhaseSpec specification of the phase; read by {@code configure}
     */
    public ETLMapReduce(BatchPhaseSpec batchPhaseSpec) {
        this.phaseSpec = batchPhaseSpec;
    }

    /**
     * Declares the program (name, description, resources) from the phase spec, validates the
     * phase layout, and stores the JSON-serialized spec in the program properties.
     *
     * <p>Validation rules, checked in this order:
     * <ol>
     *   <li>exactly one source;</li>
     *   <li>at least one sink;</li>
     *   <li>at most one aggregator;</li>
     *   <li>no stage in the sub-phase leading to the aggregator may have an error dataset.</li>
     * </ol>
     *
     * @throws IllegalArgumentException if any of the rules above is violated
     */
    public void configure() {
        final String phaseName = this.phaseSpec.getPhaseName();
        setName(phaseName);
        setDescription("MapReduce phase executor. " + this.phaseSpec.getDescription());
        setMapperResources(this.phaseSpec.getResources());
        setDriverResources(this.phaseSpec.getResources());

        final PipelinePhase phase = this.phaseSpec.getPhase();

        Set<String> sourceNames = phase.getSources();
        if (sourceNames.size() != 1) {
            String joinedSources = Joiner.on(',').join(sourceNames);
            throw new IllegalArgumentException(String.format("Pipeline phase '%s' must contain exactly one source but it has sources '%s'.", phaseName, joinedSources));
        }

        if (phase.getSinks().isEmpty()) {
            throw new IllegalArgumentException(String.format("Pipeline phase '%s' must contain at least one sink but does not have any.", phaseName));
        }

        Set<StageInfo> aggregators = phase.getStagesOfType(BatchAggregator.PLUGIN_TYPE);
        if (aggregators.size() > 1) {
            String joinedAggregators = Joiner.on(',').join(aggregators);
            throw new IllegalArgumentException(String.format("Pipeline phase '%s' cannot contain more than one aggregator but it has aggregators '%s'.", phaseName, joinedAggregators));
        }

        // After the check above the set holds zero or one aggregator.
        if (aggregators.size() == 1) {
            String aggregatorName = aggregators.iterator().next().getName();
            for (StageInfo upstream : phase.subsetTo(ImmutableSet.of(aggregatorName))) {
                if (upstream.getErrorDatasetName() != null) {
                    throw new IllegalArgumentException(String.format("Stage %s is not allowed to have an error dataset because it connects to aggregator %s.", upstream.getName(), aggregatorName));
                }
            }
        }

        // The serialized spec is what beforeSubmit reads back at run time.
        Map<String, String> properties = new HashMap<>();
        properties.put(Constants.PIPELINEID, GSON.toJson(this.phaseSpec));
        setProperties(properties);
    }

    /**
     * Prepares the Hadoop job for this pipeline phase before it is submitted.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>start stage logging if the specification enables it;</li>
     *   <li>read the phase spec back from the program properties;</li>
     *   <li>instantiate the source and every sink, call {@code prepareRun} on each, and record
     *       their runtime arguments and sink outputs;</li>
     *   <li>register an output for every transform/sink stage that declares an error dataset;</li>
     *   <li>configure the mapper and, when the phase has an aggregator, the reducer together with
     *       the map output key/value classes;</li>
     *   <li>store the sink outputs and runtime arguments as JSON in the job configuration.</li>
     * </ol>
     *
     * <p>The finisher is built once after the source and sinks are prepared and rebuilt after
     * the aggregator is registered, so that the aggregator is part of the finisher used by
     * {@code onFinish}. Previously the aggregator was added to the builder only after the
     * finisher had already been built.
     *
     * @param mapReduceContext context of the MapReduce run being prepared
     * @throws IllegalArgumentException if the aggregator's group key class does not map to a
     *     {@code WritableComparable} or its group value class does not map to a {@code Writable}
     * @throws Exception if plugin instantiation or any {@code prepareRun} call fails
     */
    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        if (Boolean.parseBoolean(mapReduceContext.getSpecification().getProperty(Constants.STAGE_LOGGING_ENABLED))) {
            LogStageInjector.start();
        }

        CompositeFinisher.Builder finishers = CompositeFinisher.builder();
        Job job = (Job) mapReduceContext.getHadoopJob();
        Configuration hConf = job.getConfiguration();

        // Stage name -> runtime arguments of that stage, published under RUNTIME_ARGS_KEY.
        Map<String, Map<String, String>> runtimeArgs = new HashMap<>();

        BatchPhaseSpec spec = GSON.fromJson((String) mapReduceContext.getSpecification().getProperties().get(Constants.PIPELINEID), BatchPhaseSpec.class);
        PipelinePhase phase = spec.getPhase();
        PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(mapReduceContext, spec);

        // Source: configure() guarantees there is exactly one.
        String sourceName = phase.getSources().iterator().next();
        LoggedBatchConfigurable source = new LoggedBatchConfigurable(sourceName, (BatchConfigurable) pluginInstantiator.newPluginInstance(sourceName));
        MapReduceSourceContext sourceContext = new MapReduceSourceContext(mapReduceContext, this.mrMetrics, new DatasetContextLookupProvider(mapReduceContext), sourceName, mapReduceContext.getRuntimeArguments());
        source.prepareRun(sourceContext);
        runtimeArgs.put(sourceName, sourceContext.getRuntimeArguments());
        finishers.add(source, sourceContext);

        // Sinks: connector and batch-sink stages that are sinks of this phase.
        Map<String, SinkOutput> sinkOutputs = new HashMap<>();
        for (StageInfo stage : Sets.union(phase.getStagesOfType(Constants.CONNECTOR_TYPE), phase.getStagesOfType(BatchSink.PLUGIN_TYPE))) {
            String sinkName = stage.getName();
            if (!phase.getSinks().contains(sinkName)) {
                continue;
            }
            LoggedBatchConfigurable sink = new LoggedBatchConfigurable(sinkName, (BatchConfigurable) pluginInstantiator.newPluginInstance(sinkName));
            MapReduceSinkContext sinkContext = new MapReduceSinkContext(mapReduceContext, this.mrMetrics, new DatasetContextLookupProvider(mapReduceContext), sinkName, mapReduceContext.getRuntimeArguments());
            sink.prepareRun(sinkContext);
            runtimeArgs.put(sinkName, sinkContext.getRuntimeArguments());
            finishers.add(sink, sinkContext);
            sinkOutputs.put(sinkName, new SinkOutput(sinkContext.getOutputNames(), stage.getErrorDatasetName()));
        }

        // Build now so the source and sinks can be finished even if a later step throws.
        this.finisher = finishers.build();
        hConf.set(SINK_OUTPUTS_KEY, GSON.toJson(sinkOutputs));

        // Error datasets declared by transforms and sinks become additional job outputs.
        for (StageInfo stage : Sets.union(phase.getStagesOfType(Transform.PLUGIN_TYPE), phase.getStagesOfType(BatchSink.PLUGIN_TYPE))) {
            String errorDatasetName = stage.getErrorDatasetName();
            if (errorDatasetName != null) {
                Map<String, String> outputArgs = new HashMap<>();
                outputArgs.put("output.properties.avro.schema.output.key", Constants.ERROR_SCHEMA.toString());
                TimePartitionedFileSetArguments.setOutputPartitionTime(outputArgs, mapReduceContext.getLogicalStartTime());
                mapReduceContext.addOutput(Output.ofDataset(errorDatasetName, outputArgs));
            }
        }

        job.setMapperClass(ETLMapper.class);

        Set<StageInfo> aggregators = phase.getStagesOfType(BatchAggregator.PLUGIN_TYPE);
        if (aggregators.isEmpty()) {
            // Map-only job.
            job.setNumReduceTasks(0);
        } else {
            job.setReducerClass(ETLReducer.class);
            String aggregatorName = aggregators.iterator().next().getName();
            BatchAggregator aggregator = (BatchAggregator) pluginInstantiator.newPluginInstance(aggregatorName);
            MapReduceAggregatorContext aggregatorContext = new MapReduceAggregatorContext(mapReduceContext, this.mrMetrics, new DatasetContextLookupProvider(mapReduceContext), aggregatorName, mapReduceContext.getRuntimeArguments());
            aggregator.prepareRun((BatchAggregatorContext) aggregatorContext);
            finishers.add(aggregator, aggregatorContext);
            // Rebuild so the finisher used by onFinish includes the aggregator added just above.
            this.finisher = finishers.build();

            Integer numPartitions = aggregatorContext.getNumPartitions();
            if (numPartitions != null) {
                job.setNumReduceTasks(numPartitions.intValue());
            }

            // Prefer classes set explicitly on the context; otherwise derive them from the plugin.
            Class<?> keyClass = aggregatorContext.getGroupKeyClass();
            Class<?> valueClass = aggregatorContext.getGroupValueClass();
            if (keyClass == null) {
                keyClass = TypeChecker.getGroupKeyClass(aggregator);
            }
            if (valueClass == null) {
                valueClass = TypeChecker.getGroupValueClass(aggregator);
            }
            // Record the original (pre-conversion) class names in the job configuration.
            hConf.set(GROUP_KEY_CLASS, keyClass.getName());
            hConf.set(GROUP_VAL_CLASS, valueClass.getName());

            // Swap in the Writable counterpart when a conversion is registered for the class.
            WritableConversion keyConversion = WritableConversions.getConversion(keyClass.getName());
            if (keyConversion != null) {
                keyClass = keyConversion.getWritableClass();
            }
            WritableConversion valueConversion = WritableConversions.getConversion(valueClass.getName());
            if (valueConversion != null) {
                valueClass = valueConversion.getWritableClass();
            }

            if (!WritableComparable.class.isAssignableFrom(keyClass)) {
                throw new IllegalArgumentException(String.format("Invalid aggregator %s. The group key class %s must implement Hadoop's WritableComparable.", aggregatorName, keyClass));
            }
            if (!Writable.class.isAssignableFrom(valueClass)) {
                throw new IllegalArgumentException(String.format("Invalid aggregator %s. The group value class %s must implement Hadoop's Writable.", aggregatorName, valueClass));
            }
            job.setMapOutputKeyClass(keyClass);
            job.setMapOutputValueClass(valueClass);
        }

        hConf.set(RUNTIME_ARGS_KEY, GSON.toJson(runtimeArgs));
    }

    /**
     * Runs the finisher assembled in {@code beforeSubmit} and logs the outcome of the run.
     *
     * <p>The finisher is null when {@code beforeSubmit} failed before building it; in that case
     * there is nothing to finish and only the log line is written.
     *
     * @param z whether the run succeeded
     * @param mapReduceContext context of the finished run (unused here)
     * @throws Exception if the finisher fails
     */
    public void onFinish(boolean z, MapReduceContext mapReduceContext) throws Exception {
        if (this.finisher != null) {
            this.finisher.onFinish(z);
        }
        LOG.info("Batch Run finished : succeeded = {}", z);
    }
}
