package org.apache.dolphinscheduler.data.quality.execution;

import java.util.Iterator;
import java.util.List;
import org.apache.dolphinscheduler.data.quality.Constants;
import org.apache.dolphinscheduler.data.quality.config.Config;
import org.apache.dolphinscheduler.data.quality.exception.ConfigRuntimeException;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchTransformer;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/* loaded from: input_file:org/apache/dolphinscheduler/data/quality/execution/SparkBatchExecution.class */
public class SparkBatchExecution implements Execution<BatchReader, BatchTransformer, BatchWriter> {
    private final SparkRuntimeEnvironment environment;

    public SparkBatchExecution(SparkRuntimeEnvironment sparkRuntimeEnvironment) throws ConfigRuntimeException {
        this.environment = sparkRuntimeEnvironment;
    }

    @Override // org.apache.dolphinscheduler.data.quality.execution.Execution
    public void execute(List<BatchReader> list, List<BatchTransformer> list2, List<BatchWriter> list3) {
        list.forEach(batchReader -> {
            registerInputTempView(batchReader, this.environment);
        });
        if (!list.isEmpty()) {
            Dataset<Row> read = list.get(0).read(this.environment);
            for (BatchTransformer batchTransformer : list2) {
                read = executeTransformer(this.environment, batchTransformer, read);
                registerTransformTempView(batchTransformer, read);
            }
            Iterator<BatchWriter> it = list3.iterator();
            while (it.hasNext()) {
                executeWriter(this.environment, it.next(), read);
            }
        }
        this.environment.sparkSession().stop();
    }

    private void registerTempView(String str, Dataset<Row> dataset) {
        if (dataset == null) {
            throw new ConfigRuntimeException("dataset is null, can not createOrReplaceTempView");
        }
        dataset.createOrReplaceTempView(str);
    }

    private void registerInputTempView(BatchReader batchReader, SparkRuntimeEnvironment sparkRuntimeEnvironment) {
        Config config = batchReader.getConfig();
        if (!Boolean.TRUE.equals(config.has(Constants.OUTPUT_TABLE))) {
            throw new ConfigRuntimeException("[" + batchReader.getClass().getName() + "] must be registered as dataset, please set \"output_table\" config");
        }
        registerTempView(config.getString(Constants.OUTPUT_TABLE), batchReader.read(sparkRuntimeEnvironment));
    }

    private Dataset<Row> executeTransformer(SparkRuntimeEnvironment sparkRuntimeEnvironment, BatchTransformer batchTransformer, Dataset<Row> dataset) {
        Config config = batchTransformer.getConfig();
        Dataset<Row> dataset2 = null;
        if (Boolean.TRUE.equals(config.has(Constants.INPUT_TABLE))) {
            for (String str : config.getString(Constants.INPUT_TABLE).split(",")) {
                Dataset<Row> table = sparkRuntimeEnvironment.sparkSession().read().table(str);
                dataset2 = dataset2 == null ? table : dataset2.union(table);
            }
        } else {
            dataset2 = dataset;
        }
        if (Boolean.TRUE.equals(config.has(Constants.TMP_TABLE))) {
            if (dataset2 == null) {
                dataset2 = dataset;
            }
            registerTempView(config.getString(Constants.TMP_TABLE), dataset2);
        }
        return batchTransformer.transform(dataset2, sparkRuntimeEnvironment);
    }

    private void registerTransformTempView(BatchTransformer batchTransformer, Dataset<Row> dataset) {
        Config config = batchTransformer.getConfig();
        if (Boolean.TRUE.equals(config.has(Constants.OUTPUT_TABLE))) {
            registerTempView(config.getString(Constants.OUTPUT_TABLE), dataset);
        }
    }

    private void executeWriter(SparkRuntimeEnvironment sparkRuntimeEnvironment, BatchWriter batchWriter, Dataset<Row> dataset) {
        Config config = batchWriter.getConfig();
        Dataset<Row> dataset2 = dataset;
        if (Boolean.TRUE.equals(config.has(Constants.INPUT_TABLE))) {
            dataset2 = sparkRuntimeEnvironment.sparkSession().read().table(config.getString(Constants.INPUT_TABLE));
        }
        batchWriter.write(dataset2, sparkRuntimeEnvironment);
    }
}
