package co.cask.cdap.etl.spark.batch;

import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.etl.api.JoinElement;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.batch.connector.SingleConnectorFactory;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.RecordInfo;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.common.StageStatisticsCollector;
import co.cask.cdap.etl.common.plugin.PipelinePluginContext;
import co.cask.cdap.etl.spark.Compat;
import co.cask.cdap.etl.spark.SparkCollection;
import co.cask.cdap.etl.spark.SparkPairCollection;
import co.cask.cdap.etl.spark.SparkPipelineRunner;
import co.cask.cdap.etl.spark.SparkStageStatisticsCollector;
import co.cask.cdap.etl.spark.function.BatchSourceFunction;
import co.cask.cdap.etl.spark.function.JoinMergeFunction;
import co.cask.cdap.etl.spark.function.JoinOnFunction;
import co.cask.cdap.etl.spark.function.PluginFunctionContext;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.id.EntityId;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/cdap-etl-batch-4.3.5.jar:lib/hydrator-spark-core-4.3.5.jar:co/cask/cdap/etl/spark/batch/BatchSparkPipelineDriver.class
 */
/* loaded from: input_file:lib/hydrator-spark-core2_2.11-4.3.5.jar:co/cask/cdap/etl/spark/batch/BatchSparkPipelineDriver.class */
/**
 * Spark driver for one batch pipeline phase.
 *
 * <p>{@link #run(JavaSparkExecutionContext)} is the Spark entry point: it creates the
 * {@link JavaSparkContext}, remembers the execution context and then executes
 * {@link #run(DatasetContext)} through {@link Transactionals#execute}. The latter reads the
 * phase specification and the localized {@code HydratorSpark.config} file, runs the pipeline
 * and finally records per-stage statistics in the workflow token.
 */
public class BatchSparkPipelineDriver extends SparkPipelineRunner implements JavaSparkMain, TxRunnable {
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(SetMultimap.class, new SetMultimapCodec())
        .registerTypeAdapter(Schema.class, new SchemaTypeAdapter())
        .registerTypeAdapter(DatasetInfo.class, new DatasetInfoTypeAdapter())
        .registerTypeAdapter(OutputFormatProvider.class, new OutputFormatProviderTypeAdapter())
        .registerTypeAdapter(InputFormatProvider.class, new InputFormatProviderTypeAdapter())
        .create();

    private transient JavaSparkContext jsc;
    private transient JavaSparkExecutionContext sec;
    private transient SparkBatchSourceFactory sourceFactory;
    private transient SparkBatchSinkFactory sinkFactory;
    private transient DatasetContext datasetContext;
    private transient Map<String, Integer> stagePartitions;
    private transient int numOfRecordsPreview;

    /**
     * Builds the collection for a source stage: the RDD created by the source factory for the
     * stage name is flat-mapped through a {@link BatchSourceFunction} and wrapped in an
     * {@code RDDCollection}.
     *
     * @param stageSpec specification of the source stage
     * @param stageStatisticsCollector collector handed to the plugin function context
     * @return the collection of records emitted by the source stage
     */
    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkCollection<RecordInfo<Object>> getSource(StageSpec stageSpec, StageStatisticsCollector stageStatisticsCollector) {
        PluginFunctionContext functionContext = new PluginFunctionContext(stageSpec, this.sec, stageStatisticsCollector);
        return new RDDCollection(this.sec, this.jsc, this.datasetContext, this.sinkFactory,
            this.sourceFactory.createRDD(this.sec, this.jsc, stageSpec.getName(), Object.class, Object.class)
                .flatMap(Compat.convert(new BatchSourceFunction(functionContext, this.numOfRecordsPreview))));
    }

    /**
     * Maps the given collection to key/value pairs using a {@link JoinOnFunction}.
     *
     * @param stageSpec specification of the joiner stage
     * @param str name passed through to the {@link JoinOnFunction}
     * @param sparkCollection the input records to key
     * @param stageStatisticsCollector collector handed to the plugin function context
     * @return the keyed collection
     * @throws Exception if the pair collection cannot be created
     */
    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkPairCollection<Object, Object> addJoinKey(StageSpec stageSpec, String str, SparkCollection<Object> sparkCollection, StageStatisticsCollector stageStatisticsCollector) throws Exception {
        PluginFunctionContext functionContext = new PluginFunctionContext(stageSpec, this.sec, stageStatisticsCollector);
        return sparkCollection.flatMapToPair(Compat.convert(new JoinOnFunction(functionContext, str)));
    }

    /**
     * Flattens joined elements into output records using a {@link JoinMergeFunction}.
     *
     * @param stageSpec specification of the joiner stage
     * @param sparkPairCollection join key mapped to the list of joined elements
     * @param stageStatisticsCollector collector handed to the plugin function context
     * @return the merged collection
     * @throws Exception if the merged collection cannot be created
     */
    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkCollection<Object> mergeJoinResults(StageSpec stageSpec, SparkPairCollection<Object, List<JoinElement<Object>>> sparkPairCollection, StageStatisticsCollector stageStatisticsCollector) throws Exception {
        PluginFunctionContext functionContext = new PluginFunctionContext(stageSpec, this.sec, stageStatisticsCollector);
        return sparkPairCollection.flatMap(Compat.convert(new JoinMergeFunction(functionContext)));
    }

    /**
     * Spark entry point: creates the Spark context, stores the execution context and executes
     * {@link #run(DatasetContext)} via {@link Transactionals#execute}.
     *
     * @param javaSparkExecutionContext the execution context supplied by the platform
     * @throws Exception if the execution fails
     */
    @Override // co.cask.cdap.api.spark.JavaSparkMain
    public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        this.jsc = new JavaSparkContext();
        this.sec = javaSparkExecutionContext;
        Transactionals.execute(javaSparkExecutionContext, this, Exception.class);
    }

    /**
     * Loads the phase specification and the source/sink factories, runs the pipeline phase and
     * always publishes the collected stage statistics to the workflow token afterwards.
     *
     * @param datasetContext dataset context used by the collections created for this run
     * @throws IllegalStateException if the localized {@code HydratorSpark.config} file does not
     *     contain the serialized source/sink factory information
     * @throws Exception if reading the configuration or running the pipeline fails
     */
    @Override // co.cask.cdap.api.TxRunnable
    public void run(DatasetContext datasetContext) throws Exception {
        BatchPhaseSpec batchPhaseSpec = GSON.fromJson(
            this.sec.getSpecification().getProperty(Constants.PIPELINEID), BatchPhaseSpec.class);

        // The reader is scoped to the config parsing only, so a pipeline failure later on can
        // neither re-close it nor be masked by an exception thrown from close().
        try (BufferedReader reader = Files.newBufferedReader(
                this.sec.getLocalizationContext().getLocalFile("HydratorSpark.config").toPath(),
                StandardCharsets.UTF_8)) {
            SparkBatchSourceSinkFactoryInfo factoryInfo =
                GSON.fromJson(reader.readLine(), SparkBatchSourceSinkFactoryInfo.class);
            if (factoryInfo == null) {
                // readLine() yields null for an empty file and Gson maps null/empty input to null;
                // fail with a descriptive error instead of a bare NullPointerException.
                throw new IllegalStateException(
                    "HydratorSpark.config does not contain the source/sink factory information");
            }
            this.sourceFactory = factoryInfo.getSparkBatchSourceFactory();
            this.sinkFactory = factoryInfo.getSparkBatchSinkFactory();
            this.stagePartitions = factoryInfo.getStagePartitions();
        }

        this.datasetContext = datasetContext;
        this.numOfRecordsPreview = batchPhaseSpec.getNumOfRecordsPreview();
        PipelinePluginContext pipelinePluginContext = new PipelinePluginContext(
            this.sec.getPluginContext(), this.sec.getMetrics(),
            batchPhaseSpec.isStageLoggingEnabled(), batchPhaseSpec.isProcessTimingEnabled());

        // Collectors are only registered when the pipeline contains a condition; otherwise the
        // map stays empty and nothing is written to the workflow token.
        Map<String, StageStatisticsCollector> collectors = new HashMap<>();
        if (batchPhaseSpec.pipelineContainsCondition()) {
            Iterator<StageSpec> stages = batchPhaseSpec.getPhase().iterator();
            while (stages.hasNext()) {
                collectors.put(stages.next().getName(), new SparkStageStatisticsCollector(this.jsc));
            }
        }

        try {
            runPipeline(batchPhaseSpec.getPhase(), BatchSource.PLUGIN_TYPE, this.sec, this.stagePartitions,
                new PipelinePluginInstantiator(pipelinePluginContext, this.sec.getMetrics(), batchPhaseSpec,
                    new SingleConnectorFactory()),
                collectors);
        } finally {
            // Publish statistics exactly once, whether the pipeline succeeded or failed.
            updateWorkflowToken(this.sec.getWorkflowToken(), collectors);
        }
    }

    /**
     * Writes the input, output and error record counts of every collector to the workflow token
     * under keys prefixed with {@code "stage.statistics."}, the stage name and the id separator.
     *
     * @param workflowToken token that receives the statistics
     * @param map stage name mapped to its collector; every value must be a
     *     {@link SparkStageStatisticsCollector}
     */
    private void updateWorkflowToken(WorkflowToken workflowToken, Map<String, StageStatisticsCollector> map) {
        for (Map.Entry<String, StageStatisticsCollector> entry : map.entrySet()) {
            SparkStageStatisticsCollector collector = (SparkStageStatisticsCollector) entry.getValue();
            String keyPrefix = "stage.statistics." + entry.getKey() + EntityId.IDSTRING_PART_SEPARATOR;
            workflowToken.put(keyPrefix + Constants.StageStatistics.INPUT_RECORDS, String.valueOf(collector.getInputRecordCount()));
            workflowToken.put(keyPrefix + Constants.StageStatistics.OUTPUT_RECORDS, String.valueOf(collector.getOutputRecordCount()));
            workflowToken.put(keyPrefix + Constants.StageStatistics.ERROR_RECORDS, String.valueOf(collector.getErrorRecordCount()));
        }
    }
}
