/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.configuration.Configuration;
import de.viadee.bpmnai.core.configuration.preprocessing.ColumnConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.bpmnai.core.configuration.util.ConfigurationUtils;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * Preprocessing step that drops every column the preprocessing configuration marks as
 * "not used", except for a fixed set of columns that later processing relies on.
 */
@PreprocessingStepDescription(name="Remove column", description="In this step input columns are removed accoording to the configuration before the processing to the data is done.")
public class ColumnRemoveStep
implements PreprocessingStepInterface {
    /** Columns that are never removed, even if the configuration disables them. */
    private static final Set<String> REQUIRED_COLUMNS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "proc_inst_id_",
            "name_",
            "var_type_",
            "rev_",
            "state_",
            "long_",
            "double_",
            "text_",
            "text2_",
            "source_")));

    /**
     * Removes the columns that are configured as unused from the given dataset.
     *
     * @param dataSet    the dataset to remove the columns from
     * @param parameters not read by this step
     * @param config     runner configuration used to look up the preprocessing configuration
     * @return the result of dropping the configured columns from {@code dataSet}
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataSet, Map<String, Object> parameters, SparkRunnerConfig config) {
        // Kept as ArrayList (not List) because the parameter type of BpmnaiUtils.asSeq is
        // not visible from this file.
        ArrayList<String> columnsToRemove = ColumnRemoveStep.collectColumnsToRemove(config);
        ColumnRemoveStep.warnAboutMissingColumns(columnsToRemove, dataSet.columns());
        return dataSet.drop(BpmnaiUtils.getInstance().asSeq(columnsToRemove));
    }

    /**
     * Determines which configured columns should be removed and logs the decision for each
     * column that is marked as unused.
     *
     * @param config runner configuration used to look up the preprocessing configuration
     * @return names of the unused columns that are not in {@link #REQUIRED_COLUMNS}; empty if
     *         no configuration or no preprocessing configuration is available
     */
    private static ArrayList<String> collectColumnsToRemove(SparkRunnerConfig config) {
        ArrayList<String> columnsToRemove = new ArrayList<>();
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(config);
        if (configuration == null) {
            return columnsToRemove;
        }
        PreprocessingConfiguration preprocessingConfiguration = configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration == null) {
            return columnsToRemove;
        }
        for (ColumnConfiguration cc : preprocessingConfiguration.getColumnConfiguration()) {
            if (cc.isUseColumn()) {
                continue;
            }
            String columnName = cc.getColumnName();
            if (REQUIRED_COLUMNS.contains(columnName)) {
                // Required columns are kept; the user is only warned that the setting is ignored.
                BpmnaiLogger.getInstance().writeWarn("The column '" + columnName + "' has to stay in in order to do the processing. It will not be removed. Comment: " + cc.getComment());
                continue;
            }
            columnsToRemove.add(columnName);
            BpmnaiLogger.getInstance().writeInfo("The column '" + columnName + "' will be removed. Comment: " + cc.getComment());
        }
        return columnsToRemove;
    }

    /**
     * Logs a warning for every column that is scheduled for removal but is not among the
     * dataset's columns. Nothing is filtered here; the list is passed on unchanged.
     *
     * @param columnsToRemove names of the columns scheduled for removal
     * @param datasetColumns  column names currently present in the dataset
     */
    private static void warnAboutMissingColumns(List<String> columnsToRemove, String[] datasetColumns) {
        // NOTE(review): this comparison is case-sensitive; confirm that it matches how
        // Dataset.drop resolves column names in the target Spark setup.
        Set<String> existingColumns = new HashSet<>(Arrays.asList(datasetColumns));
        for (String column : columnsToRemove) {
            if (!existingColumns.contains(column)) {
                BpmnaiLogger.getInstance().writeWarn("The column '" + column + "' is configured to be filtered, but does not exist in the data.");
            }
        }
    }
}

