/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.configuration.Configuration;
import de.viadee.bpmnai.core.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.VariableConfiguration;
import de.viadee.bpmnai.core.configuration.util.ConfigurationUtils;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Preprocessing step that expands JSON-valued variable columns into separate columns and then
 * drops the variables that are configured as unused.
 *
 * <p>For every candidate column whose value parses as a JSON object with at least one attribute,
 * each first-level attribute whose value is neither an object nor an array becomes a new string
 * column named {@code <column>_<attribute>} holding the attribute's text value. Rows that do not
 * contain such an attribute get {@code null} in that column.
 *
 * <p>Which columns are candidates depends on the pipeline mode: in {@code "predict"} mode every
 * column is inspected, in {@code "learn"} mode only the columns named by the keys of the
 * {@code PROCESS_VARIABLES_ESCALATED} broadcast variable, and in any other mode none.
 */
@PreprocessingStepDescription(name="Create columns from Json", description="In this step each variable column is checked if it contains a json and if so, the first level of attributes is transformed into separate columns. No object or array parameters are converted.")
public class CreateColumnsFromJsonStep
implements PreprocessingStepInterface {

    private static final String PIPELINE_MODE_LEARN = "learn";
    private static final String PIPELINE_MODE_PREDICT = "predict";

    /** Name of the only column of the intermediate dataset that carries discovered column names. */
    private static final String NEW_COLUMN_FIELD = "column";

    /** Variable type recorded for every column created by this step. */
    private static final String NEW_VARIABLE_TYPE = "string";

    /**
     * JSON reader shared by all rows. ObjectMapper is thread-safe for reading once configured.
     * Being static, it is not captured (and serialized) by the Spark closures below; it is
     * initialised when this class is loaded wherever the closures run.
     */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    /**
     * Expands JSON columns into separate columns, then removes variables configured as unused.
     *
     * @param dataset    the input dataset
     * @param parameters step parameters (not used by this step)
     * @param config     the runner configuration (pipeline mode, output switches)
     * @return the dataset with the additional JSON-derived columns and without filtered variables
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        Dataset<Row> withJsonColumns = this.doCreateColumnsFromJson(dataset, config);
        return this.doFilterJsonVariables(withJsonColumns, config);
    }

    /**
     * Adds one string column per scalar first-level JSON attribute found in the candidate columns.
     * In learn mode the new columns are additionally logged, registered in the initial
     * configuration (if one is to be written) and, if requested, written to CSV together with
     * the resulting dataset.
     */
    private Dataset<Row> doCreateColumnsFromJson(Dataset<Row> dataset, SparkRunnerConfig config) {
        String[] columns = dataset.columns();
        Set<String> jsonCandidates = determineJsonCandidateColumns(columns, config);

        List<String> newColumns = discoverNewColumns(dataset, columns, jsonCandidates);
        StructType newSchema = dataset.schema();
        for (String newColumn : newColumns) {
            newSchema = newSchema.add(newColumn, DataTypes.StringType);
        }
        Dataset<Row> result = populateNewColumns(dataset, columns, jsonCandidates, newColumns, newSchema);

        if (PIPELINE_MODE_LEARN.equals(config.getPipelineMode())) {
            registerNewVariables(newColumns, config);
            BpmnaiLogger.getInstance().writeInfo("Found " + newColumns.size() + " additional process variables during Json processing.");
            if (config.isWriteStepResultsIntoFile()) {
                writeStepResults(result, newColumns, config);
            }
        }
        return result;
    }

    /**
     * Returns the names of the columns whose values are inspected for JSON content: all columns in
     * predict mode, the keys of the escalated process variables broadcast in learn mode, and no
     * column in any other mode (previously such modes failed with a NullPointerException).
     */
    private static Set<String> determineJsonCandidateColumns(String[] columns, SparkRunnerConfig config) {
        String pipelineMode = config.getPipelineMode();
        if (PIPELINE_MODE_PREDICT.equals(pipelineMode)) {
            return new HashSet<>(Arrays.asList(columns));
        }
        Set<String> candidates = new HashSet<>();
        if (PIPELINE_MODE_LEARN.equals(pipelineMode)) {
            Map<?, ?> escalatedVariables = (Map<?, ?>) SparkBroadcastHelper.getInstance()
                    .getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED);
            if (escalatedVariables != null) {
                for (Object variableName : escalatedVariables.keySet()) {
                    candidates.add((String) variableName);
                }
            }
        }
        return candidates;
    }

    /**
     * Scans every row and returns the distinct names of all columns to create, sorted by name so
     * that the resulting schema does not depend on Spark's partition/collect order.
     */
    private static List<String> discoverNewColumns(Dataset<Row> dataset, String[] columns, Set<String> jsonCandidates) {
        StructType newColumnsSchema = new StructType().add(NEW_COLUMN_FIELD, DataTypes.StringType);
        Dataset<Row> discovered = dataset.flatMap((FlatMapFunction<Row, Row>) row -> {
            List<Row> found = new ArrayList<>();
            for (String column : columns) {
                if (!jsonCandidates.contains(column)) {
                    continue;
                }
                JsonNode json = parseJsonObject(row.getAs(column));
                if (json == null) {
                    continue;
                }
                for (String newColumn : scalarFieldValues(column, json).keySet()) {
                    found.add(RowFactory.create(newColumn));
                }
            }
            return found.iterator();
        }, RowEncoder.apply(newColumnsSchema));

        List<String> newColumns = new ArrayList<>();
        for (Row newColumnRow : discovered.distinct().collectAsList()) {
            newColumns.add(newColumnRow.getString(0));
        }
        Collections.sort(newColumns);
        return newColumns;
    }

    /**
     * Maps every row onto {@code newSchema}: original values are copied unchanged and each new
     * column receives the text of the matching JSON attribute, or {@code null} if absent.
     */
    private static Dataset<Row> populateNewColumns(Dataset<Row> dataset, String[] columns, Set<String> jsonCandidates,
                                                   List<String> newColumns, StructType newSchema) {
        // HashSet for O(1) membership checks inside the per-row closure.
        Set<String> knownNewColumns = new HashSet<>(newColumns);
        String[] outputFields = newSchema.fieldNames();
        return dataset.map((MapFunction<Row, Row>) row -> {
            Map<String, Object> values = new HashMap<>();
            for (String column : columns) {
                // Keep the original value as-is so non-string columns keep matching their schema type.
                Object rawValue = row.getAs(column);
                values.put(column, rawValue);
                if (!jsonCandidates.contains(column)) {
                    continue;
                }
                JsonNode json = parseJsonObject(rawValue);
                if (json == null) {
                    continue;
                }
                for (Map.Entry<String, String> field : scalarFieldValues(column, json).entrySet()) {
                    if (knownNewColumns.contains(field.getKey())) {
                        values.put(field.getKey(), field.getValue());
                    } else {
                        BpmnaiLogger.getInstance().writeError("Found column in json not found in step before: " + field.getKey());
                    }
                }
            }
            Object[] rowValues = new Object[outputFields.length];
            for (int i = 0; i < outputFields.length; i++) {
                rowValues[i] = values.get(outputFields[i]);
            }
            return RowFactory.create(rowValues);
        }, RowEncoder.apply(newSchema));
    }

    /**
     * Returns {@code <column>_<attribute>} mapped to the text value of each first-level attribute
     * of {@code json} that is neither an object nor an array, in the attributes' iteration order.
     */
    private static Map<String, String> scalarFieldValues(String column, JsonNode json) {
        Map<String, String> result = new LinkedHashMap<>();
        Iterator<Map.Entry<String, JsonNode>> fields = json.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            JsonNode value = field.getValue();
            if (value.isObject() || value.isArray()) {
                continue;
            }
            result.put(column + "_" + field.getKey(), value.asText());
        }
        return result;
    }

    /**
     * Parses {@code value} as JSON and returns it only if it is an object with at least one
     * attribute; returns {@code null} for {@code null}, non-string values, unparsable text and
     * any other JSON type.
     */
    private static JsonNode parseJsonObject(Object value) {
        if (!(value instanceof String)) {
            return null;
        }
        JsonNode parsed;
        try {
            // readTree(String) creates and closes its own parser.
            parsed = JSON_MAPPER.readTree((String) value);
        } catch (IOException ignored) {
            // Most variable values are plain text; a parse failure just means "not JSON".
            return null;
        }
        return parsed != null && parsed.isObject() && parsed.size() > 0 ? parsed : null;
    }

    /**
     * Appends a used, string-typed {@link VariableConfiguration} for each new column to the
     * preprocessing configuration when an initial configuration is to be written.
     */
    private static void registerNewVariables(List<String> newColumns, SparkRunnerConfig config) {
        if (!config.isInitialConfigToBeWritten() || newColumns.isEmpty()) {
            return;
        }
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(config);
        PreprocessingConfiguration preprocessingConfiguration =
                configuration == null ? null : configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration == null) {
            BpmnaiLogger.getInstance().writeError("No preprocessing configuration available; the " + newColumns.size()
                    + " variables created from Json could not be added to the initial configuration.");
            return;
        }
        for (String name : newColumns) {
            VariableConfiguration variableConfiguration = new VariableConfiguration();
            variableConfiguration.setVariableName(name);
            variableConfiguration.setVariableType(NEW_VARIABLE_TYPE);
            variableConfiguration.setUseVariable(true);
            variableConfiguration.setComment("");
            preprocessingConfiguration.getVariableConfiguration().add(variableConfiguration);
        }
    }

    /**
     * Writes the new variables with their type (sorted by name) and the resulting dataset to CSV.
     */
    private static void writeStepResults(Dataset<Row> result, List<String> newColumns, SparkRunnerConfig config) {
        List<Row> variableRows = new ArrayList<>();
        for (String name : newColumns) {
            variableRows.add(RowFactory.create(name, NEW_VARIABLE_TYPE));
        }
        StructType schemaVars = new StructType(new StructField[]{
                new StructField("name_", DataTypes.StringType, false, Metadata.empty()),
                new StructField("var_type_", DataTypes.StringType, false, Metadata.empty())});
        SparkSession sparkSession = result.sparkSession();
        Dataset<Row> variableTypes = sparkSession.createDataFrame(variableRows, schemaVars).orderBy("name_");
        BpmnaiUtils.getInstance().writeDatasetToCSV(variableTypes, "variable_types_after_json_escalated", config);
        BpmnaiUtils.getInstance().writeDatasetToCSV(result, "create_columns_from_json", config);
    }

    /**
     * Drops every variable whose configuration has {@code useVariable == false}, logging those
     * that are actually present as columns. Without a preprocessing configuration nothing is dropped.
     */
    private Dataset<Row> doFilterJsonVariables(Dataset<Row> dataset, SparkRunnerConfig config) {
        ArrayList<String> variablesToFilter = new ArrayList<>();
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(config);
        PreprocessingConfiguration preprocessingConfiguration =
                configuration == null ? null : configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration != null) {
            Set<String> presentColumns = new HashSet<>(Arrays.asList(dataset.columns()));
            for (VariableConfiguration vc : preprocessingConfiguration.getVariableConfiguration()) {
                if (vc.isUseVariable()) {
                    continue;
                }
                variablesToFilter.add(vc.getVariableName());
                if (presentColumns.contains(vc.getVariableName())) {
                    BpmnaiLogger.getInstance().writeInfo("The variable '" + vc.getVariableName() + "' will be filtered out after json processing. Comment: " + vc.getComment());
                }
            }
        }
        return dataset.drop(BpmnaiUtils.getInstance().asSeq(variablesToFilter));
    }
}

