/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * Preprocessing step that groups the incoming rows by {@code proc_inst_id_} and
 * {@code act_inst_id_} and aggregates every other column.
 *
 * <p>Rows whose {@code source_} column equals {@code "processInstance"} are filtered out
 * before grouping. After the aggregation the helper columns {@code name_} and
 * {@code source_} are dropped and the result is sorted by {@code start_time_}.
 */
@PreprocessingStepDescription(name="Aggregate activity instances", description="In this step the data is aggregated in a way so that there is only one line per activity instance per process instance in the dataset.")
public class AggregateActivityInstancesStep
implements PreprocessingStepInterface {
    /**
     * Recognises column names of the form {@code function(column)}; group 2 holds the
     * wrapped column name. The function names are spelled in lower case here although they
     * are requested as "ProcessState"/"AllButEmptyString" - presumably Spark renders them
     * lower-cased in the generated column names (TODO confirm).
     */
    private static final Pattern AGGREGATED_COLUMN = Pattern.compile("(max|allbutemptystring|processstate)\\((.+)\\)");

    /**
     * Aggregates the dataset to one row per ({@code proc_inst_id_}, {@code act_inst_id_}) pair.
     *
     * @param dataset    rows to aggregate; the code reads the columns {@code source_},
     *                   {@code proc_inst_id_}, {@code act_inst_id_} and {@code start_time_}
     * @param parameters not read by this step
     * @param config     runner configuration; decides whether the result is also written
     *                   out as CSV
     * @return the aggregated dataset, sorted by {@code start_time_} and marked for caching
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        Map<String, String> functionByColumn = AggregateActivityInstancesStep.chooseAggregations(dataset.columns());

        // Keep only rows that do not originate from the "processInstance" source, then
        // collapse them per process instance / activity instance pair.
        Dataset<Row> activityRows = dataset.filter(dataset.col("source_").notEqual("processInstance"));
        Dataset<Row> result = activityRows.groupBy("proc_inst_id_", "act_inst_id_").agg(functionByColumn);

        result = AggregateActivityInstancesStep.restoreColumnNames(result);
        result = result.drop("name_");
        result = result.drop("source_");
        result = result.sort("start_time_");

        // Cache before counting so the count and later consumers can share the computed data.
        result.cache();
        BpmnaiLogger.getInstance().writeInfo("Found " + result.count() + " activity instances.");

        if (config.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(result, "agg_of_activity_instances", config);
        }
        return result;
    }

    /**
     * Maps each column that has to be aggregated to the name of its aggregation function.
     * The two grouping keys are left out of the map.
     */
    private static Map<String, String> chooseAggregations(String[] columns) {
        Map<String, String> functionByColumn = new HashMap<String, String>();
        for (String column : columns) {
            switch (column) {
                case "proc_inst_id_":
                case "act_inst_id_":
                    // Grouping keys are not aggregated.
                    break;
                case "duration_":
                    functionByColumn.put(column, "max");
                    break;
                case "state_":
                    functionByColumn.put(column, "ProcessState");
                    break;
                default:
                    // Revision columns use "max"; all remaining columns use "AllButEmptyString".
                    functionByColumn.put(column, column.endsWith("_rev") ? "max" : "AllButEmptyString");
                    break;
            }
        }
        return functionByColumn;
    }

    /**
     * Renames every column named like {@code function(column)} back to plain {@code column}.
     */
    private static Dataset<Row> restoreColumnNames(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        for (String aggregatedName : aggregated.columns()) {
            Matcher match = AGGREGATED_COLUMN.matcher(aggregatedName);
            if (match.find()) {
                renamed = renamed.withColumnRenamed(aggregatedName, match.group(2));
            }
        }
        return renamed;
    }
}

