/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.processing.steps.userconfig;

import de.viadee.ki.sparkimporter.configuration.Configuration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.VariableConfiguration;
import de.viadee.ki.sparkimporter.configuration.util.ConfigurationUtils;
import de.viadee.ki.sparkimporter.processing.interfaces.PreprocessingStepInterface;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * Preprocessing step that removes rows of variables the user configuration marks as not to be
 * used ({@code isUseVariable() == false}). Rows whose {@code name_} starts with
 * {@code _CORRELATION_ID_} are always removed, independent of the configuration.
 */
public class VariableFilterStep
implements PreprocessingStepInterface {

    /** Column holding the variable name of each row. */
    private static final String VARIABLE_NAME_COLUMN = "name_";

    /** Variable-name prefix whose rows are always removed, regardless of configuration. */
    private static final String CORRELATION_ID_PREFIX = "_CORRELATION_ID_";

    /**
     * Removes all rows whose variable is configured as unused, plus all rows whose variable name
     * starts with {@code _CORRELATION_ID_}.
     *
     * <p>For every configured-but-unused variable that does not occur in {@code dataSet}, a warning
     * is logged.
     *
     * @param dataSet                 the dataset to filter; must contain a {@code name_} column
     * @param writeStepResultIntoFile if {@code true}, the result is passed to
     *                                {@code SparkImporterUtils.writeDatasetToCSV} as {@code "variable_filter"}
     * @param dataLevel               not used by this step
     * @param parameters              not used by this step
     * @return the filtered dataset
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataSet, boolean writeStepResultIntoFile, String dataLevel, Map<String, Object> parameters) {
        // Must stay effectively final: it is captured by the filter lambda below, which Spark
        // serializes and ships to the executors (LinkedHashSet is Serializable).
        final Set<String> variablesToFilter = collectVariablesToFilter();

        // The existence check runs a distinct/collect Spark job; skip it when there is nothing to check.
        if (!variablesToFilter.isEmpty()) {
            warnAboutMissingVariables(dataSet, variablesToFilter);
        }

        // The explicit FilterFunction<Row> cast selects the Java overload of Dataset.filter.
        Dataset<Row> filtered = dataSet.filter((FilterFunction<Row>) row -> {
            String variable = row.getAs(VARIABLE_NAME_COLUMN);
            if (variable != null && variable.startsWith(CORRELATION_ID_PREFIX)) {
                return false;
            }
            return !variablesToFilter.contains(variable);
        });

        if (writeStepResultIntoFile) {
            SparkImporterUtils.getInstance().writeDatasetToCSV(filtered, "variable_filter");
        }
        return filtered;
    }

    /**
     * Reads the current preprocessing configuration and returns the names of all variables
     * marked as unused, logging an info message for each one.
     *
     * @return names of variables to drop, in configuration order; empty if no configuration is present
     */
    private static Set<String> collectVariablesToFilter() {
        Set<String> variablesToFilter = new LinkedHashSet<>();

        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        if (configuration == null) {
            return variablesToFilter;
        }
        PreprocessingConfiguration preprocessingConfiguration = configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration == null || preprocessingConfiguration.getVariableConfiguration() == null) {
            return variablesToFilter;
        }

        for (VariableConfiguration vc : preprocessingConfiguration.getVariableConfiguration()) {
            if (vc.isUseVariable()) {
                continue;
            }
            variablesToFilter.add(vc.getVariableName());
            SparkImporterLogger.getInstance().writeInfo("The variable '" + vc.getVariableName() + "' will be filtered out. Comment: " + vc.getComment());
        }
        return variablesToFilter;
    }

    /**
     * Logs a warning for every variable configured to be filtered that does not occur in the
     * {@code name_} column of {@code dataSet}. Triggers a Spark job that collects the distinct
     * variable names to the driver.
     *
     * @param dataSet           dataset whose {@code name_} column is inspected
     * @param variablesToFilter variable names configured to be filtered
     */
    private static void warnAboutMissingVariables(Dataset<Row> dataSet, Set<String> variablesToFilter) {
        Set<String> existingVariables = dataSet.select(VARIABLE_NAME_COLUMN)
                .distinct()
                .collectAsList()
                .stream()
                .map(r -> r.getString(0))
                .collect(Collectors.toSet());

        for (String variable : variablesToFilter) {
            if (!existingVariables.contains(variable)) {
                SparkImporterLogger.getInstance().writeWarn("The variable '" + variable + "' is configured to be filtered, but does not exist in the data.");
            }
        }
    }
}

