/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/**
 * Preprocessing step that first aggregates the variable-update rows of the dataset and then
 * adds one column per known process variable.
 *
 * <p>The step runs in two phases, see {@link #doVariableUpdatesAggregation} and
 * {@link #doAddVariableColumns}. The data level is read from the runner configuration; the
 * value {@code "process"} selects process-level handling, every other value is handled on
 * activity level.
 */
@PreprocessingStepDescription(name="Add variable columns", description="In this step all process variables detected in prior steps are added as separate columns to the dataset.")
public class AddVariableColumnsStep
implements PreprocessingStepInterface {
    /** Data level value that selects process-level handling. */
    private static final String DATA_LEVEL_PROCESS = "process";

    /** Columns for which the {@code first} aggregate is used. */
    private static final List<String> DATE_FORMAT_COLUMNS = Arrays.asList("start_time_", "end_time_");

    /**
     * Matches the column names produced by the aggregation, e.g. {@code max(rev_)};
     * group 2 is the name of the aggregated source column. Compiled once instead of per call.
     */
    private static final Pattern AGGREGATED_COLUMN_PATTERN = Pattern.compile("(first|max|allbutemptystring)\\((.+)\\)");

    /** Columns selected after {@code proc_inst_id_} on process level. */
    private static final String[] PROCESS_LEVEL_COLUMNS = {"state_", "name_", "var_type_", "rev_", "long_", "double_", "text_", "text2_", "source_"};

    /** Columns selected after {@code proc_inst_id_} on activity level. */
    private static final String[] ACTIVITY_LEVEL_COLUMNS = {"state_", "act_inst_id_", "start_time_", "end_time_", "duration_", "name_", "var_type_", "rev_", "long_", "double_", "text_", "text2_", "source_"};

    /**
     * Runs the variable-update aggregation followed by the creation of the variable columns.
     *
     * @param dataset    the dataset to process
     * @param parameters step parameters; not read by this step
     * @param config     runner configuration supplying the data level and output flags
     * @return the dataset with one column per process variable
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        dataset = this.doVariableUpdatesAggregation(dataset, config.isWriteStepResultsIntoFile(), config.getDataLevel(), config);
        dataset = this.doAddVariableColumns(dataset, config.isWriteStepResultsIntoFile(), config.getDataLevel(), config);
        return dataset;
    }

    /**
     * Collapses all rows whose {@code source_} is {@code "variableUpdate"} into one row per
     * grouping key and variable name, and unions them with the remaining relevant rows.
     *
     * <p>On process level the rows are grouped by {@code proc_inst_id_} and {@code name_} and
     * unioned with the {@code "processInstance"} rows. On any other level they are grouped by
     * {@code act_inst_id_} and {@code name_}, unioned with the rows that have a non-null
     * {@code start_time_}, and ordered by {@code act_inst_id_} and {@code start_time_}.
     *
     * @param dataset                 the dataset to aggregate
     * @param writeStepResultIntoFile whether the result is handed to
     *                                {@code BpmnaiUtils.writeDatasetToCSV}
     * @param dataLevel               {@code "process"} for process level; anything else
     *                                (including {@code null}) is treated as activity level
     * @param config                  runner configuration passed on to the CSV writer
     * @return the union of the untouched rows and the aggregated variable-update rows
     */
    private Dataset<Row> doVariableUpdatesAggregation(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, SparkRunnerConfig config) {
        boolean processLevel = DATA_LEVEL_PROCESS.equals(dataLevel);

        // Every column of the input gets an aggregate; the grouping columns are included on
        // purpose, because the plain grouping columns are dropped again further down.
        Map<String, String> aggregationMap = new HashMap<>();
        for (String column : dataset.columns()) {
            aggregationMap.put(column, aggregateFunctionFor(column));
        }

        Dataset<Row> variableUpdates = dataset.filter(dataset.col("source_").equalTo("variableUpdate"));

        String groupingColumn;
        if (processLevel) {
            groupingColumn = "proc_inst_id_";
            if (Arrays.asList(dataset.columns()).contains("timestamp_")) {
                variableUpdates = variableUpdates.orderBy(functions.desc("timestamp_"));
            }
        } else {
            groupingColumn = "act_inst_id_";
        }
        Dataset<Row> aggregated = variableUpdates.groupBy(groupingColumn, "name_").agg(aggregationMap);

        // Drop the plain grouping columns of the branch that actually ran, so that renaming the
        // aggregated columns below cannot produce duplicate column names.
        aggregated = aggregated.drop(groupingColumn).drop("name_");

        // Rename e.g. "max(rev_)" back to "rev_".
        // NOTE(review): some Spark versions name the first() aggregate "first(col, false)"; in
        // that case group 2 would carry the ", false" suffix - confirm against the Spark
        // version in use.
        for (String columnName : aggregated.columns()) {
            Matcher matcher = AGGREGATED_COLUMN_PATTERN.matcher(columnName);
            if (matcher.find()) {
                aggregated = aggregated.withColumnRenamed(columnName, matcher.group(2));
            }
        }

        Dataset<Row> result;
        if (processLevel) {
            result = dataset.select("proc_inst_id_", PROCESS_LEVEL_COLUMNS)
                    .filter(dataset.col("source_").equalTo("processInstance"))
                    .union(aggregated.select("proc_inst_id_", PROCESS_LEVEL_COLUMNS));
        } else {
            result = dataset.select("proc_inst_id_", ACTIVITY_LEVEL_COLUMNS)
                    .filter(functions.not(functions.isnull(dataset.col("start_time_"))))
                    .union(aggregated.select("proc_inst_id_", ACTIVITY_LEVEL_COLUMNS))
                    .orderBy("act_inst_id_", "start_time_");
        }

        if (writeStepResultIntoFile) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(result, "agg_variable_updates", config);
        }
        return result;
    }

    /**
     * Chooses the aggregate function name for a column: {@code max} for columns ending in
     * {@code _rev}, {@code first} for the date columns, and {@code AllButEmptyString}
     * (an aggregate defined outside this class) for everything else.
     */
    private static String aggregateFunctionFor(String column) {
        if (column.endsWith("_rev")) {
            return "max";
        }
        if (DATE_FORMAT_COLUMNS.contains(column)) {
            return "first";
        }
        return "AllButEmptyString";
    }

    /**
     * Adds one column per process variable taken from the
     * {@code PROCESS_VARIABLES_ESCALATED} broadcast variable. A row carries the variable's
     * value in that column when its {@code name_} equals the variable name, otherwise null.
     *
     * <p>On process level with rev counting enabled, a {@code <variable>_rev} column is added
     * as well, holding {@code rev_} for matching rows and {@code "0"} otherwise. Afterwards the
     * raw value columns are dropped; {@code name_} is dropped too unless the dev process state
     * column workaround is enabled.
     *
     * @param dataset                 the dataset produced by the aggregation phase
     * @param writeStepResultIntoFile whether the result is handed to
     *                                {@code BpmnaiUtils.writeDatasetToCSV}
     * @param dataLevel               {@code "process"} for process level; anything else
     *                                (including {@code null}) is treated as activity level
     * @param config                  runner configuration supplying the feature flags
     * @return the dataset with the variable columns added and the raw value columns removed
     */
    private Dataset<Row> doAddVariableColumns(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, SparkRunnerConfig config) {
        // Only the keys (variable names) are used, so the value type is left open.
        @SuppressWarnings("unchecked")
        Map<String, ?> varMap = (Map<String, ?>) SparkBroadcastHelper.getInstance().getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED);
        Set<String> variables = varMap.keySet();

        boolean addRevColumns = DATA_LEVEL_PROCESS.equals(dataLevel) && config.isRevCountEnabled();

        for (String variable : variables) {
            dataset = dataset.withColumn(variable,
                    functions.when(dataset.col("name_").equalTo(variable), typedValueColumn(dataset)).otherwise(null));
            if (addRevColumns) {
                dataset = dataset.withColumn(variable + "_rev",
                        functions.when(dataset.col("name_").equalTo(variable), dataset.col("rev_")).otherwise("0"));
            }
        }

        dataset = dataset.drop("var_type_", "rev_", "double_", "long_", "text_", "text2_");
        if (!config.isDevProcessStateColumnWorkaroundEnabled()) {
            dataset = dataset.drop("name_");
        }
        if (writeStepResultIntoFile) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(dataset, "add_var_columns", config);
        }
        return dataset;
    }

    /**
     * Builds the expression that picks the value column matching a row's {@code var_type_}:
     * {@code text_} for string/null, {@code long_} for boolean/integer/long/date,
     * {@code double_} for double, and {@code text2_} for any other type.
     */
    private static Column typedValueColumn(Dataset<Row> dataset) {
        Column varType = dataset.col("var_type_");
        return functions.when(varType.equalTo("string"), dataset.col("text_"))
                .when(varType.equalTo("null"), dataset.col("text_"))
                .when(varType.equalTo("boolean"), dataset.col("long_"))
                .when(varType.equalTo("integer"), dataset.col("long_"))
                .when(varType.equalTo("long"), dataset.col("long_"))
                .when(varType.equalTo("double"), dataset.col("double_"))
                .when(varType.equalTo("date"), dataset.col("long_"))
                .otherwise(dataset.col("text2_"));
    }
}

