/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/**
 * Preprocessing step that aggregates the dataset to process-instance level by grouping on
 * {@code proc_inst_id_} and aggregating every other column.
 */
@PreprocessingStepDescription(name="Aggregate process instances", description="In this step the data is aggregated in a way so that there is only one line per process instance in the dataset. In this step the process state for each process instance is also aggregated to the last state the process instance had in the underlying dataset.")
public class AggregateProcessInstancesStep implements PreprocessingStepInterface {

    /** Grouping key of this step; it is not aggregated itself. */
    private static final String PROCESS_INSTANCE_ID_COLUMN = "proc_inst_id_";

    /** Column aggregated with the {@code ProcessState} aggregate function. */
    private static final String STATE_COLUMN = "state_";

    /** Column used for partitioning when the dev process-state workaround is active. */
    private static final String NAME_COLUMN = "name_";

    /**
     * Matches aggregated column names of the form {@code function(column)} for the aggregate
     * functions used in this step; group 2 captures the original column name.
     * NOTE(review): assumes the generated names use lower-cased function names
     * (e.g. {@code allbutemptystring(...)}) — confirm for the Spark version in use.
     */
    private static final Pattern AGGREGATED_COLUMN_PATTERN =
            Pattern.compile("(max|allbutemptystring|processstate)\\((.+)\\)");

    /**
     * Aggregates the dataset so that rows are grouped per process instance.
     *
     * <p>Every column except {@code proc_inst_id_} is aggregated: {@code duration_} and
     * {@code *_rev} columns with {@code max}, {@code state_} with {@code ProcessState}, and all
     * other columns with {@code AllButEmptyString}. The rows are split into two complementary
     * partitions. By default these are {@code state_} set and {@code state_} null. With the dev
     * process-state workaround on the "process" data level they are {@code name_} null and
     * {@code name_} set. Each partition is aggregated separately, the aggregated columns are
     * renamed back to their original names, and the two results are unioned. The columns
     * {@code name_}, {@code act_inst_id_} and {@code source_} are then dropped. The result is
     * cached, counted for logging, and optionally written to CSV.
     *
     * <p>NOTE(review): a process instance with rows in both partitions yields one row from
     * each aggregate, so the union can contain two rows for it. TODO confirm whether the
     * input data or a later step guarantees a single row per instance.
     *
     * @param dataset    input dataset; must contain {@code proc_inst_id_}, plus {@code state_}
     *                   (default) or {@code name_} (workaround) for partitioning
     * @param parameters step parameters (not used by this step)
     * @param config     runner configuration (workaround flag, data level, CSV output flag)
     * @return the aggregated, cached dataset
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        Map<String, String> aggregationMap = buildAggregationMap(dataset.columns());

        Column firstPartition;
        Column secondPartition;
        if (isProcessStateWorkaroundActive(config)) {
            firstPartition = dataset.col(NAME_COLUMN).isNull();
            secondPartition = dataset.col(NAME_COLUMN).isNotNull();
        } else {
            firstPartition = dataset.col(STATE_COLUMN).isNotNull();
            secondPartition = dataset.col(STATE_COLUMN).isNull();
        }

        Dataset<Row> firstAggregate = aggregatePartition(dataset, firstPartition, aggregationMap);
        Dataset<Row> secondAggregate = aggregatePartition(dataset, secondPartition, aggregationMap);

        // union() matches columns by position. Both aggregates come from the same
        // aggregation map and grouping key, so their column layouts presumably line up.
        Dataset<Row> result = secondAggregate.union(firstAggregate)
                .drop(NAME_COLUMN)
                .drop("act_inst_id_")
                .drop("source_");

        result.cache();
        BpmnaiLogger.getInstance().writeInfo("Found " + result.count() + " process instances.");
        if (config.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(result, "agg_of_process_instances", config);
        }
        return result;
    }

    /** Chooses the aggregate function name for every column except the grouping key. */
    private static Map<String, String> buildAggregationMap(String[] columns) {
        // HashMap kept deliberately: presumably Spark emits aggregate columns in the map's
        // iteration order, so a different map type could reorder the output columns.
        Map<String, String> aggregationMap = new HashMap<>();
        for (String column : columns) {
            if (column.equals(PROCESS_INSTANCE_ID_COLUMN)) {
                continue; // grouping key, not aggregated
            }
            if (column.equals("duration_") || column.endsWith("_rev")) {
                aggregationMap.put(column, "max");
            } else if (column.equals(STATE_COLUMN)) {
                aggregationMap.put(column, "ProcessState");
            } else {
                aggregationMap.put(column, "AllButEmptyString");
            }
        }
        return aggregationMap;
    }

    /** Returns true when partitioning should use {@code name_} instead of {@code state_}. */
    private static boolean isProcessStateWorkaroundActive(SparkRunnerConfig config) {
        // Literal-first equals: a missing data level means "not process level" instead of an NPE.
        return config.isDevProcessStateColumnWorkaroundEnabled()
                && "process".equals(config.getDataLevel());
    }

    /** Filters one partition, aggregates it per process instance and restores column names. */
    private static Dataset<Row> aggregatePartition(Dataset<Row> dataset, Column partitionFilter, Map<String, String> aggregationMap) {
        Dataset<Row> aggregated = dataset
                .filter(partitionFilter)
                .groupBy(PROCESS_INSTANCE_ID_COLUMN)
                .agg(aggregationMap);
        return renameAggregatedColumns(aggregated);
    }

    /** Renames columns like {@code max(duration_)} back to {@code duration_}. */
    private static Dataset<Row> renameAggregatedColumns(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        for (String columnName : aggregated.columns()) {
            Matcher matcher = AGGREGATED_COLUMN_PATTERN.matcher(columnName);
            if (matcher.find()) {
                renamed = renamed.withColumnRenamed(columnName, matcher.group(2));
            }
        }
        return renamed;
    }
}

