/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.runner;

import de.viadee.ki.sparkimporter.configuration.Configuration;
import de.viadee.ki.sparkimporter.configuration.modelprediction.ModelPredictionConfiguration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.PipelineStepConfiguration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.Step;
import de.viadee.ki.sparkimporter.configuration.util.ConfigurationUtils;
import de.viadee.ki.sparkimporter.exceptions.FaultyConfigurationException;
import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.aggregation.AllButEmptyStringAggregationFunction;
import de.viadee.ki.sparkimporter.processing.aggregation.ProcessStatesAggregationFunction;
import de.viadee.ki.sparkimporter.processing.steps.PipelineManager;
import de.viadee.ki.sparkimporter.processing.steps.PipelineStep;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.primitives.Longs;

public abstract class SparkPredictionServiceRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPredictionServiceRunner.class);
    private PipelineManager pipelineManager = null;
    protected SparkSession sparkSession = null;
    private Dataset<Row> dataset;
    protected String dataLevel = "process";
    private List<PipelineStep> pipelineSteps = new ArrayList<PipelineStep>();

    protected abstract void initialize();

    protected abstract List<PipelineStep> buildDefaultPipeline();

    private void checkConfig() {
        if (ConfigurationUtils.getInstance().getConfiguration(true) == null) {
            PreprocessingRunner.initialConfigToBeWritten = true;
            ConfigurationUtils.getInstance().createEmptyConfig();
        }
    }

    private void writeConfig() {
        if (PreprocessingRunner.initialConfigToBeWritten) {
            ConfigurationUtils.getInstance().writeConfigurationToFile();
        }
    }

    protected void registerUDFs() {
        this.sparkSession.udf().register("AllButEmptyString", (UserDefinedAggregateFunction)new AllButEmptyStringAggregationFunction());
        this.sparkSession.udf().register("ProcessState", (UserDefinedAggregateFunction)new ProcessStatesAggregationFunction());
        this.sparkSession.udf().register("isALong", (UDF1)new UDF1<Object, Boolean>(){

            public Boolean call(Object o) throws Exception {
                if (o instanceof Long) {
                    return true;
                }
                if (o instanceof String && Longs.tryParse((String)((String)o)) != null) {
                    return true;
                }
                return false;
            }
        }, DataTypes.BooleanType);
        this.sparkSession.udf().register("timestampStringToLong", (UDF1)new UDF1<Object, Long>(){

            public Long call(Object o) throws Exception {
                if (o instanceof String && Longs.tryParse((String)((String)o)) != null) {
                    return Longs.tryParse((String)((String)o)) / 1000L;
                }
                return null;
            }
        }, DataTypes.LongType);
    }

    public void setup() throws FaultyConfigurationException {
        this.sparkSession = SparkSession.builder().getOrCreate();
        this.initialize();
        this.registerUDFs();
        this.checkConfig();
        this.configurePipelineSteps();
    }

    public Dataset<Row> run(Dataset dataset) {
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        List<String> predictionVars = configuration.getModelPredictionConfiguration().getPredictionVariables();
        ArrayList<Column> usedColumns = new ArrayList<Column>();
        for (String string : predictionVars) {
            usedColumns.add(new Column(string));
        }
        dataset = dataset.select(SparkImporterUtils.getInstance().asSeq(usedColumns));
        PreprocessingRunner preprocessingRunner = new PreprocessingRunner();
        for (PipelineStep ps : this.pipelineManager.getOrderedPipeline()) {
            preprocessingRunner.addPreprocessorStep(ps);
        }
        Dataset<Row> dataset2 = preprocessingRunner.run((Dataset<Row>)dataset, this.dataLevel);
        this.writeConfig();
        return dataset2;
    }

    public void configurePipelineSteps() throws FaultyConfigurationException {
        PipelineStepConfiguration pipelineStepConfiguration;
        ModelPredictionConfiguration modelPredictionConfiguration;
        List<Step> steps = null;
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        if (PreprocessingRunner.initialConfigToBeWritten) {
            this.pipelineSteps = this.buildDefaultPipeline();
            ModelPredictionConfiguration modelPredictionConfiguration2 = configuration.getModelPredictionConfiguration();
            PipelineStepConfiguration pipelineStepConfiguration2 = modelPredictionConfiguration2.getPipelineStepConfiguration();
            ArrayList<Step> configSteps = new ArrayList<Step>();
            for (PipelineStep ps : this.pipelineSteps) {
                Step s = new Step();
                s.setClassName(ps.getClassName());
                s.setDependsOn(ps.getDependsOn());
                s.setId(ps.getId());
                s.setParameters(ps.getStepParameters());
                s.setComment("");
                s.setActive(true);
                configSteps.add(s);
            }
            pipelineStepConfiguration2.setSteps(configSteps);
        } else if (configuration != null && (modelPredictionConfiguration = configuration.getModelPredictionConfiguration()) != null && (pipelineStepConfiguration = modelPredictionConfiguration.getPipelineStepConfiguration()) != null && (steps = pipelineStepConfiguration.getSteps()) != null) {
            for (Step cs : steps) {
                this.pipelineSteps.add(new PipelineStep(cs));
            }
        }
        this.pipelineManager = new PipelineManager(this.pipelineSteps);
    }
}

