/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.PreprocessingRunner;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.Seq;

/**
 * Preprocessing step that joins previously removed columns back onto the working dataset.
 *
 * <p>The column names to restore are read from the helper dataset {@code startColumns_<dataLevel>};
 * their values are taken from the helper dataset {@code initial_<dataLevel>}, reduced to one row per
 * process instance (data level {@code "process"}) or per process instance and activity instance
 * (any other data level) using the {@code first} aggregate, and left-joined onto the input dataset.
 */
@PreprocessingStepDescription(name="Add reduced columns", description="In the beginning the non relevant columns where removed to speed up the processing. These columns are now added back to the dataset by using the processInstanceId as a reference.")
public class AddReducedColumnsToDatasetStep
implements PreprocessingStepInterface {
    private static final String PROCESS_LEVEL = "process";
    private static final String ACTIVITY_LEVEL = "activity";

    private static final String PROC_INST_ID = "proc_inst_id_";
    private static final String ACT_INST_ID = "act_inst_id_";
    private static final String PROC_INST_ID_RIGHT = "proc_inst_id__right";
    private static final String ACT_INST_ID_RIGHT = "act_inst_id__right";
    private static final String NAME_COLUMN = "name_";

    /** Columns that are never joined back, regardless of configuration. */
    private static final Set<String> NEVER_RESTORED_COLUMNS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "var_type_", "rev_", "long_", "double_", "text_", "text2_", "timestamp_",
            "sequence_counter_", "variable_instance_id_", "source_")));

    /** Matches column names of the form {@code first(<name>)}; group 2 captures {@code <name>}. */
    private static final Pattern FIRST_AGGREGATE_PATTERN = Pattern.compile("(first)\\((.+)\\)");

    /**
     * Adds the columns listed in the {@code startColumns_<dataLevel>} helper dataset that are missing
     * from {@code dataset} by left-joining an aggregated view of the {@code initial_<dataLevel>}
     * helper dataset.
     *
     * @param dataset    the dataset to enrich; it is not modified, a new dataset is returned
     * @param parameters step parameters; not used by this step
     * @param config     runner configuration supplying the data level, the process-state workaround
     *                   flag and the flag controlling whether the result is written to CSV
     * @return the input dataset with the restored columns appended and the temporary join keys dropped
     * @throws NullPointerException if the data level is not configured or one of the required helper
     *                              datasets has not been registered in {@code PreprocessingRunner}
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        String dataLevel = Objects.requireNonNull(config.getDataLevel(), "data level must be configured");
        boolean processLevel = PROCESS_LEVEL.equals(dataLevel);
        boolean stateWorkaround = config.isDevProcessStateColumnWorkaroundEnabled();

        Dataset<Row> startColumns = Objects.requireNonNull(
                PreprocessingRunner.helper_datasets.get("startColumns_" + dataLevel),
                "helper dataset 'startColumns_" + dataLevel + "' is missing");
        Dataset<Row> initialDataset = Objects.requireNonNull(
                PreprocessingRunner.helper_datasets.get("initial_" + dataLevel),
                "helper dataset 'initial_" + dataLevel + "' is missing");

        List<String> columnsToRestore = findColumnsToRestore(startColumns, dataset, stateWorkaround);
        Dataset<Row> restored = renameFirstAggregates(
                aggregateInitialDataset(initialDataset, columnsToRestore, processLevel, stateWorkaround));

        // Join on the process instance; below process level the activity instance is part of the key too.
        Column joinCondition = dataset.col(PROC_INST_ID).equalTo(restored.col(PROC_INST_ID_RIGHT));
        if (!processLevel) {
            joinCondition = joinCondition.and(dataset.col(ACT_INST_ID).equalTo(restored.col(ACT_INST_ID_RIGHT)));
        }
        Dataset<Row> joined = dataset.join(restored, joinCondition, "left");

        if (stateWorkaround && processLevel) {
            joined = joined.drop(NAME_COLUMN);
        }
        joined = joined.drop(PROC_INST_ID_RIGHT);
        // Only the "activity" level drops the renamed activity key, exactly as before.
        if (ACTIVITY_LEVEL.equals(dataLevel)) {
            joined = joined.drop(ACT_INST_ID_RIGHT);
        }

        if (config.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(joined, "joined_columns", config);
        }
        return joined;
    }

    /**
     * Collects the column names (first field of every {@code startColumns} row) that are neither
     * present in {@code dataset} nor excluded from restoring. {@code name_} is excluded as well
     * unless the process-state workaround is enabled.
     */
    private static List<String> findColumnsToRestore(Dataset<Row> startColumns, Dataset<Row> dataset, boolean stateWorkaround) {
        Set<String> existingColumns = new HashSet<>(Arrays.asList(dataset.columns()));
        List<String> columnsToRestore = new ArrayList<>();
        for (Row row : startColumns.collectAsList()) {
            String column = row.getString(0);
            boolean excluded = NEVER_RESTORED_COLUMNS.contains(column)
                    || (!stateWorkaround && NAME_COLUMN.equals(column));
            if (existingColumns.contains(column) || excluded) {
                continue;
            }
            columnsToRestore.add(column);
        }
        return columnsToRestore;
    }

    /**
     * Projects the initial dataset to the key columns plus the columns to restore, filters it,
     * applies the {@code first} aggregate to every restored column per group, and renames the
     * grouping keys with a {@code _right} suffix so they do not clash with the join partner.
     */
    private static Dataset<Row> aggregateInitialDataset(Dataset<Row> initialDataset, List<String> columnsToRestore,
            boolean processLevel, boolean stateWorkaround) {
        List<Column> selection = new ArrayList<>();
        selection.add(new Column(PROC_INST_ID));
        selection.add(new Column("state_"));
        selection.add(new Column(ACT_INST_ID));
        Map<String, String> aggregations = new HashMap<>();
        for (String column : columnsToRestore) {
            selection.add(new Column(column));
            aggregations.put(column, "first");
        }
        // NOTE(review): assumes BpmnaiUtils.asSeq is generic in its element type - confirm.
        Seq<Column> selectionColumns = BpmnaiUtils.getInstance().asSeq(selection);
        Dataset<Row> selected = initialDataset.select(selectionColumns);

        if (processLevel) {
            // Keep rows with a non-null state_; with the workaround enabled, rows with a null name_ instead.
            Column rowFilter = stateWorkaround
                    ? initialDataset.col(NAME_COLUMN).isNull()
                    : initialDataset.col("state_").isNotNull();
            return selected.filter(rowFilter)
                    .groupBy(PROC_INST_ID)
                    .agg(aggregations)
                    .withColumnRenamed(PROC_INST_ID, PROC_INST_ID_RIGHT);
        }
        return selected.filter(initialDataset.col("activity_id_").isNotNull())
                .groupBy(PROC_INST_ID, ACT_INST_ID)
                .agg(aggregations)
                .withColumnRenamed(PROC_INST_ID, PROC_INST_ID_RIGHT)
                .withColumnRenamed(ACT_INST_ID, ACT_INST_ID_RIGHT);
    }

    /**
     * Renames every column whose name contains {@code first(<name>)} to {@code <name>}
     * (presumably the names the {@code first} aggregates receive - verify against the Spark version in use).
     */
    private static Dataset<Row> renameFirstAggregates(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        for (String columnName : aggregated.columns()) {
            Matcher matcher = FIRST_AGGREGATE_PATTERN.matcher(columnName);
            if (matcher.find()) {
                renamed = renamed.withColumnRenamed(columnName, matcher.group(2));
            }
        }
        return renamed;
    }
}

