/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.runner;

import de.viadee.bpmnai.core.configuration.Configuration;
import de.viadee.bpmnai.core.configuration.preprocessing.PipelineStepConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.PreprocessingConfiguration;
import de.viadee.bpmnai.core.configuration.preprocessing.Step;
import de.viadee.bpmnai.core.configuration.util.ConfigurationUtils;
import de.viadee.bpmnai.core.exceptions.FaultyConfigurationException;
import de.viadee.bpmnai.core.listener.SparkRunnerListener;
import de.viadee.bpmnai.core.processing.PreprocessingRunner;
import de.viadee.bpmnai.core.processing.aggregation.AllButEmptyStringAggregationFunction;
import de.viadee.bpmnai.core.processing.aggregation.ProcessStatesAggregationFunction;
import de.viadee.bpmnai.core.processing.steps.PipelineManager;
import de.viadee.bpmnai.core.processing.steps.PipelineStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.AddVariableColumnsStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.DetermineProcessVariablesStep;
import de.viadee.bpmnai.core.processing.steps.dataprocessing.ReduceColumnsStep;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.helper.SparkBroadcastHelper;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.primitives.Longs;

/**
 * Base class for Spark applications that push a dataset through a configurable
 * preprocessing pipeline.
 *
 * <p>{@link #run(String[], SparkRunnerListener)} obtains a {@link SparkSession}, registers the
 * UDFs used by the pipeline steps, lets the subclass initialize itself, builds the pipeline from
 * the configuration (or creates an initial configuration when none exists), loads the initial
 * dataset and hands it to a {@link PreprocessingRunner}.
 *
 * <p>Subclasses supply argument handling ({@link #initialize(String[])}), the default pipeline
 * ({@link #buildDefaultPipeline()}) and the data source ({@link #loadInitialDataset()}).
 */
public abstract class SparkRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);

    private PipelineManager pipelineManager = null;
    protected SparkSession sparkSession = null;
    private Dataset<Row> dataset;
    private List<PipelineStep> pipelineSteps = new ArrayList<>();
    protected SparkRunnerConfig sparkRunnerConfig = new SparkRunnerConfig();

    /** Creates a runner that starts with a fresh {@link SparkRunnerConfig}. */
    protected SparkRunner() {
    }

    /**
     * Creates a runner that uses the given configuration object.
     *
     * @param config the runner configuration to use
     */
    protected SparkRunner(SparkRunnerConfig config) {
        this.sparkRunnerConfig = config;
    }

    /**
     * Called once per run, after the UDFs are registered and before the configuration is checked.
     *
     * @param var1 the arguments passed to {@code run}
     */
    protected abstract void initialize(String[] var1);

    /**
     * Builds the pipeline that is used as the application's default.
     *
     * @return the default pipeline steps
     */
    protected abstract List<PipelineStep> buildDefaultPipeline();

    /**
     * Builds the minimal pipeline: reduce columns, determine process variables, add variable
     * columns, each step depending on the one before it.
     *
     * @return a new list holding the three minimal steps
     */
    protected List<PipelineStep> buildMinimalPipeline() {
        List<PipelineStep> steps = new ArrayList<>();
        steps.add(new PipelineStep(new ReduceColumnsStep(), ""));
        steps.add(new PipelineStep(new DetermineProcessVariablesStep(), "ReduceColumnsStep"));
        steps.add(new PipelineStep(new AddVariableColumnsStep(), "DetermineProcessVariablesStep"));
        return steps;
    }

    /**
     * Loads the dataset the pipeline starts from.
     *
     * @return the initial dataset
     */
    protected abstract Dataset<Row> loadInitialDataset();

    /** Returns whether the runner is configured for the {@code KAFKA_IMPORT} running mode. */
    private boolean isKafkaImportMode() {
        // Constant-first equals keeps the check safe when no running mode has been set.
        return RUNNING_MODE.KAFKA_IMPORT.equals(this.sparkRunnerConfig.getRunningMode());
    }

    /**
     * Checks for an existing configuration. When none is found (or it is empty) an empty
     * configuration is created and flagged to be written after the run; otherwise the existing
     * one is validated against the runner configuration.
     */
    private void checkConfig() {
        ConfigurationUtils configurationUtils = ConfigurationUtils.getInstance();
        // Load once and reuse the result for both the null and the empty check.
        Configuration configuration = configurationUtils.getConfiguration(true, this.sparkRunnerConfig);
        if (configuration == null || configuration.isEmpty()) {
            if (!this.isKafkaImportMode()) {
                this.sparkRunnerConfig.setMinimalPipelineToBeBuild(true);
            }
            this.sparkRunnerConfig.setInitialConfigToBeWritten(true);
            configurationUtils.createEmptyConfig(this.sparkRunnerConfig);
        } else {
            BpmnaiLogger.getInstance().writeInfo("Configuration file found: " + this.sparkRunnerConfig.getWorkingDirectory() + "/" + configurationUtils.getConfigurationFileName(this.sparkRunnerConfig));
            configurationUtils.validateConfigurationFileVsSparkRunnerConfig(this.sparkRunnerConfig);
        }
    }

    /** Writes the configuration to file, but only when an initial configuration was requested. */
    private void writeConfig() {
        if (!this.sparkRunnerConfig.isInitialConfigToBeWritten()) {
            return;
        }
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(this.sparkRunnerConfig);
        configuration.getPreprocessingConfiguration().setDataLevel(this.sparkRunnerConfig.getDataLevel());
        ConfigurationUtils.getInstance().writeConfigurationToFile(this.sparkRunnerConfig);
    }

    /** Registers the aggregation functions and scalar UDFs on the current Spark session. */
    private void registerUDFs() {
        this.sparkSession.udf().register("AllButEmptyString", (UserDefinedAggregateFunction) new AllButEmptyStringAggregationFunction());
        this.sparkSession.udf().register("ProcessState", (UserDefinedAggregateFunction) new ProcessStatesAggregationFunction());

        // True for Long values and for strings that parse as a long.
        this.sparkSession.udf().register("isALong", (UDF1<Object, Boolean>) o -> {
            if (o instanceof Long) {
                return true;
            }
            return o instanceof String && Longs.tryParse((String) o) != null;
        }, DataTypes.BooleanType);

        // Parses a numeric string and divides it by 1000; anything else yields null.
        this.sparkSession.udf().register("timestampStringToLong", (UDF1<Object, Long>) o -> {
            if (o instanceof String) {
                Long parsed = Longs.tryParse((String) o);
                if (parsed != null) {
                    return parsed / 1000L;
                }
            }
            return null;
        }, DataTypes.LongType);

        // "TRUE" when the timestamp s2 is not after the timestamp stored for key s in the
        // broadcast map; "FALSE" when s is unknown, s2 is empty, or s2 is later.
        this.sparkSession.udf().register("activityBeforeTimestamp", (UDF2<String, String, String>) (s, s2) -> {
            Map<?, ?> activities = (Map<?, ?>) SparkBroadcastHelper.getInstance().getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_INSTANCE_TIMESTAMP_MAP);
            if (activities == null || activities.isEmpty()) {
                return "Error: Broadcast not found";
            }
            if (!activities.containsKey(s)) {
                return "FALSE";
            }
            Timestamp tsAct = new Timestamp(Long.parseLong((String) activities.get(s)));
            if (s2 == null || s2.isEmpty()) {
                return "FALSE";
            }
            Timestamp tsObject = new Timestamp(Long.parseLong(s2));
            return tsObject.after(tsAct) ? "FALSE" : "TRUE";
        }, DataTypes.StringType);
    }

    /** Creates the listener that logs job start, job end and application end. */
    private static SparkListener createLoggingListener() {
        return new SparkListener() {

            @Override
            public void onJobEnd(SparkListenerJobEnd jobEnd) {
                super.onJobEnd(jobEnd);
                LOG.info("... job {} finished.", jobEnd.jobId());
            }

            @Override
            public void onJobStart(SparkListenerJobStart jobStart) {
                super.onJobStart(jobStart);
                int stageCount = jobStart.stageIds().size();
                LOG.info("Spark job {} started (has {} {}) ...", jobStart.jobId(), stageCount, stageCount == 1 ? "stage" : "stages");
            }

            @Override
            public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
                super.onApplicationEnd(applicationEnd);
                LOG.info("Spark application finished.");
            }
        };
    }

    /**
     * Creates the listener that forwards task-count based progress to the given runner listener.
     *
     * @param sparkRunnerListener the receiver of progress updates; must not be {@code null}
     */
    private static SparkListener createProgressListener(final SparkRunnerListener sparkRunnerListener) {
        final AtomicInteger tasksTotal = new AtomicInteger();
        final AtomicInteger tasksDone = new AtomicInteger();
        return new SparkListener() {

            @Override
            public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
                super.onStageSubmitted(stageSubmitted);
                int total = tasksTotal.addAndGet(stageSubmitted.stageInfo().numTasks());
                sparkRunnerListener.onProgressUpdate(null, tasksDone.get(), total);
            }

            @Override
            public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
                super.onStageCompleted(stageCompleted);
                // Count the finished stage BEFORE notifying, otherwise the reported progress
                // always lags one stage behind and never reaches the total.
                int done = tasksDone.addAndGet(stageCompleted.stageInfo().numTasks());
                sparkRunnerListener.onProgressUpdate(null, done, tasksTotal.get());
            }
        };
    }

    /**
     * Runs the application without a progress listener.
     *
     * @param arguments the arguments handed to {@link #initialize(String[])}
     * @throws FaultyConfigurationException if the pipeline configuration is faulty
     */
    public void run(String[] arguments) throws FaultyConfigurationException {
        this.run(arguments, null);
    }

    /**
     * Runs the application: sets up the Spark session, builds the pipeline, loads the dataset,
     * executes the preprocessing steps and finally writes the configuration if required.
     *
     * @param arguments the arguments handed to {@link #initialize(String[])}
     * @param sparkRunnerListener optional receiver of progress updates and the finish
     *     notification; may be {@code null}
     * @throws FaultyConfigurationException if the pipeline configuration is faulty
     */
    public void run(String[] arguments, final SparkRunnerListener sparkRunnerListener) throws FaultyConfigurationException {
        this.sparkSession = SparkSession.builder().getOrCreate();
        LOG.info("Spark application '{}' (ID: {}) started.", this.sparkSession.sparkContext().appName(), this.sparkSession.sparkContext().applicationId());
        this.sparkSession.sparkContext().addSparkListener(createLoggingListener());

        // The progress listener is only needed when somebody is listening.
        final SparkListener progressListener = sparkRunnerListener == null ? null : createProgressListener(sparkRunnerListener);
        if (progressListener != null) {
            this.sparkSession.sparkContext().addSparkListener(progressListener);
        }

        try {
            this.registerUDFs();
            this.initialize(arguments);
            this.checkConfig();
            this.configurePipelineSteps();

            this.dataset = this.loadInitialDataset();
            String processDefinitionFilter = this.sparkRunnerConfig.getProcessFilterDefinitionId();
            if (processDefinitionFilter != null) {
                this.dataset = this.dataset.filter(this.dataset.col("proc_def_id_").equalTo(processDefinitionFilter));
            }

            PreprocessingRunner preprocessingRunner = new PreprocessingRunner();
            for (PipelineStep ps : this.pipelineManager.getOrderedPipeline()) {
                preprocessingRunner.addPreprocessorStep(ps);
            }

            long startMillis = System.currentTimeMillis();
            preprocessingRunner.run(this.dataset, this.sparkRunnerConfig);
            long endMillis = System.currentTimeMillis();
            logToAll("Job ran for " + (endMillis - startMillis) / 1000L + " seconds in total");

            if (this.sparkRunnerConfig.isMinimalPipelineToBeBuild()) {
                logToAll("Filling the minimal configuration pipeline with the applications default pipeline...");
                logToAll("Execute again to process data with under the newly created configuration.");
                this.overwritePipelineSteps();
            }

            if (this.sparkRunnerConfig.isCloseSparkSessionAfterRun()) {
                this.sparkSession.close();
            }
            this.writeConfig();

            if (sparkRunnerListener != null) {
                sparkRunnerListener.onFinished(true);
            }
        } finally {
            // Detach the progress listener even when the run fails, so it does not stay
            // registered on the Spark context.
            if (progressListener != null) {
                this.sparkSession.sparkContext().removeSparkListener(progressListener);
            }
        }
    }

    /** Writes the message to both the SLF4J logger and the {@link BpmnaiLogger}. */
    private static void logToAll(String message) {
        LOG.info(message);
        BpmnaiLogger.getInstance().writeInfo(message);
    }

    /**
     * Stores the current {@code pipelineSteps} as active {@link Step} entries in the pipeline
     * step configuration of the given configuration.
     *
     * @param configuration the configuration whose step list is replaced
     */
    private void storePipelineStepsInConfiguration(Configuration configuration) {
        PreprocessingConfiguration preprocessingConfiguration = configuration.getPreprocessingConfiguration();
        PipelineStepConfiguration pipelineStepConfiguration = preprocessingConfiguration.getPipelineStepConfiguration();
        List<Step> configSteps = new ArrayList<>(this.pipelineSteps.size());
        for (PipelineStep ps : this.pipelineSteps) {
            Step step = new Step();
            step.setClassName(ps.getClassName());
            step.setDependsOn(ps.getDependsOn());
            step.setId(ps.getId());
            step.setParameters(ps.getStepParameters());
            step.setComment("");
            step.setActive(true);
            configSteps.add(step);
        }
        pipelineStepConfiguration.setSteps(configSteps);
    }

    /**
     * Replaces the configured pipeline steps with the default pipeline. Does nothing unless an
     * initial configuration is to be written.
     */
    public void overwritePipelineSteps() {
        if (!this.sparkRunnerConfig.isInitialConfigToBeWritten()) {
            return;
        }
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(this.sparkRunnerConfig);
        this.pipelineSteps = this.buildDefaultPipeline();
        this.storePipelineStepsInConfiguration(configuration);
    }

    /**
     * Determines the pipeline steps for this run and creates the {@link PipelineManager}.
     *
     * <p>When an initial configuration is to be written, the minimal pipeline (or the default
     * pipeline in {@code KAFKA_IMPORT} mode) is built and stored in the configuration. Otherwise
     * the active steps are read from the existing configuration.
     *
     * @throws FaultyConfigurationException if the pipeline configuration is faulty
     */
    public void configurePipelineSteps() throws FaultyConfigurationException {
        Configuration configuration = ConfigurationUtils.getInstance().getConfiguration(this.sparkRunnerConfig);
        if (this.sparkRunnerConfig.isInitialConfigToBeWritten()) {
            this.pipelineSteps = this.isKafkaImportMode() ? this.buildDefaultPipeline() : this.buildMinimalPipeline();
            this.storePipelineStepsInConfiguration(configuration);
        } else {
            // Start from a fresh list so that calling this method again (e.g. a second run on
            // the same instance) does not append the configured steps a second time.
            List<PipelineStep> configuredSteps = new ArrayList<>();
            List<Step> steps = readConfiguredSteps(configuration);
            if (steps != null) {
                for (Step cs : steps) {
                    if (!cs.getActive()) {
                        continue;
                    }
                    configuredSteps.add(new PipelineStep(cs));
                }
            }
            this.pipelineSteps = configuredSteps;
        }
        this.pipelineManager = new PipelineManager(this.pipelineSteps);
    }

    /**
     * Walks the configuration down to its step list.
     *
     * @param configuration the configuration to read; may be {@code null}
     * @return the configured steps, or {@code null} when any level of the configuration is missing
     */
    private static List<Step> readConfiguredSteps(Configuration configuration) {
        if (configuration == null) {
            return null;
        }
        PreprocessingConfiguration preprocessingConfiguration = configuration.getPreprocessingConfiguration();
        if (preprocessingConfiguration == null) {
            return null;
        }
        PipelineStepConfiguration pipelineStepConfiguration = preprocessingConfiguration.getPipelineStepConfiguration();
        if (pipelineStepConfiguration == null) {
            return null;
        }
        return pipelineStepConfiguration.getSteps();
    }

    /** The running modes of a runner, each carrying a string identifier. */
    public enum RUNNING_MODE {
        CSV_IMPORT_AND_PROCESSING("csv"),
        KAFKA_IMPORT("kafka_import"),
        KAFKA_PROCESSING("kafka_process");

        private final String runnerMode;

        RUNNING_MODE(String runnerMode) {
            this.runnerMode = runnerMode;
        }

        /**
         * Returns the string identifier of this mode.
         *
         * @return the mode string, e.g. {@code "csv"}
         */
        public String getModeString() {
            return this.runnerMode;
        }
    }
}

