/*
 * Decompiled with CFR 0.152.
 */
package de.viadee.ki.sparkimporter.util;

import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Singleton with helper routines used by the Spark importer: object
 * checksums, writing datasets as Parquet/CSV below the configured target
 * folder, and small dataset clean-up transformations.
 */
public class SparkImporterUtils {
    /**
     * Matches a column name that ends with an underscore optionally followed
     * by digits. Group 1 is the name up to and including the underscore, i.e.
     * the name without the numeric suffix.
     */
    private static final Pattern DUPLICATED_COLUMN_PATTERN = Pattern.compile("(\\w+_)\\d*");

    private static SparkImporterUtils instance;

    private SparkImporterUtils() {
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the singleton instance
     */
    public static synchronized SparkImporterUtils getInstance() {
        if (instance == null) {
            instance = new SparkImporterUtils();
        }
        return instance;
    }

    /**
     * Computes a checksum of the Java-serialized form of an object.
     *
     * <p>The serialized bytes are MD5-hashed, the digest is Base64-encoded,
     * and the Base64 text is MD5-hashed again and returned as upper-case hex.
     *
     * @param obj the object to hash; may be {@code null}
     * @return the upper-case hex checksum, or an empty string if {@code obj} is {@code null}
     * @throws IOException if the object cannot be serialized
     * @throws NoSuchAlgorithmException if no MD5 implementation is available
     */
    public String md5CecksumOfObject(Object obj) throws IOException, NoSuchAlgorithmException {
        if (obj == null) {
            return "";
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources closes (and thereby flushes) the stream even if
        // serialization fails half-way.
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
        }
        MessageDigest m = MessageDigest.getInstance("MD5");
        byte[] encoded = new Base64().encode(m.digest(baos.toByteArray()));
        // Base64 output is pure ASCII, so hashing the bytes directly yields the
        // same result as hashing the decoded string, without depending on the
        // platform default charset.
        return DigestUtils.md5Hex(encoded).toUpperCase();
    }

    /**
     * Writes a dataset as Parquet below the target folder. When the configured
     * output format is {@code csv} and {@code subDirectory} is {@code "result"},
     * the Parquet data is additionally exported as a single pipe-delimited file
     * {@code <target>/result/csv/result.csv}.
     *
     * @param dataSet the dataset to write
     * @param subDirectory {@code "result"} for the final result, otherwise the
     *        name of an intermediate step (prefixed with a running counter)
     * @throws IllegalStateException if the CSV output folder cannot be found after writing
     */
    public void writeDatasetToParquet(Dataset<Row> dataSet, String subDirectory) {
        String targetFolder = this.resolveTargetFolder(subDirectory);
        dataSet.write().mode(SparkImporterVariables.getSaveMode()).save(targetFolder + "/parquet");
        if (!"csv".equals(SparkImporterVariables.getOutputFormat()) || !subDirectory.equals("result")) {
            return;
        }
        SparkSession sparkSession = SparkSession.builder().getOrCreate();
        Dataset<Row> parquetData = sparkSession.read().load(targetFolder + "/parquet");
        parquetData.coalesce(1).write().option("header", "true").option("delimiter", "|").option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS").option("ignoreLeadingWhiteSpace", "false").option("ignoreTrailingWhiteSpace", "false").mode(SparkImporterVariables.getSaveMode()).csv(targetFolder + "/csv");
        if (targetFolder.startsWith("hdfs")) {
            this.consolidateCsvOnHdfs(targetFolder);
        } else {
            this.consolidateCsvOnLocalFileSystem(targetFolder);
        }
    }

    /**
     * Writes a dataset as a single pipe-delimited CSV output below the target folder.
     *
     * @param dataSet the dataset to write
     * @param subDirectory {@code "result"} for the final result, otherwise the
     *        name of an intermediate step (prefixed with a running counter)
     */
    public void writeDatasetToCSV(Dataset<Row> dataSet, String subDirectory) {
        this.writeDatasetToCSV(dataSet, subDirectory, "|");
    }

    /** Writes the dataset, coalesced to one partition, as CSV with a header row and the given delimiter. */
    private void writeDatasetToCSV(Dataset<Row> dataSet, String subDirectory, String delimiter) {
        String path = this.resolveTargetFolder(subDirectory);
        dataSet.coalesce(1).write().option("header", "true").option("delimiter", delimiter).option("ignoreLeadingWhiteSpace", "false").option("ignoreTrailingWhiteSpace", "false").mode(SparkImporterVariables.getSaveMode()).csv(path);
    }

    /**
     * Removes columns whose names differ only in a trailing numeric suffix.
     *
     * <p>Only columns matching {@code \w+_\d*} are considered. For each name
     * without its suffix the first matching column is kept and renamed to that
     * suffix-free name; the kept columns retain their original relative order.
     * If every column already maps to a distinct suffix-free name the dataset
     * is returned unchanged. Otherwise columns that do not match the pattern
     * are dropped along with the duplicates.
     *
     * @param dataset the dataset to clean up
     * @return the dataset restricted to the unique columns, or {@code dataset}
     *         itself if nothing had to be removed
     */
    public Dataset<Row> removeDuplicatedColumns(Dataset<Row> dataset) {
        String[] columns = dataset.columns();
        // unique (suffix-free) name -> first original column carrying it;
        // LinkedHashMap keeps the original column order.
        Map<String, String> firstColumnByUniqueName = new LinkedHashMap<>();
        for (String col : columns) {
            Matcher m = DUPLICATED_COLUMN_PATTERN.matcher(col);
            if (!m.matches() || firstColumnByUniqueName.containsKey(m.group(1))) {
                continue;
            }
            firstColumnByUniqueName.put(m.group(1), col);
        }
        if (columns.length == firstColumnByUniqueName.size()) {
            return dataset;
        }
        // Select and rename in one step instead of relying on
        // Column.toString() to recover the original column name afterwards.
        List<Column> selectionColumns = new ArrayList<>(firstColumnByUniqueName.size());
        for (Map.Entry<String, String> entry : firstColumnByUniqueName.entrySet()) {
            selectionColumns.add(new Column(entry.getValue()).as(entry.getKey()));
        }
        return dataset.select(this.asSeq(selectionColumns));
    }

    /**
     * Drops rows whose {@code proc_inst_id_} is the literal string
     * {@code null} or empty.
     *
     * @param dataset the imported dataset
     * @return the filtered dataset
     */
    public Dataset<Row> removeEmptyLinesAfterImport(Dataset<Row> dataset) {
        return dataset.filter("proc_inst_id_ <> 'null'").filter("proc_inst_id_ <> ''");
    }

    /**
     * Wraps a Java list as a Scala {@code Seq} for Spark APIs that take Scala
     * collections.
     *
     * @param values the list to wrap
     * @param <T> the element type
     * @return a Scala sequence view of {@code values}
     */
    public <T> Seq<T> asSeq(List<T> values) {
        return JavaConverters.asScalaBufferConverter(values).asScala();
    }

    /**
     * Builds the output folder for a sub directory: {@code <target>/result}
     * for {@code "result"}, otherwise
     * {@code <target>/intermediate/<counter>_<subDirectory>}. The counter is
     * only advanced for intermediate folders.
     */
    private String resolveTargetFolder(String subDirectory) {
        String targetFolder = SparkImporterVariables.getTargetFolder() + "/";
        if (subDirectory.equals("result")) {
            return targetFolder + "result";
        }
        return targetFolder + "intermediate/" + String.format("%02d", PreprocessingRunner.getNextCounter()) + "_" + subDirectory;
    }

    /**
     * Turns Spark's CSV output folder on HDFS into a single
     * {@code csv/result.csv}: the part file is moved out of the folder, the
     * folder is emptied, and the file is moved back under its final name.
     */
    private void consolidateCsvOnHdfs(String targetFolder) {
        Path csvDir = new Path(targetFolder + "/csv");
        Path stagedResult = new Path(targetFolder + "/result.csv");
        Path finalResult = new Path(targetFolder + "/csv/result.csv");
        Configuration conf = new Configuration();
        try {
            // Resolve the file system from the path itself so a fully
            // qualified hdfs:// URI works even if it is not the default FS.
            // The instance comes from Hadoop's cache and must not be closed here.
            FileSystem fileSystem = csvDir.getFileSystem(conf);
            if (!fileSystem.isDirectory(csvDir)) {
                throw new IllegalStateException("Cannot find result folder!");
            }
            // The part files live in the csv folder, not in its parent.
            boolean staged = false;
            for (FileStatus entry : fileSystem.listStatus(csvDir)) {
                if (!entry.isFile() || !entry.getPath().getName().contains("part-")) continue;
                FileUtil.copy(fileSystem, entry.getPath(), fileSystem, stagedResult, true, conf);
                staged = true;
            }
            if (!staged) {
                // Nothing to rename; leave the folder untouched rather than emptying it.
                SparkImporterLogger.getInstance().writeError("An error occurred during the renaming of the result file in HDFS. Exception: no part file found in " + csvDir);
                return;
            }
            // Empty the folder through the Hadoop API; java.io.File would
            // address the local disk instead of HDFS.
            for (FileStatus leftover : fileSystem.listStatus(csvDir)) {
                fileSystem.delete(leftover.getPath(), true);
            }
            FileUtil.copy(fileSystem, stagedResult, fileSystem, finalResult, true, conf);
        }
        catch (IOException e) {
            SparkImporterLogger.getInstance().writeError("An error occurred during the renaming of the result file in HDFS. Exception: " + e.getMessage());
        }
    }

    /**
     * Turns Spark's CSV output folder on the local file system into a single
     * {@code csv/result.csv}. The folder is only emptied after the part file
     * has been moved to safety, so a failed move never destroys the data.
     */
    private void consolidateCsvOnLocalFileSystem(String targetFolder) {
        File dir = new File(targetFolder + "/csv");
        if (!dir.isDirectory()) {
            throw new IllegalStateException("Cannot find result folder!");
        }
        File targetFile = new File(dir + "/../result.csv");
        File[] entries = dir.listFiles();
        if (entries == null) {
            // listFiles() returns null on an I/O error.
            SparkImporterLogger.getInstance().writeError("An error occurred during the renaming of the result file. Exception: cannot list " + dir);
            return;
        }
        try {
            boolean staged = false;
            for (File file : entries) {
                if (!file.getName().startsWith("part-")) continue;
                Files.move(file.toPath(), targetFile.toPath(), new CopyOption[0]);
                staged = true;
            }
            if (!staged) {
                SparkImporterLogger.getInstance().writeError("An error occurred during the renaming of the result file. Exception: no part file found in " + dir);
                return;
            }
            FileUtils.cleanDirectory(dir);
            Files.move(targetFile.toPath(), new File(dir + "/result.csv").toPath(), new CopyOption[0]);
        }
        catch (IOException e) {
            SparkImporterLogger.getInstance().writeError("An error occurred during the renaming of the result file. Exception: " + e.getMessage());
        }
    }
}

