/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.processing.steps.dataprocessing;

import de.viadee.ki.sparkimporter.processing.interfaces.PreprocessingStepInterface;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/**
 * Preprocessing step that aggregates the rows whose {@code state_} column is null
 * (grouped per instance id and {@code name_}) and unions the aggregated rows with the
 * rows whose {@code state_} column is not null.
 */
public class AggregateVariableUpdatesStep implements PreprocessingStepInterface {

    private static final String DATA_LEVEL_PROCESS = "process";
    private static final String DATA_LEVEL_ACTIVITY = "activity";

    /** Columns aggregated with {@code first} instead of the default aggregation. */
    private static final List<String> DATE_FORMAT_COLUMNS = Arrays.asList("start_time_", "end_time_");

    /**
     * Matches column names of the form {@code <aggregation>(<column>)}; group 2 captures
     * the text between the parentheses. Compiled once because it never changes.
     */
    private static final Pattern AGGREGATED_COLUMN_PATTERN =
            Pattern.compile("(first|max|allbutemptystring)\\((.+)\\)");

    /** Columns selected after {@code proc_inst_id_} when the data level is "process". */
    private static final String[] PROCESS_RESULT_COLUMNS = {
        "state_", "name_", "var_type_", "rev_", "long_", "double_", "text_", "text2_"
    };

    /** Columns selected after {@code proc_inst_id_} for every other data level. */
    private static final String[] ACTIVITY_RESULT_COLUMNS = {
        "state_", "act_inst_id_", "start_time_", "end_time_", "duration_",
        "name_", "var_type_", "rev_", "long_", "double_", "text_", "text2_"
    };

    /**
     * Aggregates the rows with a null {@code state_} and appends them to the remaining rows.
     *
     * @param dataset                 the input dataset
     * @param writeStepResultIntoFile whether the result is additionally written out via
     *                                {@code SparkImporterUtils.writeDatasetToCSV}
     * @param dataLevel               "process" selects the process-level handling; any other
     *                                value takes the activity-level path
     * @param parameters              not used by this step
     * @return the union of the rows with a non-null {@code state_} and the aggregated rows;
     *         ordered by {@code act_inst_id_}, {@code start_time_} on the activity-level path
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, Map<String, Object> parameters) {
        Map<String, String> aggregationMap = buildAggregationMap(dataset.columns());
        boolean processLevel = dataLevel.equals(DATA_LEVEL_PROCESS);

        Dataset<Row> aggregated = aggregateNullStateRows(dataset, processLevel, aggregationMap);

        // Drop the grouping keys; the aggregated copies of these columns are renamed
        // back to the plain column names below.
        aggregated = aggregated.drop("proc_inst_id_");
        aggregated = aggregated.drop("name_");
        if (dataLevel.equals(DATA_LEVEL_ACTIVITY)) {
            aggregated = aggregated.drop("act_inst_id_");
        }
        aggregated = stripAggregationFromColumnNames(aggregated);

        Dataset<Row> result = unionWithAggregatedRows(dataset, aggregated, processLevel);

        if (writeStepResultIntoFile) {
            SparkImporterUtils.getInstance().writeDatasetToCSV(result, "agg_variable_updates");
        }
        return result;
    }

    /** Chooses the aggregation function name to apply to each of the given columns. */
    private static Map<String, String> buildAggregationMap(String[] columns) {
        Map<String, String> aggregationMap = new HashMap<String, String>();
        for (String column : columns) {
            // NOTE(review): the result columns selected by this step include "rev_", which
            // does not end with "_rev" - confirm this suffix check is what is intended.
            if (column.endsWith("_rev")) {
                aggregationMap.put(column, "max");
            } else if (DATE_FORMAT_COLUMNS.contains(column)) {
                aggregationMap.put(column, "first");
            } else {
                // Presumably a custom aggregation function registered elsewhere - TODO confirm.
                aggregationMap.put(column, "AllButEmptyString");
            }
        }
        return aggregationMap;
    }

    /** Filters the rows with a null {@code state_} and aggregates them per instance id and name. */
    private static Dataset<Row> aggregateNullStateRows(Dataset<Row> dataset, boolean processLevel, Map<String, String> aggregationMap) {
        Dataset<Row> nullStateRows = dataset.filter(functions.isnull(dataset.col("state_")));
        if (!processLevel) {
            return nullStateRows.groupBy("act_inst_id_", "name_").agg(aggregationMap);
        }
        if (Arrays.asList(dataset.columns()).contains("timestamp_")) {
            // Sort newest first before grouping when a timestamp column is available.
            nullStateRows = nullStateRows.orderBy(functions.desc("timestamp_"));
        }
        return nullStateRows.groupBy("proc_inst_id_", "name_").agg(aggregationMap);
    }

    /** Renames columns matching {@code <aggregation>(<column>)} to the text inside the parentheses. */
    private static Dataset<Row> stripAggregationFromColumnNames(Dataset<Row> aggregated) {
        Dataset<Row> renamed = aggregated;
        // The column names are taken from the incoming dataset once, before any renaming.
        for (String columnName : aggregated.columns()) {
            Matcher matcher = AGGREGATED_COLUMN_PATTERN.matcher(columnName);
            if (matcher.find()) {
                renamed = renamed.withColumnRenamed(columnName, matcher.group(2));
            }
        }
        return renamed;
    }

    /** Unions the rows with a non-null {@code state_} with the aggregated rows, using the same column list for both. */
    private static Dataset<Row> unionWithAggregatedRows(Dataset<Row> dataset, Dataset<Row> aggregated, boolean processLevel) {
        Column stateIsNotNull = functions.not(functions.isnull(dataset.col("state_")));
        if (processLevel) {
            return dataset.select("proc_inst_id_", PROCESS_RESULT_COLUMNS)
                    .filter(stateIsNotNull)
                    .union(aggregated.select("proc_inst_id_", PROCESS_RESULT_COLUMNS));
        }
        return dataset.select("proc_inst_id_", ACTIVITY_RESULT_COLUMNS)
                .filter(stateIsNotNull)
                .union(aggregated.select("proc_inst_id_", ACTIVITY_RESULT_COLUMNS))
                .orderBy("act_inst_id_", "start_time_");
    }
}

