/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.processing.steps.dataprocessing;

import de.viadee.ki.sparkimporter.configuration.Configuration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.ColumnConfiguration;
import de.viadee.ki.sparkimporter.configuration.util.ConfigurationUtils;
import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.interfaces.PreprocessingStepInterface;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.collection.Seq;

/**
 * Preprocessing step that projects the input dataset onto a fixed set of columns and drops rows
 * whose {@code proc_inst_id_} is the literal string {@code 'null'} (or SQL NULL).
 *
 * <p>As side effects the step
 * <ul>
 *   <li>adds a {@link ColumnConfiguration} (enabled, typed after the Spark data type) for every
 *       input column that is not in the fixed keep list, when
 *       {@code PreprocessingRunner.initialConfigToBeWritten} is set, and</li>
 *   <li>stores a single-column dataset listing the original column names in
 *       {@code PreprocessingRunner.helper_datasets} under {@code "startColumns_" + dataLevel}.</li>
 * </ul>
 */
public class ReduceColumnsDatasetStep implements PreprocessingStepInterface {

    /** Value of {@code dataLevel} for which the activity-level columns are kept as well. */
    private static final String DATA_LEVEL_ACTIVITY = "activity";

    /** Optional column that is selected whenever it is present in the input dataset. */
    private static final String TIMESTAMP_COLUMN = "timestamp_";

    /** Columns kept for every data level, in output order. */
    private static final List<String> BASE_COLUMNS = Arrays.asList(
            "proc_inst_id_", "name_", "var_type_", "rev_", "state_",
            "long_", "double_", "text_", "text2_");

    /** Additional columns kept when {@code dataLevel} is {@value #DATA_LEVEL_ACTIVITY}, in output order. */
    private static final List<String> ACTIVITY_COLUMNS = Arrays.asList(
            "act_inst_id_", "start_time_", "end_time_", "duration_");

    /**
     * Reduces {@code dataset} to the columns required for {@code dataLevel}.
     *
     * @param dataset                 input dataset; must contain every column of the keep list
     * @param writeStepResultIntoFile if {@code true}, the reduced dataset is also written as CSV
     *                                under the name {@code "reduced_columns"}
     * @param dataLevel               data level; {@code "activity"} additionally keeps the
     *                                activity columns
     * @param parameters              unused by this step
     * @return the projected and filtered dataset
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel, Map<String, Object> parameters) {
        List<String> startColumnNames = Arrays.asList(dataset.columns());
        List<String> columnsToKeep = this.getColumnsToKeep(dataLevel);

        if (PreprocessingRunner.initialConfigToBeWritten) {
            this.addInitialColumnConfigurations(Arrays.asList(dataset.schema().fields()), startColumnNames, columnsToKeep);
        }

        PreprocessingRunner.helper_datasets.put("startColumns_" + dataLevel, this.createStartColumnsDataset(startColumnNames));

        // The projection is derived from the same keep list that drives the configuration above,
        // so the two can no longer drift apart. timestamp_ is optional and only selected if present.
        List<Column> selection = new ArrayList<Column>(columnsToKeep.size() + 1);
        for (String columnName : columnsToKeep) {
            selection.add(new Column(columnName));
        }
        if (startColumnNames.contains(TIMESTAMP_COLUMN)) {
            selection.add(new Column(TIMESTAMP_COLUMN));
        }

        // Rows whose proc_inst_id_ is SQL NULL are dropped as well: the comparison evaluates to
        // NULL and filter() only keeps rows where the predicate is true.
        Dataset<Row> reduced = dataset
                .select(selection.toArray(new Column[0]))
                .filter("proc_inst_id_ <> 'null'");

        if (writeStepResultIntoFile) {
            SparkImporterUtils.getInstance().writeDatasetToCSV(reduced, "reduced_columns");
        }
        return reduced;
    }

    /**
     * Returns a fresh, ordered list of the fixed columns kept for {@code dataLevel}.
     * A {@code null} level is treated like any non-activity level.
     */
    private List<String> getColumnsToKeep(String dataLevel) {
        List<String> columnsToKeep = new ArrayList<String>(BASE_COLUMNS);
        if (DATA_LEVEL_ACTIVITY.equals(dataLevel)) {
            columnsToKeep.addAll(ACTIVITY_COLUMNS);
        }
        return columnsToKeep;
    }

    /**
     * Appends one enabled {@link ColumnConfiguration} with an empty comment to the current
     * preprocessing configuration for every start column that is not in {@code columnsToKeep}.
     */
    private void addInitialColumnConfigurations(List<StructField> datasetFields, List<String> startColumnNames, List<String> columnsToKeep) {
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        for (String column : startColumnNames) {
            // NOTE(review): timestamp_ is selected by the projection but is not part of
            // columnsToKeep, so it also receives a configuration entry here — confirm intended.
            if (columnsToKeep.contains(column)) {
                continue;
            }
            ColumnConfiguration columnConfiguration = new ColumnConfiguration();
            columnConfiguration.setColumnName(column);
            columnConfiguration.setColumnType(this.getColumnTypeString(datasetFields, column));
            columnConfiguration.setUseColumn(true);
            columnConfiguration.setComment("");
            configuration.getPreprocessingConfiguration().getColumnConfiguration().add(columnConfiguration);
        }
    }

    /**
     * Builds a dataset with a single non-nullable string column {@code column_name} holding one
     * row per entry of {@code columnNames}, in order.
     */
    private Dataset<Row> createStartColumnsDataset(List<String> columnNames) {
        List<Row> rows = new ArrayList<Row>(columnNames.size());
        for (String columnName : columnNames) {
            rows.add(RowFactory.create(columnName));
        }
        StructType schema = new StructType(new StructField[]{
            new StructField("column_name", DataTypes.StringType, false, Metadata.empty())
        });
        SparkSession sparkSession = SparkSession.builder().getOrCreate();
        return sparkSession.createDataFrame(rows, schema).toDF();
    }

    /**
     * Maps the Spark data type of {@code column} to the type name used in the column
     * configuration. The first field with a matching name wins; unknown columns and
     * unsupported types map to {@code "string"}.
     */
    private String getColumnTypeString(List<StructField> datasetFields, String column) {
        DataType currentDatatype = DataTypes.StringType;
        for (StructField sf : datasetFields) {
            if (sf.name().equals(column)) {
                currentDatatype = sf.dataType();
                break;
            }
        }
        if (currentDatatype.equals(DataTypes.IntegerType)) {
            return "integer";
        }
        if (currentDatatype.equals(DataTypes.LongType)) {
            return "long";
        }
        if (currentDatatype.equals(DataTypes.DoubleType)) {
            return "double";
        }
        if (currentDatatype.equals(DataTypes.BooleanType)) {
            return "boolean";
        }
        if (currentDatatype.equals(DataTypes.TimestampType)) {
            return "timestamp";
        }
        if (currentDatatype.equals(DataTypes.DateType)) {
            return "date";
        }
        if (currentDatatype.equals(DataTypes.FloatType)) {
            return "float";
        }
        return "string";
    }
}

