/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.configuration.Configuration;
import de.viadee.bpmnai.core.configuration.preprocessing.ColumnConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.VariableConfiguration;
import de.viadee.bpmnai.core.configuration.util.ConfigurationUtils;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

/**
 * Preprocessing step that casts the dataset's columns into the data types defined in the
 * preprocessing configuration.
 *
 * <p>For each column the type name and optional parse format come from the variable
 * configuration if one exists for that name, otherwise from the column configuration. A column
 * with neither keeps its current type. Unknown configured type names map to {@code string}.
 * Columns whose name ends in {@code _rev} are not cast by the main loop. Instead, on the
 * {@code process} data level with revision counting enabled, the {@code <column>_rev} column of
 * each variable column is cast to {@code integer}.
 */
@PreprocessingStepDescription(name="Type cast", description="In this step the columns are casted into the data type they have been defined in the configuration. If the cast could not be done by Spark the value is null afterwards.")
public class TypeCastStep implements PreprocessingStepInterface {

    /** Pipeline mode in which only escalated process variables count as variable columns. */
    private static final String PIPELINE_MODE_LEARN = "learn";

    /** Data level on which the {@code _rev} revision-count columns are cast. */
    private static final String DATA_LEVEL_PROCESS = "process";

    private static final String REV_SUFFIX = "_rev";
    private static final String CASTED_SUFFIX = "_casted";
    private static final String CAST_RESULT_SUFFIX = "_castresult";

    /** Marker written into the {@code _castresult} column for values that failed to cast. */
    private static final String CAST_ERROR_MARKER = "CAST_ERROR?";

    /**
     * Casts every non-{@code _rev} column of {@code dataset} into its configured data type.
     *
     * @param dataset    the dataset whose columns are cast
     * @param parameters step parameters (not used by this step)
     * @param config     runner configuration (pipeline mode, data level, dev and output switches)
     * @return the dataset with casted columns
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        // Escalated process variables, keyed by variable name; only consulted in learn mode.
        Map<?, ?> escalatedVariables = (Map<?, ?>) SparkBroadcastHelper.getInstance()
                .getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED);

        // Column types as they are before this step. Built once instead of scanning the schema per column.
        Map<String, DataType> currentDataTypes = new HashMap<>();
        for (StructField field : dataset.schema().fields()) {
            // First occurrence wins, like a linear scan of the schema would.
            currentDataTypes.putIfAbsent(field.name(), field.dataType());
        }

        Map<String, ColumnConfiguration> columnConfigByName = new HashMap<>();
        Map<String, VariableConfiguration> variableConfigByName = new HashMap<>();
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(config);
        PreprocessingConfiguration preprocessingConfiguration =
                configuration != null ? configuration.getPreprocessingConfiguration() : null;
        if (preprocessingConfiguration != null) {
            List<ColumnConfiguration> columnConfigurations = preprocessingConfiguration.getColumnConfiguration();
            if (columnConfigurations != null) {
                for (ColumnConfiguration cc : columnConfigurations) {
                    columnConfigByName.put(cc.getColumnName(), cc);
                }
            }
            List<VariableConfiguration> variableConfigurations = preprocessingConfiguration.getVariableConfiguration();
            if (variableConfigurations != null) {
                for (VariableConfiguration vc : variableConfigurations) {
                    variableConfigByName.put(vc.getVariableName(), vc);
                }
            }
        }

        for (String column : dataset.columns()) {
            if (column.endsWith(REV_SUFFIX)) {
                continue;
            }

            String configuredType = null;
            String parseFormat = null;
            boolean isVariableColumn = false;
            VariableConfiguration variableConfiguration = variableConfigByName.get(column);
            ColumnConfiguration columnConfiguration = columnConfigByName.get(column);
            if (variableConfiguration != null) {
                configuredType = variableConfiguration.getVariableType();
                parseFormat = variableConfiguration.getParseFormat();
                // In learn mode only escalated process variables count as variable columns.
                isVariableColumn = !PIPELINE_MODE_LEARN.equals(config.getPipelineMode())
                        || (escalatedVariables != null && escalatedVariables.containsKey(column));
            } else if (columnConfiguration != null) {
                configuredType = columnConfiguration.getColumnType();
                parseFormat = columnConfiguration.getParseFormat();
            }

            DataType currentDataType = currentDataTypes.get(column);
            DataType newDataType = this.mapDataType(currentDataType, configuredType);

            if (config.isDevTypeCastCheckEnabled() && !newDataType.equals(currentDataType)) {
                dataset = this.castWithCheck(dataset, column, newDataType, parseFormat, configuredType);
            } else {
                dataset = this.castColumn(dataset, column, column, newDataType, parseFormat);
            }

            if (DATA_LEVEL_PROCESS.equals(config.getDataLevel()) && config.isRevCountEnabled() && isVariableColumn) {
                String revColumn = column + REV_SUFFIX;
                dataset = dataset.withColumn(revColumn, dataset.col(revColumn).cast("integer"));
            }
        }

        if (config.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(dataset, "type_cast_columns", config);
        }
        return dataset;
    }

    /**
     * Dev-mode cast: casts {@code column} into a temporary {@code <column>_casted} column and marks,
     * in {@code <column>_castresult}, every non-null, non-empty source value whose cast is null.
     * If no value is marked, the temporary column replaces the original and the marker column is
     * dropped. Otherwise a warning is logged and all three columns are kept for inspection.
     */
    private Dataset<Row> castWithCheck(Dataset<Row> dataset, String column, DataType newDataType, String parseFormat, String configuredType) {
        String castedColumn = column + CASTED_SUFFIX;
        String castResultColumn = column + CAST_RESULT_SUFFIX;

        Dataset<Row> checked = this.castColumn(dataset, column, castedColumn, newDataType, parseFormat);
        Column source = checked.col(column);
        Column sourceHasValue = source.isNotNull().and(source.notEqual(functions.lit("")));
        Column castMarker = functions.when(checked.col(castedColumn).isNull(), functions.lit(CAST_ERROR_MARKER))
                .otherwise(functions.lit(""));
        checked = checked.withColumn(castResultColumn,
                functions.when(sourceHasValue, castMarker).otherwise(functions.lit("")));

        // Cached because the check below evaluates the plan and later steps evaluate it again.
        // NOTE(review): never unpersisted here; with many changed columns this accumulates cached
        // plans. Confirm this is acceptable for the dev check.
        checked.cache();
        // Column API instead of a SQL string, so column names with '-', spaces etc. still resolve.
        boolean hasCastErrors = checked
                .filter(checked.col(castResultColumn).equalTo(CAST_ERROR_MARKER))
                .limit(1)
                .count() > 0L;

        if (hasCastErrors) {
            BpmnaiLogger.getInstance().writeWarn("Column '" + column + "' seems to have cast errors. Please check the data type (is defined as '" + configuredType + "')");
            return checked;
        }
        return checked.drop(new String[]{column, castResultColumn}).withColumnRenamed(castedColumn, column);
    }

    /**
     * Writes {@code columnToCast}, cast to {@code newDataType}, into {@code castColumnName}.
     *
     * <p>For date and timestamp targets, values for which the {@code isalong} UDF is true are
     * converted via the {@code timestampstringtolong} UDF and {@code from_unixtime}. Both UDFs are
     * assumed to be registered elsewhere, with the latter yielding epoch seconds as
     * {@code from_unixtime} expects (TODO confirm). All other values are parsed as strings, using
     * {@code parseFormat} when it is non-empty. Every other target type uses a plain Spark cast.
     */
    private Dataset<Row> castColumn(Dataset<Row> dataset, String columnToCast, String castColumnName, DataType newDataType, String parseFormat) {
        Column source = dataset.col(columnToCast);
        boolean hasParseFormat = parseFormat != null && !parseFormat.isEmpty();
        // from_unixtime always renders "yyyy-MM-dd HH:mm:ss", so the epoch branch is parsed with
        // the default format. Applying the configured parseFormat to it nulled every epoch value
        // whenever that pattern differed from Spark's default.
        Column epochAsString = functions.from_unixtime(functions.callUDF("timestampstringtolong", source));
        Column isEpoch = functions.callUDF("isalong", source);

        Column casted;
        if (DataTypes.DateType.equals(newDataType)) {
            Column fromString = hasParseFormat ? functions.to_date(source, parseFormat) : functions.to_date(source);
            casted = functions.when(isEpoch, functions.to_date(epochAsString)).otherwise(fromString);
        } else if (DataTypes.TimestampType.equals(newDataType)) {
            Column fromString = hasParseFormat ? functions.to_timestamp(source, parseFormat) : functions.to_timestamp(source);
            casted = functions.when(isEpoch, functions.to_timestamp(epochAsString)).otherwise(fromString);
        } else {
            casted = source.cast(newDataType);
        }
        return dataset.withColumn(castColumnName, casted);
    }

    /**
     * Maps a configured type name to a Spark {@link DataType}.
     *
     * @param currentDataType the column's current type, returned when nothing is configured
     * @param typeConfig      configured type name, or {@code null} if none is configured
     * @return the target type. Unrecognised names map to {@link DataTypes#StringType}.
     */
    private DataType mapDataType(DataType currentDataType, String typeConfig) {
        if (typeConfig == null) {
            return currentDataType;
        }
        switch (typeConfig) {
            case "integer":
                return DataTypes.IntegerType;
            case "long":
                return DataTypes.LongType;
            case "double":
                return DataTypes.DoubleType;
            case "boolean":
                return DataTypes.BooleanType;
            case "date":
                return DataTypes.DateType;
            case "timestamp":
                return DataTypes.TimestampType;
            default:
                return DataTypes.StringType;
        }
    }
}

