package co.cask.cdap.etl.batch.spark;

import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.plugin.PluginContext;
import co.cask.cdap.etl.api.Aggregator;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.Transformation;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.batch.TransformExecutorFactory;
import co.cask.cdap.etl.common.DefaultEmitter;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.TrackedTransform;
import java.util.Iterator;
import java.util.Map;
import scala.Tuple2;

/* loaded from: input_file:lib/cdap-etl-batch-3.4.2.jar:co/cask/cdap/etl/batch/spark/SparkTransformExecutorFactory.class */
/**
 * {@link TransformExecutorFactory} variant used by the Spark batch pipeline.
 *
 * <p>On top of what the parent factory provides, this class gives special treatment to three
 * plugin types: {@link BatchAggregator}, {@link SparkSink} and {@link SparkCompute}. Any other
 * plugin type is delegated to {@code super.getTransformation}.
 *
 * @param <T> type parameter forwarded unchanged to {@link TransformExecutorFactory}
 */
public class SparkTransformExecutorFactory<T> extends TransformExecutorFactory<T> {

    /** Pass-through transformation: emits every input object exactly as received. */
    private static final Transformation IDENTITY_TRANSFORMATION = new Transformation() {
        @Override
        public void transform(Object obj, Emitter emitter) throws Exception {
            emitter.emit(obj);
        }
    };

    private final PluginContext pluginContext;
    private final long logicalStartTime;
    private final Map<String, String> runtimeArgs;
    /**
     * Selects which side of an aggregator / Spark compute stage this factory builds:
     * the group step (or records-in tracking) when {@code true}, the aggregate step
     * (or records-out tracking) when {@code false}.
     */
    private final boolean isFirstHalf;

    /**
     * Adapts the {@code aggregate} half of an {@link Aggregator} to the {@link Transformation}
     * interface: each input is a key together with the values grouped under it.
     */
    private static class PostGroupAggregatorTransformation<GROUP_KEY, GROUP_VAL, OUT>
            implements Transformation<KeyValue<GROUP_KEY, Iterable<GROUP_VAL>>, OUT> {

        private final Aggregator<GROUP_KEY, GROUP_VAL, OUT> aggregator;

        public PostGroupAggregatorTransformation(Aggregator<GROUP_KEY, GROUP_VAL, OUT> aggregator) {
            this.aggregator = aggregator;
        }

        /**
         * Hands the key and an iterator over its grouped values to the wrapped aggregator,
         * which writes its results to {@code emitter}.
         */
        @Override
        public void transform(KeyValue<GROUP_KEY, Iterable<GROUP_VAL>> keyValue, Emitter<OUT> emitter) throws Exception {
            GROUP_KEY groupKey = keyValue.getKey();
            Iterator<GROUP_VAL> groupedValues = keyValue.getValue().iterator();
            this.aggregator.aggregate(groupKey, groupedValues, emitter);
        }
    }

    /**
     * Adapts the {@code groupBy} half of an {@link Aggregator} to the {@link Transformation}
     * interface: each input value is paired with every group key the aggregator emits for it.
     */
    private static class PreGroupAggregatorTransformation<GROUP_KEY, GROUP_VAL>
            implements Transformation<GROUP_VAL, Tuple2<GROUP_KEY, GROUP_VAL>> {

        private final Aggregator<GROUP_KEY, GROUP_VAL, ?> aggregator;
        /** Scratch emitter reused across calls; it is reset at the start of every transform. */
        private final DefaultEmitter<GROUP_KEY> groupKeyEmitter = new DefaultEmitter<>();

        public PreGroupAggregatorTransformation(Aggregator<GROUP_KEY, GROUP_VAL, ?> aggregator) {
            this.aggregator = aggregator;
        }

        /**
         * Collects the group keys for {@code group_val} and emits one
         * {@code (key, group_val)} tuple per collected key.
         */
        @Override
        public void transform(GROUP_VAL group_val, Emitter<Tuple2<GROUP_KEY, GROUP_VAL>> emitter) throws Exception {
            // Clear keys left over from the previous record before collecting new ones.
            this.groupKeyEmitter.reset();
            this.aggregator.groupBy(group_val, this.groupKeyEmitter);
            for (GROUP_KEY groupKey : this.groupKeyEmitter.getEntries()) {
                emitter.emit(new Tuple2<>(groupKey, group_val));
            }
        }
    }

    /**
     * Creates the factory.
     *
     * @param pluginContext plugin context handed to every runtime context this factory creates
     * @param pipelinePluginInstantiator instantiator forwarded to the parent factory
     * @param metrics metrics object forwarded to the parent factory
     * @param j logical start time handed to every runtime context this factory creates
     * @param map runtime arguments handed to every runtime context this factory creates
     * @param z value of the {@code isFirstHalf} flag
     */
    public SparkTransformExecutorFactory(PluginContext pluginContext, PipelinePluginInstantiator pipelinePluginInstantiator, Metrics metrics, long j, Map<String, String> map, boolean z) {
        super(pipelinePluginInstantiator, metrics);
        this.pluginContext = pluginContext;
        this.logicalStartTime = j;
        this.runtimeArgs = map;
        this.isFirstHalf = z;
    }

    /**
     * Builds a {@link SparkBatchRuntimeContext} from this factory's state and the given identifier.
     *
     * @param str identifier passed through as the last constructor argument of the context
     * @return a new runtime context
     */
    @Override
    protected BatchRuntimeContext createRuntimeContext(String str) {
        return new SparkBatchRuntimeContext(this.pluginContext, this.metrics, this.logicalStartTime, this.runtimeArgs, str);
    }

    /**
     * Returns the tracked transformation for a plugin.
     *
     * <ul>
     *   <li>{@link BatchAggregator}: the plugin is instantiated and initialized, then wrapped as
     *       a group step when {@code isFirstHalf} is set, otherwise as an aggregate step.</li>
     *   <li>{@link SparkSink}: an identity transformation tracked with {@code RECORDS_IN}.</li>
     *   <li>{@link SparkCompute}: an identity transformation tracked with {@code RECORDS_IN}
     *       when {@code isFirstHalf} is set, otherwise with {@code RECORDS_OUT}.</li>
     *   <li>Anything else: whatever the parent factory returns.</li>
     * </ul>
     *
     * @param str plugin type, compared against the {@code PLUGIN_TYPE} constants
     * @param str2 identifier used for the stage metrics, the plugin instantiator lookup and
     *     the runtime context
     * @return the tracked transformation for the plugin
     * @throws Exception if plugin instantiation, initialization or the parent factory fails
     */
    @Override
    public TrackedTransform getTransformation(String str, String str2) throws Exception {
        final String pluginType = str;
        final String stageId = str2;
        DefaultStageMetrics stageMetrics = new DefaultStageMetrics(this.metrics, stageId);

        if (BatchAggregator.PLUGIN_TYPE.equals(pluginType)) {
            BatchAggregator batchAggregator = (BatchAggregator) this.pluginInstantiator.newPluginInstance(stageId);
            BatchRuntimeContext runtimeContext = createRuntimeContext(stageId);
            batchAggregator.initialize(runtimeContext);
            if (this.isFirstHalf) {
                return getTrackedGroupStep(new PreGroupAggregatorTransformation(batchAggregator), stageMetrics);
            }
            return getTrackedAggregateStep(new PostGroupAggregatorTransformation(batchAggregator), stageMetrics);
        }

        if (SparkSink.PLUGIN_TYPE.equals(pluginType)) {
            return new TrackedTransform(IDENTITY_TRANSFORMATION, stageMetrics, TrackedTransform.RECORDS_IN, null);
        }

        if (SparkCompute.PLUGIN_TYPE.equals(pluginType)) {
            if (this.isFirstHalf) {
                return new TrackedTransform(IDENTITY_TRANSFORMATION, stageMetrics, TrackedTransform.RECORDS_IN, null);
            }
            return new TrackedTransform(IDENTITY_TRANSFORMATION, stageMetrics, TrackedTransform.RECORDS_OUT, null);
        }

        return super.getTransformation(pluginType, stageId);
    }
}
