/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.bpmnai.core.runner.impl;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import de.viadee.bpmnai.core.processing.steps.PipelineStep;
import de.viadee.bpmnai.core.processing.steps.importing.ColumnsPreparationStep;
import de.viadee.bpmnai.core.processing.steps.importing.InitialCleanupStep;
import de.viadee.bpmnai.core.processing.steps.output.WriteToDataSinkStep;
import de.viadee.bpmnai.core.runner.SparkRunner;
import de.viadee.bpmnai.core.runner.config.SparkRunnerConfig;
import de.viadee.bpmnai.core.util.BpmnaiUtils;
import de.viadee.bpmnai.core.util.arguments.KafkaImportArguments;
import de.viadee.bpmnai.core.util.logging.BpmnaiLogger;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.functions;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaImportRunner
extends SparkRunner {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaImportRunner.class);
    private final Map<String, Object> kafkaConsumerConfigPI = new HashMap<String, Object>();
    private final Map<String, Object> kafkaConsumerConfigVU = new HashMap<String, Object>();
    private final Map<String, Object> kafkaConsumerConfigAI = new HashMap<String, Object>();
    private final int kafkaPollingDuration = 5000;
    private Dataset<Row> masterDataset = null;
    private List<String> receivedQueues = new ArrayList<String>();
    private List<String> emptyQueues = new ArrayList<String>();
    private CountDownLatch countDownLatch;
    private int EXPECTED_QUEUES_TO_BE_EMPTIED_IN_BATCH_MODE = 2;

    public KafkaImportRunner() {
    }

    public KafkaImportRunner(SparkRunnerConfig config) {
        super(config);
    }

    @Override
    protected void initialize(String[] arguments) {
        KafkaImportArguments kafkaImportArguments = KafkaImportArguments.getInstance();
        JCommander jCommander = JCommander.newBuilder().addObject((Object)KafkaImportArguments.getInstance()).build();
        try {
            jCommander.parse(arguments);
        }
        catch (ParameterException e) {
            LOG.error("Parsing of parameters failed. Error message: " + e.getMessage());
            jCommander.usage();
            System.exit(1);
        }
        kafkaImportArguments.createOrUpdateSparkRunnerConfig(this.sparkRunnerConfig);
        int n = this.EXPECTED_QUEUES_TO_BE_EMPTIED_IN_BATCH_MODE = this.sparkRunnerConfig.getDataLevel().equals("process") ? 2 : 3;
        if (this.sparkRunnerConfig.getSaveMode().equals((Object)SaveMode.Overwrite)) {
            FileUtils.deleteQuietly((File)new File(this.sparkRunnerConfig.getTargetFolder()));
        }
        BpmnaiLogger.getInstance().writeInfo("Starting Kafka import " + (this.sparkRunnerConfig.isBatchMode() ? "in batch mode " : "") + "from: " + this.sparkRunnerConfig.getKafkaBroker());
    }

    private synchronized void processMasterRDD(JavaRDD<String> newRDD, String queue) {
        if (newRDD.count() == 0L) {
            if (this.sparkRunnerConfig.isBatchMode()) {
                BpmnaiLogger.getInstance().writeInfo("Kafka queue '" + queue + "' returned zero entries.");
                if (!this.emptyQueues.contains(queue)) {
                    this.emptyQueues.add(queue);
                }
                if (this.emptyQueues.size() == this.EXPECTED_QUEUES_TO_BE_EMPTIED_IN_BATCH_MODE) {
                    BpmnaiLogger.getInstance().writeInfo("All Kafka queues (" + this.emptyQueues.stream().collect(Collectors.joining(",")) + ") returned zero entries once. Stopping as running in batch mode");
                    this.countDownLatch.countDown();
                }
            }
            return;
        }
        if (!this.receivedQueues.contains(queue)) {
            this.receivedQueues.add(queue);
        }
        Dataset jd = this.sparkSession.createDataset(newRDD.rdd(), Encoders.STRING());
        Dataset newDataset = this.sparkSession.read().json(jd);
        newDataset = newDataset.withColumn("source", functions.lit((Object)queue));
        this.masterDataset = this.masterDataset == null ? newDataset : BpmnaiUtils.getInstance().unionDatasets(this.masterDataset, (Dataset<Row>)newDataset);
    }

    @Override
    protected List<PipelineStep> buildDefaultPipeline() {
        ArrayList<PipelineStep> pipelineSteps = new ArrayList<PipelineStep>();
        pipelineSteps.add(new PipelineStep(new ColumnsPreparationStep(), ""));
        pipelineSteps.add(new PipelineStep(new InitialCleanupStep(), "ColumnsPreparationStep"));
        pipelineSteps.add(new PipelineStep(new WriteToDataSinkStep(), "InitialCleanupStep"));
        return pipelineSteps;
    }

    @Override
    protected Dataset<Row> loadInitialDataset() {
        long startMillis = System.currentTimeMillis();
        if (this.sparkRunnerConfig.isBatchMode()) {
            this.countDownLatch = new CountDownLatch(1);
        }
        this.kafkaConsumerConfigPI.put("bootstrap.servers", this.sparkRunnerConfig.getKafkaBroker());
        this.kafkaConsumerConfigPI.put("key.deserializer", StringDeserializer.class);
        this.kafkaConsumerConfigPI.put("value.deserializer", StringDeserializer.class);
        this.kafkaConsumerConfigPI.put("group.id", UUID.randomUUID().toString());
        this.kafkaConsumerConfigPI.put("auto.offset.reset", "earliest");
        this.kafkaConsumerConfigVU.put("bootstrap.servers", this.sparkRunnerConfig.getKafkaBroker());
        this.kafkaConsumerConfigVU.put("key.deserializer", StringDeserializer.class);
        this.kafkaConsumerConfigVU.put("value.deserializer", StringDeserializer.class);
        this.kafkaConsumerConfigVU.put("group.id", UUID.randomUUID().toString());
        this.kafkaConsumerConfigVU.put("auto.offset.reset", "earliest");
        JavaSparkContext jsc = new JavaSparkContext(this.sparkSession.sparkContext());
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Duration.apply((long)5000L));
        JavaInputDStream processInstances = KafkaUtils.createDirectStream((JavaStreamingContext)jssc, (LocationStrategy)LocationStrategies.PreferConsistent(), (ConsumerStrategy)ConsumerStrategies.Subscribe(Arrays.asList("processInstance"), this.kafkaConsumerConfigPI));
        processInstances.map((Function & Serializable)record -> (String)record.value()).foreachRDD((VoidFunction & Serializable)stringJavaRDD -> this.processMasterRDD((JavaRDD<String>)stringJavaRDD, "processInstance"));
        JavaInputDStream variableUpdates = KafkaUtils.createDirectStream((JavaStreamingContext)jssc, (LocationStrategy)LocationStrategies.PreferConsistent(), (ConsumerStrategy)ConsumerStrategies.Subscribe(Arrays.asList("variableUpdate"), this.kafkaConsumerConfigVU));
        variableUpdates.map((Function & Serializable)record -> (String)record.value()).foreachRDD((VoidFunction & Serializable)stringJavaRDD -> this.processMasterRDD((JavaRDD<String>)stringJavaRDD, "variableUpdate"));
        if (this.sparkRunnerConfig.getDataLevel().equals("activity")) {
            this.kafkaConsumerConfigAI.put("bootstrap.servers", this.sparkRunnerConfig.getKafkaBroker());
            this.kafkaConsumerConfigAI.put("key.deserializer", StringDeserializer.class);
            this.kafkaConsumerConfigAI.put("value.deserializer", StringDeserializer.class);
            this.kafkaConsumerConfigAI.put("group.id", UUID.randomUUID().toString());
            this.kafkaConsumerConfigAI.put("auto.offset.reset", "earliest");
            JavaInputDStream activityInstances = KafkaUtils.createDirectStream((JavaStreamingContext)jssc, (LocationStrategy)LocationStrategies.PreferConsistent(), (ConsumerStrategy)ConsumerStrategies.Subscribe(Arrays.asList("activityInstance"), this.kafkaConsumerConfigAI));
            activityInstances.map((Function & Serializable)record -> (String)record.value()).foreachRDD((VoidFunction & Serializable)stringJavaRDD -> this.processMasterRDD((JavaRDD<String>)stringJavaRDD, "activityInstance"));
        }
        jssc.start();
        if (this.sparkRunnerConfig.isBatchMode()) {
            try {
                this.countDownLatch.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            long endMillis = System.currentTimeMillis();
            BpmnaiLogger.getInstance().writeInfo("Kafka import finished (took " + (endMillis - startMillis) / 1000L + " seconds in total)");
            jssc.stop(false);
        } else {
            try {
                jssc.awaitTermination();
                long endMillis = System.currentTimeMillis();
                BpmnaiLogger.getInstance().writeInfo("Kafka import finished (took " + (endMillis - startMillis) / 1000L + " seconds in total)");
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return this.masterDataset;
    }
}

