/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.userconfig;

import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/**
 * Preprocessing step that restricts the dataset to process instances which have an open
 * (i.e. {@code end_time_} is null) activity instance whose {@code activity_id_} matches a
 * user-supplied SQL {@code LIKE} pattern.
 *
 * <p>Expected parameter: {@code "query"} - the {@code LIKE} pattern applied to {@code activity_id_}.
 *
 * <p>For the selected process instances, only rows accepted by the registered
 * {@code activityBeforeTimestamp} UDF are kept. The variable rows (non-null {@code var_type_})
 * belonging to the retained activity instances are also kept.
 */
public class DataFilterOnActivityStep
implements PreprocessingStepInterface {

    /**
     * Filters {@code dataSet} to the data of process instances that currently sit in an activity
     * matching the {@code "query"} parameter.
     *
     * @param dataSet    input rows; read columns include {@code proc_inst_id_}, {@code start_time_},
     *                   {@code end_time_}, {@code activity_id_}, {@code act_inst_id_}, {@code var_type_}
     * @param parameters step parameters; must contain a {@code "query"} String
     * @param config     runner configuration (controls whether the result is also written to CSV)
     * @return the filtered (and cached) dataset, or {@code dataSet} unchanged when the
     *         parameters or the query are missing
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataSet, Map<String, Object> parameters, SparkRunnerConfig config) {
        if (parameters == null || parameters.isEmpty()) {
            BpmnaiLogger.getInstance().writeWarn("No parameters found for the DataFilterOnActivityStep");
            return dataSet;
        }
        String query = (String) parameters.get("query");
        if (query == null) {
            // Same policy as for missing parameters: warn and leave the data untouched instead of
            // silently producing an empty result.
            BpmnaiLogger.getInstance().writeWarn("No 'query' parameter found for the DataFilterOnActivityStep");
            return dataSet;
        }
        BpmnaiLogger.getInstance().writeInfo("Filtering data with activity instance filter query: " + query + ".");

        dataSet.cache();
        long initialDSCount = dataSet.count();

        // Put all rows of one process instance into the same partition, ordered by start time.
        dataSet = dataSet.repartition(dataSet.col("proc_inst_id_")).sortWithinPartitions("start_time_");

        Dataset<Row> variables = dataSet.filter(functions.col("var_type_").isNotNull());

        // Open (not yet ended) activity instances whose activity id matches the query pattern.
        Dataset<Row> dsActivityInstances = dataSet
                .filter(dataSet.col("activity_id_").like(query))
                .filter(dataSet.col("end_time_").isNull());
        List<Row> activityRows = dsActivityInstances.select("proc_inst_id_", "start_time_").collectAsList();

        // One process instance can have several open matching activity instances (parallel
        // branches, multi-instance activities, wildcard patterns). Without a merge function
        // toMap throws IllegalStateException on the duplicate key; keep the first row instead.
        Map<String, String> activities = activityRows.stream().collect(Collectors.toMap(
                r -> (String) r.getAs("proc_inst_id_"),
                r -> (String) r.getAs("start_time_"),
                (first, second) -> first));
        // NOTE(review): presumably read by the activityBeforeTimestamp UDF below - confirm.
        SparkBroadcastHelper.getInstance().broadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_INSTANCE_TIMESTAMP_MAP, activities);

        Dataset<Row> selectedProcesses = dataSet.filter(functions.col("proc_inst_id_").isin(activities.keySet().toArray()));
        Dataset<Row> activityDataSet = selectedProcesses
                .withColumn("data_filter_on_activity", functions.callUDF("activityBeforeTimestamp",
                        selectedProcesses.col("proc_inst_id_"), selectedProcesses.col("start_time_")))
                .filter(functions.col("data_filter_on_activity").like("TRUE"))
                .drop("data_filter_on_activity");

        // Temporarily rename the key of the right-hand side so the join output has distinct
        // column names and the right-hand key can be dropped again afterwards.
        activityDataSet = activityDataSet.withColumnRenamed("act_inst_id_", "act_inst_id__RIGHT");
        variables = variables.join(
                activityDataSet.select("act_inst_id__RIGHT").distinct(),
                variables.col("act_inst_id_").equalTo(activityDataSet.col("act_inst_id__RIGHT")),
                "inner");
        activityDataSet = activityDataSet.withColumnRenamed("act_inst_id__RIGHT", "act_inst_id_");
        variables = variables.drop("act_inst_id__RIGHT");

        // union() matches columns by position; both sides keep the original column order here.
        dataSet = activityDataSet.union(variables);
        dataSet.cache();
        BpmnaiLogger.getInstance().writeInfo("DataFilterOnActivityStep: The filtered DataSet contains " + dataSet.count() + " rows, (before: " + initialDSCount + " rows)");

        if (config.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(dataSet, "data_filter_on_activity_step", config);
        }
        return dataSet;
    }
}

