/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.runner;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.steps.PipelineStep;
import de.viadee.ki.sparkimporter.processing.steps.importing.ColumnsPreparationStep;
import de.viadee.ki.sparkimporter.processing.steps.importing.InitialCleanupStep;
import de.viadee.ki.sparkimporter.processing.steps.output.WriteToDataSinkStep;
import de.viadee.ki.sparkimporter.runner.SparkRunner;
import de.viadee.ki.sparkimporter.util.SparkImporterKafkaImportArguments;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runner that reads JSON messages from the Kafka topics {@code processInstance} and
 * {@code variableUpdate} (plus {@code activityInstance} on data level "activity") through
 * Spark Streaming and merges them into a single {@link Dataset} for the preprocessing pipeline.
 *
 * <p>In batch mode the import stops as soon as every subscribed topic has delivered an empty
 * micro-batch once; otherwise it runs until the streaming context terminates.
 *
 * <p>All mutable import state is modified only inside the synchronized
 * {@link #processMasterRDD(JavaRDD, String)}.
 */
public class KafkaImportRunner
extends SparkRunner {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaImportRunner.class);
    public static SparkImporterKafkaImportArguments ARGS;
    private static final String TOPIC_PROCESS_INSTANCE = "processInstance";
    private static final String TOPIC_VARIABLE_UPDATE = "variableUpdate";
    private static final String TOPIC_ACTIVITY_INSTANCE = "activityInstance";
    /** Data level value for which the activity instance topic is imported as well. */
    private static final String DATA_LEVEL_ACTIVITY = "activity";
    /** Length of one streaming micro-batch in milliseconds. */
    private static final long BATCH_INTERVAL_MILLIS = 5000L;

    private final Map<String, Object> kafkaConsumerConfigPI = new HashMap<String, Object>();
    private final Map<String, Object> kafkaConsumerConfigVU = new HashMap<String, Object>();
    private final Map<String, Object> kafkaConsumerConfigAI = new HashMap<String, Object>();
    /** Raw JSON lines collected until every subscribed topic has delivered data once. */
    private JavaRDD<String> masterRdd = null;
    /** Parsed result; stays {@code null} until every subscribed topic has delivered data once. */
    private Dataset<Row> masterDataset = null;
    /** Topics that delivered at least one non-empty micro-batch. */
    private final List<String> receivedQueues = new ArrayList<String>();
    /** Topics that delivered at least one empty micro-batch (only tracked in batch mode). */
    private final List<String> emptyQueues = new ArrayList<String>();
    /** Released in batch mode once all subscribed topics have reported an empty micro-batch. */
    private CountDownLatch countDownLatch;
    /** Number of topics this runner subscribes to: 2, or 3 on data level "activity". */
    private int expectedQueueCount = 2;

    /**
     * Parses the command line arguments and transfers them into the shared importer settings.
     * Exits the JVM with status 1 when the arguments cannot be parsed. Also deletes the
     * configured file destination.
     *
     * @param arguments the raw command line arguments
     */
    @Override
    protected void initialize(String[] arguments) {
        ARGS = SparkImporterKafkaImportArguments.getInstance();
        JCommander jCommander = JCommander.newBuilder().addObject(ARGS).build();
        try {
            jCommander.parse(arguments);
        }
        catch (ParameterException e) {
            LOG.error("Parsing of parameters failed. Error message: {}", e.getMessage());
            jCommander.usage();
            System.exit(1);
        }

        // Derived from the same condition that decides whether the third topic is subscribed
        // in loadInitialDataset(), so the expected count always matches the real subscriptions.
        this.expectedQueueCount = DATA_LEVEL_ACTIVITY.equals(ARGS.getDataLevel()) ? 3 : 2;

        SparkImporterVariables.setTargetFolder(ARGS.getFileDestination());
        SparkImporterVariables.setWorkingDirectory(ARGS.getWorkingDirectory());
        SparkImporterLogger.setLogDirectory(ARGS.getLogDirectory());
        SparkImporterVariables.setProcessFilterDefinitionId(ARGS.getProcessDefinitionFilterId());
        this.dataLevel = ARGS.getDataLevel();
        PreprocessingRunner.writeStepResultsIntoFile = ARGS.isWriteStepResultsToCSV();

        FileUtils.deleteQuietly(new File(ARGS.getFileDestination()));

        String modeInfo = ARGS.isBatchMode() ? "in batch mode " : "";
        SparkImporterLogger.getInstance().writeInfo("Starting Kafka import " + modeInfo + "from: " + ARGS.getKafkaBroker());
    }

    /**
     * Merges one micro-batch of JSON lines from a topic into the import state.
     *
     * <p>Empty batches are only bookkeeping: in batch mode they mark the topic as drained and,
     * once all subscribed topics are drained, release the latch. Non-empty batches are collected
     * as raw JSON until every subscribed topic has delivered data once; at that point the
     * collected JSON is parsed into the master dataset. Later batches are aligned to the master
     * schema and appended.
     *
     * @param newRDD the JSON lines of the current micro-batch
     * @param queue  the name of the topic the batch was read from
     */
    private synchronized void processMasterRDD(JavaRDD<String> newRDD, String queue) {
        if (newRDD.isEmpty()) {
            if (ARGS.isBatchMode()) {
                registerEmptyQueue(queue);
            }
            return;
        }

        if (!this.receivedQueues.contains(queue)) {
            this.receivedQueues.add(queue);
        }

        if (this.masterDataset != null) {
            // Schema is fixed already: align the new rows to it and append them. Previously the
            // aligned batch replaced the master dataset, discarding everything imported so far.
            Dataset<Row> rowDataset = parseJson(newRDD);
            Dataset<Row> unionPrepDataset = this.sparkSession.createDataFrame(rowDataset.rdd(), this.masterDataset.schema());
            this.masterDataset = this.masterDataset.union(unionPrepDataset);
            return;
        }

        this.masterRdd = this.masterRdd == null ? newRDD : this.masterRdd.union(newRDD);
        if (this.receivedQueues.size() == this.expectedQueueCount) {
            // Every topic has contributed, so the JSON of all of them is parsed together.
            this.masterDataset = parseJson(this.masterRdd);
        }
        // NOTE(review): if a subscribed topic never delivers data, masterDataset is never built
        // and loadInitialDataset() returns null although masterRdd holds data - confirm intended.
    }

    /** Marks a topic as drained and releases the batch-mode latch once all topics are drained. */
    private void registerEmptyQueue(String queue) {
        SparkImporterLogger.getInstance().writeInfo("Kafka queue '" + queue + "' returned zero entries.");
        if (!this.emptyQueues.contains(queue)) {
            this.emptyQueues.add(queue);
        }
        if (this.emptyQueues.size() == this.expectedQueueCount) {
            SparkImporterLogger.getInstance().writeInfo("All Kafka queues (" + String.join(",", this.emptyQueues) + ") returned zero entries once. Stopping as running in batch mode");
            this.countDownLatch.countDown();
        }
    }

    /** Parses an RDD of JSON lines into a row dataset using Spark's JSON reader. */
    private Dataset<Row> parseJson(JavaRDD<String> jsonLines) {
        Dataset<String> jsonDataset = this.sparkSession.createDataset(jsonLines.rdd(), Encoders.STRING());
        return this.sparkSession.read().json(jsonDataset);
    }

    /**
     * Builds the default pipeline: columns preparation, initial cleanup, write to data sink.
     *
     * @return the pipeline steps in the order they were added
     */
    @Override
    protected List<PipelineStep> buildDefaultPipeline() {
        List<PipelineStep> pipelineSteps = new ArrayList<PipelineStep>();
        pipelineSteps.add(new PipelineStep(new ColumnsPreparationStep(), ""));
        pipelineSteps.add(new PipelineStep(new InitialCleanupStep(), "ColumnsPreparationStep"));
        pipelineSteps.add(new PipelineStep(new WriteToDataSinkStep(), "InitialCleanupStep"));
        return pipelineSteps;
    }

    /**
     * Subscribes to the Kafka topics, runs the streaming import and returns the merged result.
     *
     * <p>In batch mode this blocks until all subscribed topics have reported an empty
     * micro-batch and then stops the streaming context, leaving the Spark context running. In
     * streaming mode it blocks until the streaming context terminates.
     *
     * @return the merged dataset, or {@code null} if it was never built
     */
    @Override
    protected Dataset<Row> loadInitialDataset() {
        long startMillis = System.currentTimeMillis();
        this.masterRdd = this.sparkSession.emptyDataset(Encoders.STRING()).javaRDD();
        if (ARGS.isBatchMode()) {
            this.countDownLatch = new CountDownLatch(1);
        }

        JavaSparkContext jsc = new JavaSparkContext(this.sparkSession.sparkContext());
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Duration.apply(BATCH_INTERVAL_MILLIS));

        subscribe(jssc, TOPIC_PROCESS_INSTANCE, this.kafkaConsumerConfigPI);
        subscribe(jssc, TOPIC_VARIABLE_UPDATE, this.kafkaConsumerConfigVU);
        if (DATA_LEVEL_ACTIVITY.equals(ARGS.getDataLevel())) {
            subscribe(jssc, TOPIC_ACTIVITY_INSTANCE, this.kafkaConsumerConfigAI);
        }

        jssc.start();

        if (ARGS.isBatchMode()) {
            boolean interrupted = false;
            try {
                this.countDownLatch.await();
            }
            catch (InterruptedException e) {
                interrupted = true;
                LOG.error("Interrupted while waiting for the Kafka queues to be emptied", e);
            }
            logImportFinished(startMillis);
            jssc.stop(false);
            if (interrupted) {
                // Restored only after stop() so the shutdown itself is not cut short.
                Thread.currentThread().interrupt();
            }
        } else {
            try {
                jssc.awaitTermination();
                logImportFinished(startMillis);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for the streaming context to terminate", e);
                Thread.currentThread().interrupt();
            }
        }
        return this.masterDataset;
    }

    /**
     * Fills the given consumer configuration and wires a direct stream for one topic so that
     * the value of every record is handed to {@link #processMasterRDD(JavaRDD, String)}.
     */
    private void subscribe(JavaStreamingContext jssc, String topic, Map<String, Object> consumerConfig) {
        consumerConfig.put("bootstrap.servers", ARGS.getKafkaBroker());
        consumerConfig.put("key.deserializer", StringDeserializer.class);
        consumerConfig.put("value.deserializer", StringDeserializer.class);
        // A fresh random group id together with "earliest" - presumably so that every run
        // reads the topic from its beginning; confirm against the deployment.
        consumerConfig.put("group.id", UUID.randomUUID().toString());
        consumerConfig.put("auto.offset.reset", "earliest");

        KafkaUtils.<String, String>createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic), consumerConfig))
            .map(record -> record.value())
            .foreachRDD(stringJavaRDD -> this.processMasterRDD(stringJavaRDD, topic));
    }

    /** Writes the total import duration, measured from {@code startMillis}, to the importer log. */
    private void logImportFinished(long startMillis) {
        long endMillis = System.currentTimeMillis();
        SparkImporterLogger.getInstance().writeInfo("Kafka import finished (took " + (endMillis - startMillis) / 1000L + " seconds in total)");
    }
}

