/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.runner;

import de.viadee.ki.sparkimporter.configuration.Configuration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.PipelineStepConfiguration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.ki.sparkimporter.configuration.preprocessing.Step;
import de.viadee.ki.sparkimporter.configuration.util.ConfigurationUtils;
import de.viadee.ki.sparkimporter.exceptions.FaultyConfigurationException;
import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.aggregation.AllButEmptyStringAggregationFunction;
import de.viadee.ki.sparkimporter.processing.aggregation.ProcessStatesAggregationFunction;
import de.viadee.ki.sparkimporter.processing.steps.PipelineManager;
import de.viadee.ki.sparkimporter.processing.steps.PipelineStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.AddVariableColumnsStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.CreateColumnsFromJsonStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.DetermineProcessVariablesStep;
import de.viadee.ki.sparkimporter.processing.steps.dataprocessing.ReduceColumnsStep;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.primitives.Longs;

/**
 * Base class for runners that load an initial dataset into Spark and push it
 * through a configurable preprocessing pipeline.
 *
 * <p>Subclasses provide argument handling ({@link #initialize(String[])}), the
 * default pipeline ({@link #buildDefaultPipeline()}) and the data source
 * ({@link #loadInitialDataset()}); {@link #run(String[])} orchestrates the
 * Spark session, UDF registration, configuration handling and pipeline
 * execution.
 */
public abstract class SparkRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);

    private PipelineManager pipelineManager = null;
    protected SparkSession sparkSession = null;
    private Dataset<Row> dataset;
    protected String dataLevel = "process";
    private List<PipelineStep> pipelineSteps = new ArrayList<>();

    /**
     * Lets the concrete runner process its arguments. Called by
     * {@link #run(String[])} after the Spark session exists and the UDFs are
     * registered, but before the configuration is checked.
     *
     * @param var1 the arguments handed to {@link #run(String[])}
     */
    protected abstract void initialize(String[] var1);

    /**
     * Builds the pipeline that is written into a freshly created
     * configuration.
     *
     * @return the default pipeline steps of the concrete runner
     */
    protected abstract List<PipelineStep> buildDefaultPipeline();

    /**
     * Builds the fixed four-step pipeline that is used when no configuration
     * exists yet and the runner is not in Kafka import mode.
     *
     * @return a new list containing the reduce-columns, determine-variables,
     *         add-variable-columns and create-columns-from-JSON steps
     */
    protected List<PipelineStep> buildMinimalPipeline() {
        // NOTE(review): the second constructor argument appears to name the step
        // this one depends on (it is read back via getDependsOn()) - confirm.
        List<PipelineStep> minimalSteps = new ArrayList<>();
        minimalSteps.add(new PipelineStep(new ReduceColumnsStep(), ""));
        minimalSteps.add(new PipelineStep(new DetermineProcessVariablesStep(), "ReduceColumnsStep"));
        minimalSteps.add(new PipelineStep(new AddVariableColumnsStep(), "DetermineProcessVariablesStep"));
        minimalSteps.add(new PipelineStep(new CreateColumnsFromJsonStep(), "AddVariableColumnsStep"));
        return minimalSteps;
    }

    /**
     * Loads the dataset the preprocessing pipeline starts from.
     *
     * @return the initial dataset
     */
    protected abstract Dataset<Row> loadInitialDataset();

    /**
     * Tells whether the runner is in Kafka import mode. The constant is the
     * receiver of {@code equals} so that an unset running mode yields
     * {@code false} instead of a {@link NullPointerException}.
     */
    private static boolean isKafkaImport() {
        return RUNNING_MODE.KAFKA_IMPORT.equals(SparkImporterVariables.getRunningMode());
    }

    /**
     * Checks whether a non-empty configuration is available. If not, an empty
     * one is created and the {@link PreprocessingRunner} flags are set so that
     * the configuration is written after the run (and, outside Kafka import
     * mode, so that the minimal pipeline is used).
     */
    private void checkConfig() {
        // Query the configuration once instead of once per half of the condition.
        Configuration loaded = ConfigurationUtils.getInstance().getConfiguration(true);
        if (loaded == null || loaded.isEmpty()) {
            if (!isKafkaImport()) {
                PreprocessingRunner.minimalPipelineToBeBuild = true;
            }
            PreprocessingRunner.initialConfigToBeWritten = true;
            ConfigurationUtils.getInstance().createEmptyConfig();
        } else {
            SparkImporterLogger.getInstance().writeInfo("Configuration file found: " + SparkImporterVariables.getWorkingDirectory() + "/" + ConfigurationUtils.getInstance().getConfigurationFileName());
        }
    }

    /** Writes the configuration to file if it was created during this run. */
    private void writeConfig() {
        if (PreprocessingRunner.initialConfigToBeWritten) {
            ConfigurationUtils.getInstance().writeConfigurationToFile();
        }
    }

    /**
     * Registers the aggregation functions and helper UDFs on the Spark
     * session:
     * <ul>
     *   <li>{@code AllButEmptyString} and {@code ProcessState} aggregations,</li>
     *   <li>{@code isALong}: true for a {@code Long} or a string that parses as a long,</li>
     *   <li>{@code timestampStringToLong}: the parsed long divided by 1000, or
     *       {@code null} if the value is not a parseable string.</li>
     * </ul>
     */
    private void registerUDFs() {
        this.sparkSession.udf().register("AllButEmptyString", (UserDefinedAggregateFunction) new AllButEmptyStringAggregationFunction());
        this.sparkSession.udf().register("ProcessState", (UserDefinedAggregateFunction) new ProcessStatesAggregationFunction());
        this.sparkSession.udf().register("isALong", (UDF1<Object, Boolean> & Serializable) value ->
                value instanceof Long
                        || (value instanceof String && Longs.tryParse((String) value) != null),
                DataTypes.BooleanType);
        this.sparkSession.udf().register("timestampStringToLong", (UDF1<Object, Long> & Serializable) value -> {
            if (!(value instanceof String)) {
                return null;
            }
            // Parse once; tryParse yields null for anything that is not a long.
            Long parsed = Longs.tryParse((String) value);
            if (parsed == null) {
                return null;
            }
            return parsed / 1000L;
        }, DataTypes.LongType);
    }

    /**
     * Runs the complete import: obtains the Spark session, registers listener
     * and UDFs, initializes the runner, prepares configuration and pipeline,
     * loads the data (optionally filtered by process definition id) and
     * executes the preprocessing pipeline.
     *
     * <p>The Spark session is closed when this method exits, whether it
     * completes or throws. The configuration is written only after a
     * successful run.
     *
     * @param arguments arguments passed on to {@link #initialize(String[])}
     * @throws FaultyConfigurationException propagated from {@link #configurePipelineSteps()}
     */
    public void run(String[] arguments) throws FaultyConfigurationException {
        this.sparkSession = SparkSession.builder().getOrCreate();
        try {
            LOG.info("Spark application '{}' (ID: {}) started.",
                    this.sparkSession.sparkContext().appName(),
                    this.sparkSession.sparkContext().applicationId());
            this.sparkSession.sparkContext().addSparkListener(new SparkListener() {

                @Override
                public void onJobEnd(SparkListenerJobEnd jobEnd) {
                    super.onJobEnd(jobEnd);
                    LOG.info("... job {} finished.", jobEnd.jobId());
                }

                @Override
                public void onJobStart(SparkListenerJobStart jobStart) {
                    super.onJobStart(jobStart);
                    int stageCount = jobStart.stageIds().size();
                    LOG.info("Spark job {} started (has {} {}) ...",
                            jobStart.jobId(), stageCount, stageCount == 1 ? "stage" : "stages");
                }

                @Override
                public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
                    super.onApplicationEnd(applicationEnd);
                    LOG.info("Spark application finished.");
                }
            });
            this.registerUDFs();
            this.initialize(arguments);
            this.checkConfig();
            this.configurePipelineSteps();
            this.dataset = this.loadInitialDataset();
            if (SparkImporterVariables.getProcessFilterDefinitionId() != null) {
                this.dataset = this.dataset.filter(this.dataset.col("proc_def_id_").equalTo(SparkImporterVariables.getProcessFilterDefinitionId()));
            }
            PreprocessingRunner preprocessingRunner = new PreprocessingRunner();
            for (PipelineStep ps : this.pipelineManager.getOrderedPipeline()) {
                preprocessingRunner.addPreprocessorStep(ps);
            }
            long startMillis = System.currentTimeMillis();
            preprocessingRunner.run(this.dataset, this.dataLevel);
            long endMillis = System.currentTimeMillis();
            String logMessage = "Job ran for " + (endMillis - startMillis) / 1000L + " seconds in total";
            LOG.info(logMessage);
            SparkImporterLogger.getInstance().writeInfo(logMessage);
            if (PreprocessingRunner.minimalPipelineToBeBuild) {
                logMessage = "Filling the minimal configuration pipeline with the applications default pipeline...";
                LOG.info(logMessage);
                SparkImporterLogger.getInstance().writeInfo(logMessage);
                logMessage = "Execute again to process data with under the newly created configuration.";
                LOG.info(logMessage);
                SparkImporterLogger.getInstance().writeInfo(logMessage);
                this.overwritePipelineSteps();
            }
        } finally {
            // Release the session on failure as well, not only after a clean run.
            this.sparkSession.close();
        }
        this.writeConfig();
    }

    /**
     * Replaces the pipeline with the runner's default pipeline and stores it
     * in the configuration. Does nothing unless the configuration was created
     * during this run.
     */
    public void overwritePipelineSteps() {
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        if (PreprocessingRunner.initialConfigToBeWritten) {
            this.pipelineSteps = this.buildDefaultPipeline();
            this.storePipelineInConfiguration(configuration);
        }
    }

    /**
     * Determines the pipeline steps and creates the {@link PipelineManager}.
     *
     * <p>If the configuration was created during this run, the minimal
     * pipeline (or the default pipeline in Kafka import mode) is used and
     * stored in the configuration. Otherwise the active steps found in the
     * existing configuration are used. The step list is rebuilt on every call,
     * so repeated calls do not accumulate duplicates.
     *
     * @throws FaultyConfigurationException declared for callers; presumably
     *         raised by the pipeline construction - confirm against {@link PipelineManager}
     */
    public void configurePipelineSteps() throws FaultyConfigurationException {
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration();
        if (PreprocessingRunner.initialConfigToBeWritten) {
            this.pipelineSteps = isKafkaImport() ? this.buildDefaultPipeline() : this.buildMinimalPipeline();
            this.storePipelineInConfiguration(configuration);
        } else {
            List<PipelineStep> configuredSteps = new ArrayList<>();
            List<Step> steps = activeCandidateSteps(configuration);
            if (steps != null) {
                for (Step cs : steps) {
                    Boolean active = cs.getActive();
                    if (active == null) {
                        // A missing flag used to crash with a NullPointerException.
                        LOG.warn("Skipping configured pipeline step without an 'active' flag: {}", cs);
                        continue;
                    }
                    if (active) {
                        configuredSteps.add(new PipelineStep(cs));
                    }
                }
            }
            this.pipelineSteps = configuredSteps;
        }
        this.pipelineManager = new PipelineManager(this.pipelineSteps);
    }

    /**
     * Returns the steps listed in the given configuration, or {@code null} if
     * any element on the path to them is missing.
     */
    private static List<Step> activeCandidateSteps(Configuration configuration) {
        if (configuration == null) {
            return null;
        }
        PreprocessingConfiguration preprocessingConfiguration = configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration == null) {
            return null;
        }
        PipelineStepConfiguration pipelineStepConfiguration = preprocessingConfiguration.getPipelineStepConfiguration();
        if (pipelineStepConfiguration == null) {
            return null;
        }
        return pipelineStepConfiguration.getSteps();
    }

    /**
     * Converts the current pipeline steps into configuration steps (active,
     * with an empty comment) and sets them on the given configuration.
     */
    private void storePipelineInConfiguration(Configuration configuration) {
        PreprocessingConfiguration preprocessingConfiguration = configuration.getPreprocessingConfiguration();
        PipelineStepConfiguration pipelineStepConfiguration = preprocessingConfiguration.getPipelineStepConfiguration();
        List<Step> configSteps = new ArrayList<>();
        for (PipelineStep ps : this.pipelineSteps) {
            Step s = new Step();
            s.setClassName(ps.getClassName());
            s.setDependsOn(ps.getDependsOn());
            s.setId(ps.getId());
            s.setParameters(ps.getStepParameters());
            s.setComment("");
            s.setActive(true);
            configSteps.add(s);
        }
        pipelineStepConfiguration.setSteps(configSteps);
    }

    /** Running modes of the importer, each with its textual mode string. */
    public static enum RUNNING_MODE {
        CSV_IMPORT_AND_PROCESSING("csv"),
        KAFKA_IMPORT("kafka_import"),
        KAFKA_PROCESSING("kafka_process");

        private final String runnerMode;

        private RUNNING_MODE(String runnerMode) {
            this.runnerMode = runnerMode;
        }

        /**
         * @return the mode string this constant was created with
         */
        public String getModeString() {
            return this.runnerMode;
        }
    }
}

