/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.processing.steps.dataprocessing;

import de.viadee.ki.sparkimporter.processing.interfaces.PreprocessingStepInterface;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/**
 * Preprocessing step that collapses all rows of a process instance into one row per
 * {@code proc_inst_id_}.
 *
 * <p>Every column except the grouping key is aggregated: {@code duration_} with {@code max},
 * {@code state_} with the {@code ProcessState} aggregate function, and every other column with
 * the {@code AllButEmptyString} aggregate function. The latter two are looked up by name by Spark
 * at runtime; presumably they are UDAFs registered elsewhere in the project (NOTE(review): confirm).
 *
 * <p>The input is split into two disjoint partitions that are aggregated separately and then
 * unioned:
 * <ul>
 *   <li>by default, rows whose {@code state_} is not null vs. rows whose {@code state_} is null;</li>
 *   <li>when the dev process-state-column workaround is enabled and {@code dataLevel} is
 *       {@code "process"}, rows whose {@code name_} is null take the place of the first partition
 *       and rows whose {@code name_} is not null the place of the second.</li>
 * </ul>
 * Afterwards the {@code name_} and {@code act_inst_id_} columns are dropped.
 */
public class AggregateProcessInstancesStep implements PreprocessingStepInterface {

    /** Grouping key: one output row per distinct value of this column. */
    private static final String PROCESS_INSTANCE_ID = "proc_inst_id_";

    /**
     * Matches a whole aggregate column name such as {@code max(duration_)} and captures the
     * source column name in group 2. The function names are lower-case; presumably Spark
     * lower-cases them when it generates the aggregate column aliases.
     * NOTE(review): confirm against the Spark version in use.
     */
    private static final Pattern AGGREGATE_COLUMN_NAME =
            Pattern.compile("(max|allbutemptystring|processstate)\\((.+)\\)");

    /**
     * Aggregates the dataset to one row per process instance.
     *
     * @param dataset                 input rows; must contain {@code proc_inst_id_} and
     *                                {@code state_} (and {@code name_} when the workaround applies)
     * @param writeStepResultIntoFile if true, the result is also written to CSV as
     *                                {@code agg_of_process_instances}
     * @param dataLevel               data level of the run; only {@code "process"} enables the
     *                                workaround filter; may be null
     * @param parameters              unused by this step
     * @return the aggregated dataset with original column names restored
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, Map<String, Object> parameters) {
        Map<String, String> aggregationMap = buildAggregationMap(dataset.columns());

        // Read the switch once so both partitions are derived from the same criterion and are
        // guaranteed to be complementary. "process".equals(...) is null-safe for dataLevel.
        boolean useNameColumnWorkaround = SparkImporterVariables.isDevProcessStateColumnWorkaroundEnabled()
                && "process".equals(dataLevel);

        Column processInstanceRows;
        Column otherRows;
        if (useNameColumnWorkaround) {
            Column name = dataset.col("name_");
            processInstanceRows = name.isNull();
            otherRows = name.isNotNull();
        } else {
            Column state = dataset.col("state_");
            processInstanceRows = state.isNotNull();
            otherRows = state.isNull();
        }

        Dataset<Row> processInstanceAgg = aggregatePerProcessInstance(dataset.filter(processInstanceRows), aggregationMap);
        Dataset<Row> otherAgg = aggregatePerProcessInstance(dataset.filter(otherRows), aggregationMap);

        // Dataset.union matches columns by position, and the result takes the left side's names.
        // Both sides were aggregated with the same map, so they share one column order. That is
        // why renaming once, after the union, is sufficient.
        Dataset<Row> result = stripAggregateFunctionNames(otherAgg.union(processInstanceAgg));
        result = result.drop("name_", "act_inst_id_");

        SparkImporterLogger.getInstance().writeInfo("Found " + result.count() + " process instances.");
        if (writeStepResultIntoFile) {
            SparkImporterUtils.getInstance().writeDatasetToCSV(result, "agg_of_process_instances");
        }
        return result;
    }

    /**
     * Maps every column except {@code proc_inst_id_} to the name of its aggregate function.
     * A HashMap is kept deliberately: its iteration order determines the output column order, and
     * the original implementation used one.
     */
    private static Map<String, String> buildAggregationMap(String[] columns) {
        Map<String, String> aggregationMap = new HashMap<>();
        for (String column : columns) {
            if (PROCESS_INSTANCE_ID.equals(column)) {
                continue;
            }
            if ("duration_".equals(column)) {
                aggregationMap.put(column, "max");
            } else if ("state_".equals(column)) {
                aggregationMap.put(column, "ProcessState");
            } else {
                aggregationMap.put(column, "AllButEmptyString");
            }
        }
        return aggregationMap;
    }

    /** Groups the given rows by process instance id and applies the per-column aggregates. */
    private static Dataset<Row> aggregatePerProcessInstance(Dataset<Row> rows, Map<String, String> aggregationMap) {
        return rows.groupBy(PROCESS_INSTANCE_ID).agg(aggregationMap);
    }

    /** Renames generated columns like {@code max(duration_)} back to their source column name. */
    private static Dataset<Row> stripAggregateFunctionNames(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        for (String columnName : aggregated.columns()) {
            Matcher matcher = AGGREGATE_COLUMN_NAME.matcher(columnName);
            if (matcher.matches()) {
                renamed = renamed.withColumnRenamed(columnName, matcher.group(2));
            }
        }
        return renamed;
    }
}

