/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.processing.steps.dataprocessing;

import de.viadee.bpmnai.core.annotation.PreprocessingStepDescription;
import de.viadee.bpmnai.core.processing.interfaces.PreprocessingStepInterface;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

@PreprocessingStepDescription(name="Fill activity instances history", description="In this step each variable column is filled with values according to the history of the process instance up to the point of activity activity represented in the line.")
/**
 * Preprocessing step that forward-fills process variable columns per process instance.
 *
 * <p>Rows are grouped by {@code proc_inst_id_} and ordered by {@code start_time_}. For every
 * column that is named like a key of the {@code PROCESS_VARIABLES_ESCALATED} broadcast map,
 * the first non-null value seen within a process instance is written into that row and into
 * every later row of the same instance. All other columns are copied through unchanged.
 */
public class FillActivityInstancesHistoryStep
implements PreprocessingStepInterface {
    /**
     * Fills the variable columns of {@code dataset} with the values known so far for the
     * process instance of each row.
     *
     * <p>Empty strings in variable columns are converted to {@code null} first so that they
     * count as "no value". Once a non-null value has been seen for a variable inside a process
     * instance it is retained for all following rows of that instance; a later, different value
     * in the same column does not replace it.
     *
     * <p>NOTE(review): every column value is read as a {@code String}; this presumably relies on
     * earlier steps having produced an all-string schema - confirm against the pipeline.
     *
     * @param dataset    input rows; must contain the columns {@code proc_inst_id_} and
     *                   {@code start_time_}
     * @param parameters step parameters; not read by this step
     * @param config     runner configuration; decides whether the result is also written to CSV
     * @return a dataset with the same schema as the (null-normalised) input and filled variable
     *         columns
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, Map<String, Object> parameters, SparkRunnerConfig config) {
        @SuppressWarnings("unchecked")
        Map<String, ?> varMap = (Map<String, ?>)SparkBroadcastHelper.getInstance().getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED);
        Set<String> variables = varMap.keySet();

        // Treat empty strings in variable columns as missing values so they can be filled below.
        // withColumn on an existing name replaces the column in place, so the column list
        // captured by the for-each header stays valid while looping.
        Dataset<Row> cleaned = dataset;
        for (String column : cleaned.columns()) {
            if (!variables.contains(column)) {
                continue;
            }
            cleaned = cleaned.withColumn(column, functions.when(cleaned.col(column).equalTo(""), null).otherwise(cleaned.col(column)));
        }

        final String[] columns = cleaned.columns();
        // Decide once per column (instead of once per row and column) whether it is a variable
        // column. A boolean[] is serializable, unlike the keySet() view of a HashMap.
        final boolean[] isVariableColumn = new boolean[columns.length];
        for (int i = 0; i < columns.length; i++) {
            isVariableColumn[i] = variables.contains(columns[i]);
        }

        // Mutable state used by the mapper below. It is captured by the lambda, so this only
        // works when rows of a partition are processed sequentially by one copy of the closure.
        // NOTE(review): relies on Spark deserialising the closure per task - confirm.
        final HashMap<String, String> valuesToWrite = new HashMap<>();
        final String[] lastProcessInstanceId = new String[]{""};

        // All rows of a process instance must end up in one partition AND be contiguous there.
        // Sorting by start_time_ alone would interleave different process instances that share
        // a partition and reset the collected history on every switch, so sort by the process
        // instance id first and by start time second.
        Dataset<Row> ordered = cleaned.repartition(new Column[]{cleaned.col("proc_inst_id_")}).sortWithinPartitions("proc_inst_id_", "start_time_");
        Encoder<Row> rowEncoder = RowEncoder.apply(ordered.schema());

        Dataset<Row> filled = ordered.map((MapFunction<Row, Row>)row -> {
            String currentProcessInstanceId = row.getAs("proc_inst_id_");
            if (!lastProcessInstanceId[0].equals(currentProcessInstanceId)) {
                // A new process instance starts: forget the values of the previous one.
                valuesToWrite.clear();
                lastProcessInstanceId[0] = currentProcessInstanceId;
            }

            Object[] newRow = new Object[columns.length];
            for (int i = 0; i < columns.length; i++) {
                String column = columns[i];
                if (!isVariableColumn[i]) {
                    // Non-variable columns are passed through untouched.
                    String passThrough = row.getAs(column);
                    newRow[i] = passThrough;
                    continue;
                }
                String knownValue = valuesToWrite.get(column);
                if (knownValue != null) {
                    newRow[i] = knownValue;
                    continue;
                }
                String currentValue = row.getAs(column);
                if (currentValue != null) {
                    // First value seen for this variable in the current process instance.
                    valuesToWrite.put(column, currentValue);
                }
                newRow[i] = currentValue;
            }
            return RowFactory.create(newRow);
        }, rowEncoder);

        if (config.isWriteStepResultsIntoFile()) {
            BpmnaiUtils.getInstance().writeDatasetToCSV(filled, "fill_activity_instances_history", config);
        }
        return filled;
    }
}

