/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.configuration.Configuration;
import de.viadee.bpmnai.core.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.VariableConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.VariableNameMapping;
import de.viadee.bpmnai.core.configuration.util.ConfigurationUtils;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Preprocessing step that determines the process variables present in the data.
 *
 * <p>The step runs four phases in order: it removes variables that are configured as unused
 * (and correlation-id variables), applies the configured variable renamings, collects the type
 * of every variable and broadcasts that map, and finally normalises missing types to
 * {@code "string"} and broadcasts the resulting map as well.
 */
@PreprocessingStepDescription(name="Determine process variables", description="Determines all process variables.")
public class DetermineProcessVariablesStep
implements PreprocessingStepInterface {
    /** Column holding the variable name. */
    private static final String COLUMN_NAME = "name_";
    /** Column holding the variable type. */
    private static final String COLUMN_VAR_TYPE = "var_type_";
    /** Column holding the variable revision. */
    private static final String COLUMN_REVISION = "rev_";
    /** Type assigned to variables for which no type is known. */
    private static final String DEFAULT_VARIABLE_TYPE = "string";
    /** Variables whose name starts with this prefix are always removed. */
    private static final String CORRELATION_ID_PREFIX = "_CORRELATION_ID_";

    /**
     * Runs the four phases of this step on the given dataset.
     *
     * @param dataset    the dataset to process
     * @param parameters step parameters; not read by this step
     * @param config     runner configuration (controls intermediate CSV output and config writing)
     * @return the filtered and renamed dataset
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        final boolean writeStepResults = config.isWriteStepResultsIntoFile();
        Dataset<Row> result = doFilterVariables(dataset, writeStepResults, config);
        result = doVariableNameMapping(result, writeStepResults, config);
        result = doVariableTypeDetermination(result, writeStepResults, config);
        return doVariableTypeEscalation(result, config);
    }

    /**
     * Removes rows of variables that are configured with {@code useVariable == false} and rows of
     * variables whose name starts with {@code _CORRELATION_ID_}.
     *
     * <p>A warning is logged for every configured variable that does not occur in the data.
     *
     * @param dataset                 the dataset to filter
     * @param writeStepResultIntoFile whether the filtered dataset is written to CSV
     * @param config                  runner configuration
     * @return the filtered dataset
     */
    private Dataset<Row> doFilterVariables(Dataset<Row> dataset, boolean writeStepResultIntoFile, SparkRunnerConfig config) {
        final List<String> variablesToFilter = new ArrayList<>();
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(config);
        PreprocessingConfiguration preprocessingConfiguration =
                configuration == null ? null : configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration != null) {
            for (VariableConfiguration vc : preprocessingConfiguration.getVariableConfiguration()) {
                if (!vc.isUseVariable()) {
                    variablesToFilter.add(vc.getVariableName());
                    BpmnaiLogger.getInstance().writeInfo("The variable '" + vc.getVariableName() + "' will be filtered out. Comment: " + vc.getComment());
                }
            }
        }

        // Collect the distinct variable names on the driver to be able to warn about
        // configured variables that are not part of the data.
        List<String> existingVariables = dataset.select(COLUMN_NAME).distinct().collectAsList().stream()
                .map(row -> row.getString(0))
                .collect(Collectors.toList());
        for (String variable : variablesToFilter) {
            if (!existingVariables.contains(variable)) {
                BpmnaiLogger.getInstance().writeWarn("The variable '" + variable + "' is configured to be filtered, but does not exist in the data.");
            }
        }

        // The cast selects the FilterFunction overload of Dataset.filter.
        Dataset<Row> filtered = dataset.filter((FilterFunction<Row> & Serializable) row -> {
            String variable = row.getAs(COLUMN_NAME);
            boolean correlationVariable = variable != null && variable.startsWith(CORRELATION_ID_PREFIX);
            return !correlationVariable && !variablesToFilter.contains(variable);
        });

        if (writeStepResultIntoFile) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(filtered, "variable_filter", config);
        }
        return filtered;
    }

    /**
     * Renames variables according to the configured variable name mappings.
     *
     * <p>Mappings whose old or new name is {@code null} or empty are ignored with a warning.
     *
     * @param dataset                 the dataset whose {@code name_} column is rewritten
     * @param writeStepResultIntoFile whether the resulting dataset is written to CSV
     * @param config                  runner configuration
     * @return the dataset with renamed variables
     */
    private Dataset<Row> doVariableNameMapping(Dataset<Row> dataset, boolean writeStepResultIntoFile, SparkRunnerConfig config) {
        Map<String, String> variableNameMappings = new HashMap<>();
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(config);
        PreprocessingConfiguration preprocessingConfiguration =
                configuration == null ? null : configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration != null) {
            for (VariableNameMapping vm : preprocessingConfiguration.getVariableNameMappings()) {
                if (isNullOrEmpty(vm.getOldName()) || isNullOrEmpty(vm.getNewName())) {
                    BpmnaiLogger.getInstance().writeWarn("Ignoring variable name mapping '" + vm.getOldName() + "' -> '" + vm.getNewName() + "'.");
                    continue;
                }
                variableNameMappings.put(vm.getOldName(), vm.getNewName());
            }
        }

        // NOTE(review): the renamings are applied one after another in map iteration order, so a
        // chain such as a -> b, b -> c may or may not collapse to a -> c. Confirm this is intended.
        Dataset<Row> renamed = dataset;
        for (Map.Entry<String, String> mapping : variableNameMappings.entrySet()) {
            String oldName = mapping.getKey();
            String newName = mapping.getValue();
            BpmnaiLogger.getInstance().writeInfo("Renaming variable '" + oldName + "' to '" + newName + "' as per user configuration.");
            Column nameColumn = renamed.col(COLUMN_NAME);
            renamed = renamed.withColumn(COLUMN_NAME,
                    functions.when(nameColumn.equalTo(oldName), functions.lit(newName)).otherwise(nameColumn));
        }

        if (writeStepResultIntoFile) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(renamed, "variable_name_mapping", config);
        }
        return renamed;
    }

    /**
     * Collects the type of every variable and broadcasts the resulting name-to-type map as
     * {@code PROCESS_VARIABLES_RAW}.
     *
     * <p>Variables without a type are recorded as {@code "string"}. Rows whose name is the literal
     * string {@code 'null'} are excluded by the filter expression.
     *
     * @param dataset                 the dataset to inspect; it is returned unchanged
     * @param writeStepResultIntoFile whether the helper dataset is written to CSV
     * @param config                  runner configuration
     * @return the unmodified input dataset
     */
    private Dataset<Row> doVariableTypeDetermination(Dataset<Row> dataset, boolean writeStepResultIntoFile, SparkRunnerConfig config) {
        Dataset<Row> variablesTypesDataset = dataset
                .select(COLUMN_NAME, COLUMN_VAR_TYPE, COLUMN_REVISION)
                .groupBy(COLUMN_NAME, COLUMN_VAR_TYPE)
                .agg(functions.max(COLUMN_REVISION).alias(COLUMN_REVISION))
                .filter(COLUMN_NAME + " <> 'null'");

        // NOTE(review): a variable that occurs with several types yields several rows here; the
        // map keeps whichever row is iterated last for that name.
        Map<String, String> variablesAndTypes = new HashMap<>();
        Iterator<Row> rows = variablesTypesDataset.toLocalIterator();
        while (rows.hasNext()) {
            Row row = rows.next();
            String type = row.getString(1);
            variablesAndTypes.put(row.getString(0), type == null ? DEFAULT_VARIABLE_TYPE : type);
        }
        SparkBroadcastHelper.getInstance().broadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_RAW, variablesAndTypes);

        if (writeStepResultIntoFile) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(variablesTypesDataset, "variables_types_help", config);
        }
        return dataset;
    }

    /**
     * Normalises the raw variable types, broadcasts them as {@code PROCESS_VARIABLES_ESCALATED}
     * and, if requested, adds one {@link VariableConfiguration} per variable to the configuration.
     *
     * <p>Both the input dataset and the helper dataset of variables are cached; the number of
     * variables found is logged.
     *
     * @param dataset the dataset being processed; it is returned unchanged
     * @param config  runner configuration
     * @return the input dataset
     */
    private Dataset<Row> doVariableTypeEscalation(Dataset<Row> dataset, SparkRunnerConfig config) {
        // The map is broadcast as a Map<String, String> by doVariableTypeDetermination.
        @SuppressWarnings("unchecked")
        Map<String, String> variables = (Map<String, String>) SparkBroadcastHelper.getInstance().getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_RAW);

        // Each variable is finalised when its successor is visited; the trailing call below
        // finalises the last one. Only values of existing keys are replaced while iterating.
        String lastVariableName = "";
        String lastVariableType = "";
        int variableOccurences = 0;
        for (String variable : variables.keySet()) {
            String type = variables.get(variable);
            processVariable(variables, variable, type, 0, lastVariableName, lastVariableType, 0, variableOccurences);
            if (!variable.equals(lastVariableName)) {
                lastVariableName = variable;
                lastVariableType = type;
                variableOccurences = 1;
            }
        }
        processVariable(variables, "", "", 0, lastVariableName, lastVariableType, 0, variableOccurences);
        SparkBroadcastHelper.getInstance().broadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED, variables);

        List<Row> filteredVariablesRows = new ArrayList<>();
        for (Map.Entry<String, String> variable : variables.entrySet()) {
            filteredVariablesRows.add(RowFactory.create(variable.getKey(), variable.getValue()));
        }

        if (config.isInitialConfigToBeWritten()) {
            Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(config);
            PreprocessingConfiguration preprocessingConfiguration =
                    configuration == null ? null : configuration.getPreprocessingConfiguration();
            if (preprocessingConfiguration == null) {
                // Same null handling as the filter and mapping phases instead of failing with an NPE.
                BpmnaiLogger.getInstance().writeWarn("No preprocessing configuration available; the determined variables are not added to the initial configuration.");
            } else {
                for (Map.Entry<String, String> variable : variables.entrySet()) {
                    VariableConfiguration variableConfiguration = new VariableConfiguration();
                    variableConfiguration.setVariableName(variable.getKey());
                    variableConfiguration.setVariableType(variable.getValue());
                    variableConfiguration.setUseVariable(true);
                    variableConfiguration.setComment("");
                    preprocessingConfiguration.getVariableConfiguration().add(variableConfiguration);
                }
            }
        }

        StructType schema = new StructType(new StructField[]{
                new StructField(COLUMN_NAME, DataTypes.StringType, false, Metadata.empty()),
                new StructField(COLUMN_VAR_TYPE, DataTypes.StringType, false, Metadata.empty())
        });
        SparkSession sparkSession = SparkSession.builder().getOrCreate();
        Dataset<Row> helpDataSet = sparkSession.createDataFrame(filteredVariablesRows, schema).toDF().orderBy(COLUMN_NAME);

        dataset.cache();
        helpDataSet.cache();
        BpmnaiLogger.getInstance().writeInfo("Found " + helpDataSet.count() + " process variables.");

        if (config.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(helpDataSet, "variable_types_escalated", config);
        }
        return dataset;
    }

    /**
     * Finalises the type of the previously visited variable once a different variable name is seen.
     *
     * <p>If {@code variableName} equals {@code lastVariableName} nothing happens: Java passes
     * arguments by value, so there is no state this method could update for the caller.
     * Otherwise the type of {@code lastVariableName} is written to {@code variables}; when the
     * variable occurred exactly once and its type is {@code "null"} or empty, {@code "string"}
     * is stored instead.
     *
     * @param variables               name-to-type map that is updated in place
     * @param variableName            name of the variable currently visited
     * @param variableType            type of the current variable; has no effect on the result
     * @param revision                revision of the current variable; has no effect on the result
     * @param lastVariableName        name of the previously visited variable
     * @param lastVariableType        type of the previously visited variable
     * @param lastVariableMaxRevision highest revision seen so far; has no effect on the result
     * @param variableOccurences      how often the previous variable has been seen
     */
    private void processVariable(Map<String, String> variables, String variableName, String variableType, int revision, String lastVariableName, String lastVariableType, int lastVariableMaxRevision, int variableOccurences) {
        if (variableName.equals(lastVariableName)) {
            return;
        }
        if (variableOccurences == 1) {
            boolean untyped = lastVariableType.equals("null") || lastVariableType.equals("");
            variables.put(lastVariableName, untyped ? DEFAULT_VARIABLE_TYPE : lastVariableType);
        } else if (variableOccurences > 1) {
            variables.put(lastVariableName, lastVariableType);
        }
    }

    /** Returns {@code true} if the given string is {@code null} or has length zero. */
    private static boolean isNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }
}

