package co.cask.cdap.etl.spark.streaming.function;

import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.spark.SparkPipelineRuntime;
import co.cask.cdap.etl.spark.batch.BasicSparkExecutionPluginContext;
import co.cask.cdap.etl.spark.function.PluginFunctionContext;
import co.cask.cdap.etl.spark.streaming.DynamicDriverContext;
import co.cask.cdap.etl.spec.StageSpec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/cdap-etl-batch-5.1.1.jar:lib/hydrator-spark-core-5.1.1.jar:co/cask/cdap/etl/spark/streaming/function/DynamicSparkCompute.class
 */
/* loaded from: input_file:lib/hydrator-spark-core-5.1.1.jar:co/cask/cdap/etl/spark/streaming/function/DynamicSparkCompute.class */
/**
 * A {@link SparkCompute} that forwards every call to a wrapped delegate plugin and is able to
 * re-create that delegate on demand.
 *
 * <p>The delegate is kept in a {@code transient} field, so it may be {@code null} by the time
 * {@link #transform} is invoked (presumably after this object has been serialized and restored,
 * e.g. from a streaming checkpoint - confirm against the callers). In that case a new plugin
 * instance is obtained from the {@link DynamicDriverContext} and initialized before use.
 *
 * @param <T> element type of the input RDD
 * @param <U> element type of the output RDD
 */
public class DynamicSparkCompute<T, U> extends SparkCompute<T, U> {
    private final DynamicDriverContext dynamicDriverContext;
    private transient SparkCompute<T, U> delegate;

    /**
     * @param dynamicDriverContext source of the plugin function context and Spark execution context
     *                             used when the delegate has to be re-created
     * @param sparkCompute         the delegate that calls are forwarded to
     */
    public DynamicSparkCompute(DynamicDriverContext dynamicDriverContext, SparkCompute<T, U> sparkCompute) {
        this.dynamicDriverContext = dynamicDriverContext;
        this.delegate = sparkCompute;
    }

    /**
     * Forwards initialization to the current delegate.
     *
     * <p>No lazy re-creation happens here: the delegate must be present when this is called.
     *
     * @param sparkExecutionPluginContext context handed to the delegate
     * @throws Exception whatever the delegate's {@code initialize} throws
     */
    @Override
    public void initialize(SparkExecutionPluginContext sparkExecutionPluginContext) throws Exception {
        this.delegate.initialize(sparkExecutionPluginContext);
    }

    /**
     * Ensures a delegate exists (creating and initializing one if the field is {@code null}) and
     * then forwards the transformation to it.
     *
     * @param sparkExecutionPluginContext context handed to the delegate
     * @param javaRDD                     input RDD; its Spark context is used if the delegate must be re-created
     * @return the RDD produced by the delegate
     * @throws Exception if re-creating the delegate fails or the delegate's {@code transform} throws
     */
    @Override
    public JavaRDD<U> transform(SparkExecutionPluginContext sparkExecutionPluginContext, JavaRDD<T> javaRDD) throws Exception {
        lazyInit(JavaSparkContext.fromSparkContext(javaRDD.context()));
        return this.delegate.transform(sparkExecutionPluginContext, javaRDD);
    }

    /**
     * Creates and initializes the delegate when it is missing; does nothing otherwise.
     *
     * <p>The new plugin is held in a local variable until its {@code initialize} call has returned
     * normally. If initialization throws, the field stays {@code null}, so the next call retries
     * instead of silently using a plugin that was never initialized.
     *
     * @param javaSparkContext Spark context passed into the plugin's execution context
     * @throws Exception if the plugin cannot be created or its initialization fails
     */
    private void lazyInit(final JavaSparkContext javaSparkContext) throws Exception {
        if (this.delegate != null) {
            return;
        }
        PluginFunctionContext pluginFunctionContext = this.dynamicDriverContext.getPluginFunctionContext();
        // The plugin's generic parameters are not visible at this call site, so the cast is unchecked.
        @SuppressWarnings("unchecked")
        final SparkCompute<T, U> plugin = (SparkCompute<T, U>) pluginFunctionContext.createPlugin();
        final StageSpec stageSpec = pluginFunctionContext.getStageSpec();
        final JavaSparkExecutionContext sparkExecutionContext = this.dynamicDriverContext.getSparkExecutionContext();
        // Initialization runs through Transactionals.execute so the plugin receives a DatasetContext.
        Transactionals.execute(sparkExecutionContext, new TxRunnable() {
            public void run(DatasetContext datasetContext) throws Exception {
                plugin.initialize(new BasicSparkExecutionPluginContext(sparkExecutionContext, javaSparkContext, datasetContext, new SparkPipelineRuntime(sparkExecutionContext), stageSpec));
            }
        }, Exception.class);
        // Publish only after successful initialization.
        this.delegate = plugin;
    }
}
