/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.processing.steps.dataprocessing;

import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.interfaces.PreprocessingStepInterface;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.Seq;

/**
 * Preprocessing step that re-attaches columns which are listed as start columns but are no longer
 * present in the incoming dataset.
 *
 * <p>Two helper datasets are read from {@code PreprocessingRunner.helper_datasets} for the current
 * data level: {@code "startColumns_<dataLevel>"} (one column name per row, taken from the first
 * field) and {@code "initial_<dataLevel>"} (the dataset the values are taken from). Every start column
 * that is neither already present nor on the exclusion list is aggregated with {@code first} per group
 * and left-joined back onto the incoming dataset.
 */
public class AddReducedColumnsToDatasetStep
implements PreprocessingStepInterface {

    /** Matches a whole column name of the form {@code first(<column>)}; group 1 is {@code <column>}. */
    private static final Pattern FIRST_AGGREGATE_COLUMN = Pattern.compile("first\\((.+)\\)");

    /** Start columns that are never re-added, regardless of configuration. */
    private static final List<String> BASE_COLUMNS_NOT_TO_BE_ADDED_AGAIN = Arrays.asList(
            "var_type_", "rev_", "long_", "double_", "text_", "text2_", "timestamp_",
            "sequence_counter_", "variable_instance_id_");

    /**
     * Adds the reduced columns back to {@code dataset}.
     *
     * @param dataset                 the dataset to enrich
     * @param writeStepResultIntoFile when {@code true}, the result is also written as CSV under the
     *                                name {@code "joined_columns"}
     * @param dataLevel               {@code "process"} groups by process instance; any other value groups
     *                                by process instance and activity instance
     * @param parameters              not used by this step
     * @return {@code dataset} left-joined with the first value of every re-added column
     * @throws NullPointerException if a required helper dataset has not been registered
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, Map<String, Object> parameters) {
        boolean processLevel = "process".equals(dataLevel);
        // Read once so column selection, filtering and clean-up all see the same setting.
        boolean processStateWorkaround = SparkImporterVariables.isDevProcessStateColumnWorkaroundEnabled();

        Dataset<Row> startColumns = requireHelperDataset("startColumns_" + dataLevel);
        Dataset<Row> initialDataset = requireHelperDataset("initial_" + dataLevel);

        List<String> columnsToAdd = collectColumnsToAdd(dataset, startColumns, processStateWorkaround);
        Dataset<Row> reduced = stripFirstAggregatePrefix(
                aggregateFirstValues(initialDataset, columnsToAdd, processLevel, processStateWorkaround));

        Column joinCondition = dataset.col("proc_inst_id_").equalTo(reduced.col("proc_inst_id__right"));
        if (!processLevel) {
            joinCondition = joinCondition.and(
                    dataset.col("act_inst_id_").equalTo(reduced.col("act_inst_id__right")));
        }
        Dataset<Row> result = dataset.join(reduced, joinCondition, "left");

        if (processStateWorkaround && processLevel) {
            result = result.drop("name_");
        }
        // Dataset.drop ignores names that are not in the schema, so both helper join keys can be
        // removed for every level; this keeps clean-up in sync with the join condition above.
        result = result.drop("proc_inst_id__right", "act_inst_id__right");

        if (writeStepResultIntoFile) {
            SparkImporterUtils.getInstance().writeDatasetToCSV(result, "joined_columns");
        }
        return result;
    }

    /** Looks up a helper dataset and fails with a descriptive message when it is missing. */
    private static Dataset<Row> requireHelperDataset(String key) {
        return Objects.requireNonNull(PreprocessingRunner.helper_datasets.get(key),
                "helper dataset '" + key + "' is not present in PreprocessingRunner.helper_datasets");
    }

    /**
     * Returns, in start-column order, the start columns that are missing from {@code dataset} and not
     * excluded from being re-added.
     */
    private static List<String> collectColumnsToAdd(Dataset<Row> dataset, Dataset<Row> startColumns, boolean processStateWorkaround) {
        List<String> existingColumns = Arrays.asList(dataset.columns());
        List<String> excludedColumns = new ArrayList<>(BASE_COLUMNS_NOT_TO_BE_ADDED_AGAIN);
        if (!processStateWorkaround) {
            // name_ is only needed by the workaround's filter; otherwise it is not re-added.
            excludedColumns.add("name_");
        }
        List<String> columnsToAdd = new ArrayList<>();
        for (Row row : startColumns.collectAsList()) {
            String column = row.getString(0);
            if (!existingColumns.contains(column) && !excludedColumns.contains(column)) {
                columnsToAdd.add(column);
            }
        }
        return columnsToAdd;
    }

    /**
     * Selects the key columns plus {@code columnsToAdd} from {@code initialDataset}, filters the rows
     * and takes the {@code first} value of every added column per group. The grouping keys are renamed
     * with a {@code __right} suffix so they do not clash with the keys of the dataset joined against.
     */
    private static Dataset<Row> aggregateFirstValues(Dataset<Row> initialDataset, List<String> columnsToAdd, boolean processLevel, boolean processStateWorkaround) {
        List<Column> selection = new ArrayList<>();
        selection.add(new Column("proc_inst_id_"));
        selection.add(new Column("state_"));
        selection.add(new Column("act_inst_id_"));
        Map<String, String> aggregationMap = new HashMap<>();
        for (String column : columnsToAdd) {
            selection.add(new Column(column));
            aggregationMap.put(column, "first");
        }
        Dataset<Row> selected = initialDataset.select(selection.toArray(new Column[0]));

        if (processLevel) {
            Column filter = processStateWorkaround
                    ? initialDataset.col("name_").isNull()
                    : initialDataset.col("state_").isNotNull();
            return selected.filter(filter)
                    .groupBy("proc_inst_id_")
                    .agg(aggregationMap)
                    .withColumnRenamed("proc_inst_id_", "proc_inst_id__right");
        }
        return selected.filter(initialDataset.col("activity_id_").isNotNull())
                .groupBy("proc_inst_id_", "act_inst_id_")
                .agg(aggregationMap)
                .withColumnRenamed("proc_inst_id_", "proc_inst_id__right")
                .withColumnRenamed("act_inst_id_", "act_inst_id__right");
    }

    /** Renames every column named exactly {@code first(<name>)} to {@code <name>}; others are kept. */
    private static Dataset<Row> stripFirstAggregatePrefix(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        for (String columnName : aggregated.columns()) {
            Matcher matcher = FIRST_AGGREGATE_COLUMN.matcher(columnName);
            if (matcher.matches()) {
                renamed = renamed.withColumnRenamed(columnName, matcher.group(1));
            }
        }
        return renamed;
    }
}

