/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.processing.steps.dataprocessing;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.viadee.ki.sparkimporter.configuration.Configuration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.VariableConfiguration;
import de.viadee.ki.sparkimporter.configuration.util.ConfigurationUtils;
import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.interfaces.PreprocessingStepInterface;
import de.viadee.ki.sparkimporter.util.SparkBroadcastHelper;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Preprocessing step that expands JSON-valued columns into additional flat columns.
 *
 * <p>Each value of a JSON candidate column is parsed as JSON. Every top-level field whose value
 * is neither an object nor an array becomes a new string column named
 * {@code <column>_<field>}. The original columns are kept as they are.
 *
 * <p>Which columns are JSON candidates depends on the pipeline mode: in "predict" mode every
 * column is a candidate; in "learn" mode only the columns that are keys of the
 * {@code PROCESS_VARIABLES_ESCALATED} broadcast map are. In any other mode no column is
 * parsed and the dataset is passed through with its schema unchanged.
 */
public class CreateColumnsFromJsonStep
implements PreprocessingStepInterface {
    private static final String MODE_LEARN = "learn";
    private static final String MODE_PREDICT = "predict";
    private static final String NEW_COLUMN_FIELD = "column";

    /**
     * Shared mapper. Kept static so it is neither rebuilt for every cell nor captured
     * (and serialized) by the row functions handed to Spark.
     */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    /**
     * Adds one string column per scalar JSON field found in the JSON candidate columns.
     *
     * @param dataset the dataset to expand
     * @param writeStepResultIntoFile whether the resulting dataset is additionally written to CSV
     *        (only evaluated in "learn" mode)
     * @param dataLevel not used by this step
     * @param parameterss not used by this step
     * @return the dataset with the original columns followed by the newly created columns
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, Map<String, Object> parameterss) {
        Map<?, ?> varMap = (Map<?, ?>) SparkBroadcastHelper.getInstance().getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED);

        // The mode is read once here so that the row functions below only work on captured
        // values instead of consulting static state wherever they happen to be executed.
        String pipelineMode = SparkImporterVariables.getPipelineMode();
        boolean learnMode = MODE_LEARN.equals(pipelineMode);
        boolean predictMode = MODE_PREDICT.equals(pipelineMode);

        String[] columns = dataset.columns();
        List<String> jsonColumns = new ArrayList<>();
        for (String column : columns) {
            if (predictMode || (learnMode && varMap.containsKey(column))) {
                jsonColumns.add(column);
            }
        }

        // Pass 1: find the names of all columns that the JSON content gives rise to.
        StructType newColumnsSchema = new StructType().add(NEW_COLUMN_FIELD, DataTypes.StringType);
        Dataset<Row> newColumnNames = dataset.flatMap((FlatMapFunction<Row, Row>) row -> {
            List<Row> found = new ArrayList<>();
            for (String column : jsonColumns) {
                for (String newColumn : scalarJsonFields(column, row.getAs(column)).keySet()) {
                    found.add(RowFactory.create(newColumn));
                }
            }
            return found.iterator();
        }, RowEncoder.apply(newColumnsSchema));

        // Sorted so that the order of the appended columns does not vary between runs.
        List<Row> newColumnRows = newColumnNames.distinct().orderBy(NEW_COLUMN_FIELD).collectAsList();

        List<String> newColumns = new ArrayList<>(newColumnRows.size());
        StructType newSchema = dataset.schema();
        for (Row newColumnRow : newColumnRows) {
            String newColumn = newColumnRow.getString(0);
            newColumns.add(newColumn);
            newSchema = newSchema.add(newColumn, DataTypes.StringType);
        }

        Set<String> jsonColumnSet = new HashSet<>(jsonColumns);
        Set<String> knownNewColumns = new HashSet<>(newColumns);
        String[] outputFields = newSchema.fieldNames();

        // Pass 2: copy every original value and fill the new columns from the parsed JSON.
        Dataset<Row> expanded = dataset.map((MapFunction<Row, Row>) row -> {
            Map<String, Object> valuesByField = new HashMap<>();
            for (String column : columns) {
                Object rawValue = row.getAs(column);
                valuesByField.put(column, rawValue);
                if (!jsonColumnSet.contains(column)) {
                    continue;
                }
                for (Map.Entry<String, String> field : scalarJsonFields(column, rawValue).entrySet()) {
                    if (knownNewColumns.contains(field.getKey())) {
                        valuesByField.put(field.getKey(), field.getValue());
                    } else {
                        SparkImporterLogger.getInstance().writeError("Found column in json not found in step before: " + field.getKey());
                    }
                }
            }
            // Fields without a value for this row stay null.
            Object[] values = new Object[outputFields.length];
            for (int i = 0; i < outputFields.length; i++) {
                values[i] = valuesByField.get(outputFields[i]);
            }
            return RowFactory.create(values);
        }, RowEncoder.apply(newSchema));

        if (learnMode) {
            String type = "string";
            List<Row> filteredVariablesRows = new ArrayList<>(newColumns.size());
            for (String name : newColumns) {
                filteredVariablesRows.add(RowFactory.create(name, type));
                if (!PreprocessingRunner.initialConfigToBeWritten) {
                    continue;
                }
                // Register the new column as a variable in the configuration being built.
                Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
                VariableConfiguration variableConfiguration = new VariableConfiguration();
                variableConfiguration.setVariableName(name);
                variableConfiguration.setVariableType(type);
                variableConfiguration.setUseVariable(true);
                variableConfiguration.setComment("");
                configuration.getPreprocessingConfiguration().getVariableConfiguration().add(variableConfiguration);
            }
            StructType schemaVars = new StructType(new StructField[]{new StructField("name_", DataTypes.StringType, false, Metadata.empty()), new StructField("var_type_", DataTypes.StringType, false, Metadata.empty())});
            SparkImporterLogger.getInstance().writeInfo("Found " + newColumns.size() + " additional process variables during Json processing.");
            SparkSession sparkSession = SparkSession.builder().getOrCreate();
            Dataset<Row> variableTypes = sparkSession.createDataFrame(filteredVariablesRows, schemaVars).toDF().orderBy("name_");
            SparkImporterUtils.getInstance().writeDatasetToCSV(variableTypes, "variable_types_after_json_escalated");
            // NOTE(review): the step result is only written in learn mode; confirm that
            // predict mode is not supposed to honor writeStepResultIntoFile as well.
            if (writeStepResultIntoFile) {
                SparkImporterUtils.getInstance().writeDatasetToCSV(expanded, "create_columns_from_json");
            }
        }
        return expanded;
    }

    /**
     * Parses a cell value as JSON and returns its scalar top-level fields.
     *
     * @param column name of the column the value belongs to; used as prefix of the new names
     * @param rawValue the cell value; anything that is not a {@code String} (including
     *        {@code null}) is treated as "not JSON"
     * @return new column name ({@code <column>_<field>}) mapped to the field value as text, in
     *         document order; empty if the value is not JSON or has no scalar fields
     */
    private static Map<String, String> scalarJsonFields(String column, Object rawValue) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (!(rawValue instanceof String)) {
            return fields;
        }
        JsonNode parsed = null;
        try (JsonParser parser = JSON_MAPPER.getFactory().createParser((String) rawValue)) {
            parsed = JSON_MAPPER.readTree(parser);
        } catch (IOException ignored) {
            // Values that are not valid JSON are expected here and simply yield no fields.
        }
        if (parsed == null) {
            return fields;
        }
        Iterator<String> fieldNames = parsed.fieldNames();
        while (fieldNames.hasNext()) {
            String fieldName = fieldNames.next();
            JsonNode value = parsed.get(fieldName);
            // Nested structures are not flattened.
            if (value.isObject() || value.isArray()) {
                continue;
            }
            fields.put(column + "_" + fieldName, value.asText());
        }
        return fields;
    }
}

