package com.datastax.data.prepare.spark.dataset;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.datastax.data.prepare.util.Consts;
import com.datastax.insight.annonation.InsightComponent;
import com.datastax.insight.annonation.InsightComponentArg;
import com.datastax.insight.core.driver.SparkContextBuilder;
import com.datastax.insight.spec.Operator;
import com.google.common.base.Strings;
import java.util.Arrays;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/data/prepare/spark/dataset/BasicOperator.class */
/**
 * Basic, stateless Spark {@link Dataset} operators exposed as Insight designer components.
 *
 * <p>Every public method is a static component entry point. The {@link InsightComponent} and
 * {@link InsightComponentArg} annotations describe how each one is presented in the designer.
 * Optional parameters that are missing usually make an operator log and return its input
 * unchanged. Mandatory parameters that are missing make it throw.
 *
 * <p>The operators are declared as {@code Dataset<T> -> Dataset<T>} so they can be chained. Most
 * Spark transformations used here actually produce an untyped {@code Dataset<Row>}, which is
 * re-labelled through the private {@code asTyped} helper.
 */
public class BasicOperator implements Operator {
    private static final Logger logger = LoggerFactory.getLogger(BasicOperator.class);

    /**
     * Adds an ID column.
     *
     * @param dataset input data; {@code null} is returned as-is
     * @param str     name of the new ID column; when empty the input is returned unchanged
     * @param str2    ID type. {@code Consts.INCREMENT} uses Spark's
     *                {@code monotonically_increasing_id()}, which per Spark's documentation is
     *                unique and increasing but not consecutive. Any other value generates the ID
     *                through the {@code uuid} UDF.
     * @return the dataset with the extra column
     * @throws IllegalArgumentException if a UUID is requested for a dataset without columns
     */
    @InsightComponent(name = "generateID", type = "com.datastax.insight.dataprprocess.generateID", description = "ID生成", order = 500801)
    public static <T> Dataset<T> generateID(
            @InsightComponentArg(externalInput = true, name = "data", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "IDName", description = "ID属性名称", request = true) String str,
            @InsightComponentArg(name = "IDType", description = "ID类型", request = true, items = "UUID32;Increment", defaultValue = "UUID32") String str2) {
        if (dataset == null) {
            logger.info("dataset is null");
            return null;
        }
        if (Strings.isNullOrEmpty(str)) {
            logger.info("IDName is empty");
            return dataset;
        }
        if (Consts.INCREMENT.equals(str2)) {
            // functions.lit(column) returns the column unchanged, so the id expression is used directly.
            return asTyped(dataset.withColumn(str, functions.monotonically_increasing_id()));
        }
        String[] columns = dataset.columns();
        if (columns.length == 0) {
            throw new IllegalArgumentException("UUID generation requires at least one column in the dataset");
        }
        // Expected to register the "uuid" UDF called below (the helper lives outside this file).
        com.datastax.data.prepare.util.d.a(SparkContextBuilder.getSession().udf());
        // NOTE(review): the UDF is fed the first column, presumably so it is evaluated per row; confirm.
        return asTyped(dataset.withColumn(str, functions.callUDF("uuid", new Column[]{dataset.col(columns[0])})));
    }

    /**
     * Renames one or more columns.
     *
     * @param dataset input data; {@code null} is returned as-is
     * @param str     old column names separated by {@code Consts.DELIMITER}
     * @param str2    new column names in the same order and count as {@code str}
     * @return the renamed dataset, or the input unchanged when the parameters are empty, the name
     *         counts differ, or any new name is empty
     */
    @InsightComponent(name = Consts.RENAME, type = "com.datastax.insight.dataprprocess.rename", description = Consts.RENAME, order = 500107)
    public static <T> Dataset<T> rename(
            @InsightComponentArg(externalInput = true, name = "dataset", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "oldName", description = "属性原来的名字", request = true) String str,
            @InsightComponentArg(name = "newName", description = "属性的新名字", request = true) String str2) {
        if (Strings.isNullOrEmpty(str) || Strings.isNullOrEmpty(str2) || dataset == null) {
            logger.info("oldName or newname is empty!");
            return dataset;
        }
        String[] oldNames = com.datastax.data.prepare.util.c.a(str.split(Consts.DELIMITER));
        String[] newNames = com.datastax.data.prepare.util.c.a(str2.split(Consts.DELIMITER));
        if (oldNames.length != newNames.length) {
            logger.info("The number of newname is not the same as the number of oldname");
            return dataset;
        }
        for (String newName : newNames) {
            if (Strings.isNullOrEmpty(newName)) {
                logger.info("newName is empty!");
                return dataset;
            }
        }
        Dataset<?> renamed = dataset;
        for (int i = 0; i < newNames.length; i++) {
            renamed = renamed.withColumnRenamed(oldNames[i], newNames[i]);
        }
        return asTyped(renamed);
    }

    /**
     * Naming component. It either assigns initial names to all columns or renames selected columns.
     *
     * <p>{@code jSONObject.selector} chooses the mode.
     * <ul>
     *   <li>{@code Consts.INITIALIZA_NAME}: {@code selectorValue} is {@code Consts.AUTO} (prefix from
     *       {@code method} plus the column index) or {@code Consts.MANUAL} ({@code method} holds
     *       delimiter-separated names). A blank {@code method} falls back to automatic names with
     *       prefix {@code _c}.</li>
     *   <li>{@code Consts.RENAME}: {@code selectorValue} is an array of
     *       {@code {oldName, newName}} objects. Entries with a blank name are skipped.</li>
     * </ul>
     *
     * @return the renamed dataset. The input is returned unchanged when the parameters are
     *         null or empty. {@code null} is returned when the dataset is {@code null} or the
     *         selector/mode is not recognised.
     * @throws com.datastax.data.prepare.util.a if manual names do not match the column count
     */
    @InsightComponent(name = "命名", description = "命名组件, 包含重命名和初始化命名")
    public static <T> Dataset<T> name(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "参数", description = "命名参数") JSONObject jSONObject) {
        if (jSONObject == null || jSONObject.isEmpty()) {
            logger.info("命名组件的参数为空,返回原数据集");
            return dataset;
        }
        if (dataset == null) {
            logger.info("命名组件参数中的数据集为空，返回null");
            return null;
        }
        String selector = jSONObject.getString("selector");
        if (Consts.INITIALIZA_NAME.equals(selector)) {
            Dataset<T> initialized = initializeNames(dataset, jSONObject);
            if (initialized != null) {
                return initialized;
            }
        }
        if (!Consts.RENAME.equals(selector)) {
            return null;
        }
        return renameFromJson(dataset, jSONObject.getJSONArray("selectorValue"));
    }

    /** Initial-naming mode of {@link #name}; returns {@code null} for an unrecognised mode. */
    private static <T> Dataset<T> initializeNames(Dataset<T> dataset, JSONObject params) {
        int columnCount = dataset.schema().fields().length;
        String mode = params.getString("selectorValue");
        String value = Strings.nullToEmpty(params.getString("method")).trim();
        if (value.length() == 0) {
            logger.info("命名组件的初始化命名的value为空, 默认设为列名自动生成, 前缀为_c");
            value = "_c";
            mode = Consts.AUTO;
        }
        if (Consts.AUTO.equals(mode)) {
            String[] names = new String[columnCount];
            for (int i = 0; i < columnCount; i++) {
                names[i] = value + i;
            }
            return asTyped(dataset.toDF(names));
        }
        if (Consts.MANUAL.equals(mode)) {
            String[] names = com.datastax.data.prepare.util.c.a(value.split(Consts.DELIMITER));
            if (names.length != columnCount) {
                throw new com.datastax.data.prepare.util.a("初始化命名的列名个数和数据集的列名个数不相等");
            }
            return asTyped(dataset.toDF(names));
        }
        return null;
    }

    /** Rename mode of {@link #name}: applies each {oldName, newName} pair, skipping blank entries. */
    private static <T> Dataset<T> renameFromJson(Dataset<T> dataset, JSONArray renames) {
        if (renames == null) {
            logger.info("重命名参数为空, 返回原数据集");
            return dataset;
        }
        Dataset<?> renamed = dataset;
        for (int i = 0; i < renames.size(); i++) {
            JSONObject entry = renames.getJSONObject(i);
            String oldName = entry == null ? "" : Strings.nullToEmpty(entry.getString("oldName")).trim();
            String newName = entry == null ? "" : Strings.nullToEmpty(entry.getString("newName")).trim();
            if (oldName.length() == 0 || newName.length() == 0) {
                logger.info("重命名的某一参数为空, 跳过");
                continue;
            }
            renamed = renamed.withColumnRenamed(oldName, newName);
        }
        return asTyped(renamed);
    }

    /**
     * Aggregation component: groups the data and applies one aggregate function to each selected
     * column.
     *
     * @param str  grouping type: {@code groupBy}, {@code rollup} or {@code cube}
     * @param str2 grouping columns separated by {@code ;}; empty means a global aggregate
     * @param str3 aggregate function type (see the {@code items} list)
     * @param str4 aggregated columns separated by {@code ;}
     */
    @InsightComponent(name = "聚合", description = "聚合")
    public static <T> Dataset<T> aggregate(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "分组类型", description = "分组类型", defaultValue = "groupBy", items = "groupBy;rollup;cube") String str,
            @InsightComponentArg(name = "分组列名", description = "分组的列名,用;隔开") String str2,
            @InsightComponentArg(name = "聚合函数类型", description = "聚合函数类型", defaultValue = "sum", items = "min;max;avg;sum;count;collect_list;collect_set;distinct_count;distinct_sum;approx_count_distinct") String str3,
            @InsightComponentArg(name = "聚合列名", description = "聚合的列名,用;隔开") String str4) {
        return a(dataset, new com.datastax.data.prepare.spark.dataset.params.a(str, str2, str3, str4));
    }

    /**
     * Shared implementation of {@link #aggregate}.
     *
     * <p>A missing {@code type} defaults to {@code Consts.GROUPBY} and a missing {@code funcType}
     * to {@code Consts.SUM}. Both defaults are written back into {@code aVar}.
     *
     * @return the aggregated dataset. The input is returned unchanged when no aggregate columns
     *         are given or the grouping type is not groupBy/rollup/cube.
     * @throws IllegalArgumentException if no aggregate expression could be built (unknown
     *         function type, or only blank column names)
     */
    protected static <T> Dataset<T> a(Dataset<T> dataset, com.datastax.data.prepare.spark.dataset.params.a aVar) {
        String funcColumns = aVar.getFuncColumns();
        if (funcColumns == null || funcColumns.length() == 0) {
            logger.info("聚合选中的列名数组为空，返回原数据集");
            return dataset;
        }
        if (Strings.isNullOrEmpty(aVar.getType())) {
            aVar.setType(Consts.GROUPBY);
        }
        if (Strings.isNullOrEmpty(aVar.getFuncType())) {
            aVar.setFuncType(Consts.SUM);
        }

        String groupColumns = aVar.getColumns();
        // Only split the grouping columns when present; a null value used to throw an NPE here.
        Column[] grouping = Strings.isNullOrEmpty(groupColumns)
                ? new Column[0]
                : toColumns(null, com.datastax.data.prepare.util.c.a(groupColumns.split(Consts.DELIMITER)));
        Column[] aggregates = toColumns(aVar.getFuncType(),
                com.datastax.data.prepare.util.c.a(funcColumns.split(Consts.DELIMITER)));
        if (aggregates.length == 0) {
            throw new IllegalArgumentException(
                    "无法构建聚合表达式: 聚合函数类型=" + aVar.getFuncType() + ", 聚合列名=" + funcColumns);
        }
        // Spark's agg(Column, Column...) needs the first expression separately.
        Column first = aggregates[0];
        Column[] rest = Arrays.copyOfRange(aggregates, 1, aggregates.length);

        String type = aVar.getType();
        if (Consts.GROUPBY.equals(type)) {
            return asTyped(dataset.groupBy(grouping).agg(first, rest));
        }
        if (Consts.ROLLUP.equals(type)) {
            return asTyped(dataset.rollup(grouping).agg(first, rest));
        }
        if (Consts.CUBE.equals(type)) {
            return asTyped(dataset.cube(grouping).agg(first, rest));
        }
        return dataset;
    }

    /**
     * Converts column names to {@link Column}s, optionally wrapped in an aggregate function.
     *
     * @param funcType aggregate function name (one of the {@code Consts} aggregate constants), or
     *                 {@code null} for plain column references
     * @param names    column names; {@code null} and empty entries are skipped
     * @return the resulting columns with no {@code null} gaps. The array is empty when
     *         {@code funcType} is not recognised.
     */
    private static Column[] toColumns(String funcType, String[] names) {
        Column[] columns = new Column[names.length];
        int count = 0;
        for (String name : names) {
            if (name == null || name.length() == 0) {
                continue;
            }
            Column column = applyAggregate(funcType, functions.col(name));
            if (column != null) {
                columns[count++] = column;
            }
        }
        // Drop unused slots so no null Column ever reaches Spark.
        return count == columns.length ? columns : Arrays.copyOf(columns, count);
    }

    /** Wraps {@code column} in the aggregate named by {@code funcType}; {@code null} if unknown. */
    private static Column applyAggregate(String funcType, Column column) {
        if (funcType == null) {
            return column;
        }
        if (Consts.MIN.equals(funcType)) {
            return functions.min(column);
        }
        if (Consts.MAX.equals(funcType)) {
            return functions.max(column);
        }
        if (Consts.AVG.equals(funcType)) {
            return functions.avg(column);
        }
        if (Consts.SUM.equals(funcType)) {
            return functions.sum(column);
        }
        if (Consts.COUNT.equals(funcType)) {
            return functions.count(column);
        }
        if (Consts.COLLECT_LIST.equals(funcType)) {
            return functions.collect_list(column);
        }
        if (Consts.COLLECT_SET.equals(funcType)) {
            return functions.collect_set(column);
        }
        if (Consts.DISTINCT_COUNT.equals(funcType)) {
            return functions.countDistinct(column, new Column[0]);
        }
        if (Consts.DISTINCT_SUM.equals(funcType)) {
            return functions.sumDistinct(column);
        }
        if (Consts.APPROX_COUNT_DISTINCT.equals(funcType)) {
            return functions.approx_count_distinct(column);
        }
        return null;
    }

    /**
     * Explodes an array column, or a delimiter-separated string column, into one row per element.
     *
     * @param str  data format: {@code 数组} (array column) or {@code 有分隔符的字符串} (delimited string)
     * @param str2 source column
     * @param str3 target column; when it differs from {@code str2}, the source column is dropped
     * @param str4 delimiter (string format only)
     * @return the exploded dataset. The input is returned when it is {@code null}, and
     *         {@code null} is returned for an unrecognised format.
     * @throws NullPointerException     if a required parameter is blank
     * @throws IllegalArgumentException if the array column is missing or not of array type
     */
    @InsightComponent(name = "explode", description = "用于数据集的某一行数组拆分成多行")
    public static <T> Dataset<T> explode(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集", defaultValue = "${output}") Dataset<T> dataset,
            @InsightComponentArg(name = "扩展行数据格式", description = "数据格式是数组或者以分隔符隔开的字符串", items = "数组;有分隔符的字符串") String str,
            @InsightComponentArg(name = "扩展行的列名", description = "用于扩展行的列名,多列以分隔符隔开") String str2,
            @InsightComponentArg(name = "扩展行后的新列名", description = "扩展行后的新列名,多列以分隔符隔开") String str3,
            @InsightComponentArg(name = "分隔符", description = "分隔符") String str4) {
        if (dataset == null) {
            logger.info("数据集为空");
            return dataset;
        }
        requireNonBlank(str2, "扩展行的列名参数为空");
        requireNonBlank(str3, "扩展行后的新列名为空");
        requireNonBlank(str, "扩展行的数据格式为空");
        Dataset<Row> exploded;
        if ("数组".equals(str)) {
            if (!isArrayColumn(dataset.schema().fields(), str2)) {
                throw new IllegalArgumentException("扩展行的列名在数据集中不存在或者类型不匹配");
            }
            exploded = dataset.withColumn(str3, functions.explode(functions.col(str2)));
        } else if ("有分隔符的字符串".equals(str)) {
            requireNonBlank(str4, "分隔符参数为空");
            // NOTE(review): util.b.a presumably turns the delimiter into a split regex; confirm.
            exploded = dataset.withColumn(str3,
                    functions.explode(functions.split(functions.col(str2), com.datastax.data.prepare.util.b.a(str4))));
        } else {
            logger.info("不支持的扩展行数据格式: " + str);
            return null;
        }
        return asTyped(str2.equals(str3) ? exploded : exploded.drop(functions.col(str2)));
    }

    /** Returns whether {@code fields} contains a column named {@code name} of Spark array type. */
    private static boolean isArrayColumn(StructField[] fields, String name) {
        for (StructField field : fields) {
            if (field.name().equals(name) && Consts.ARRAY.equals(field.dataType().typeName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Converts a dataset to its underlying RDD.
     *
     * @throws NullPointerException if {@code dataset} is {@code null}
     */
    @InsightComponent(name = "DatasetToRDD", description = "数据集转换成RDD")
    public static <T> RDD<T> ds2Rdd(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        return dataset.rdd();
    }

    /**
     * Replaces {@code str4} with {@code collect_list(str4)} over a row-based window. The window is
     * partitioned by {@code str}, ordered by {@code str2}, and spans from the current row forward.
     * Only {@code collect_list} is implemented; {@code str3} is validated but otherwise unused.
     *
     * @param i number of rows in the window (current row plus {@code i - 1} following rows)
     * @throws NullPointerException if the dataset or any string parameter is blank
     */
    @InsightComponent(name = "window", description = "按照一定参数将多行数据合并为一个窗口，并进行一定处理")
    public static <T> Dataset<T> window(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "分片的列名", description = "分片的列名，以分号隔开") String str,
            @InsightComponentArg(name = "排序的列名", description = "排序的列名，以分号隔开") String str2,
            @InsightComponentArg(name = "操作类型", description = "用于指定对目标列的操作类型", items = "collect_list") String str3,
            @InsightComponentArg(name = "目标列", description = "要处理的列名") String str4,
            @InsightComponentArg(name = "行数", description = "window包含的行数") int i) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        requireNonBlank(str, "分片的列名为空");
        requireNonBlank(str2, "排序的列名为空");
        requireNonBlank(str3, "操作类型为空");
        requireNonBlank(str4, "目标列为空");
        Column[] partitionColumns = toDatasetColumns(dataset, str.split(Consts.DELIMITER));
        Column[] orderColumns = toDatasetColumns(dataset, str2.split(Consts.DELIMITER));
        // NOTE(review): i <= 1 maps to an upper offset of 1 (a two-row window), not 0; confirm intended.
        long lastRowOffset = i <= 1 ? 1 : i - 1;
        Column collected = functions.collect_list(str4).over(
                Window.partitionBy(partitionColumns)
                        .orderBy(orderColumns)
                        .rowsBetween(Window.currentRow(), lastRowOffset));
        return asTyped(dataset.withColumn(str4, collected));
    }

    /** Throws {@link NullPointerException} with {@code message} when {@code value} is blank. */
    private static void requireNonBlank(String value, String message) {
        if (isBlank(value)) {
            throw new NullPointerException(message);
        }
    }

    /** Resolves each name against {@code dataset} via {@link Dataset#col(String)}. */
    private static Column[] toDatasetColumns(Dataset<?> dataset, String[] names) {
        Column[] columns = new Column[names.length];
        for (int i = 0; i < names.length; i++) {
            columns[i] = dataset.col(names[i]);
        }
        return columns;
    }

    /**
     * Merges a nested-array column into a de-duplicated one-dimensional array. The work is done
     * by the package-local helper {@code p.c}.
     *
     * @throws NullPointerException if the dataset or column name is blank
     */
    @InsightComponent(name = "多维数组合并去重", description = "对于某一列进行多维数组合并并去重，生成一维数组")
    public static <T> Dataset<T> wrapArray2Array(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "合并列", description = "用于合并的列，列的类型必须为多维数组") String str) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        requireNonBlank(str, "合并列参数为空");
        return (Dataset<T>) p.c(dataset.toDF(), str);
    }

    /**
     * Replaces array column {@code str} with its elements joined by {@code str2}, using
     * {@code concat_ws}. A {@code null} separator falls back to {@code Consts.DELIMITER}.
     *
     * @throws NullPointerException     if the dataset or column name is blank
     * @throws IllegalArgumentException if the column is missing or not an array
     */
    @InsightComponent(name = "数组转字符串", description = "将数组转换成字符串")
    public static <T> Dataset<T> array2String(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "列名", description = "用于转换成String的列名") String str,
            @InsightComponentArg(name = "连接符", description = "连接符") String str2) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        requireNonBlank(str, "arrayCol为空");
        String separator = str2 == null ? Consts.DELIMITER : str2;
        if (!isArrayColumn(dataset.schema().fields(), str)) {
            throw new IllegalArgumentException(str + "列名不存在或者不为Array类型");
        }
        return asTyped(dataset.withColumn(str, functions.concat_ws(separator, new Column[]{functions.col(str)})));
    }

    /**
     * Applies an arithmetic operation to a column, against a constant or another column. The
     * result is written to a column named {@code method(column,value)}. The computation is
     * delegated to the package-local helper class {@code a}.
     *
     * @param str2 operation: plus, minus, multiply or divide
     * @param str3 whether {@code str4} is a constant or a column name
     * @throws IllegalArgumentException if the dataset or a required parameter is empty
     */
    @InsightComponent(name = "数学运算", description = "对列进行数学运算")
    public static <T> Dataset<T> mathCompute(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "列名", description = "用于计算的列名") String str,
            @InsightComponentArg(name = "运算方法", description = "数学运算方法", defaultValue = "plus", items = "plus;minus;multiply;divide") String str2,
            @InsightComponentArg(name = "类型", description = "值对应的类型，列名或者常量", defaultValue = "constant", items = "constant;column") String str3,
            @InsightComponentArg(name = "值", description = "输入值") String str4) {
        if (dataset == null) {
            throw new IllegalArgumentException("数据集为空");
        }
        if (str2 == null || str2.length() == 0) {
            throw new IllegalArgumentException("运算方法为空");
        }
        if (str == null || str.length() == 0) {
            throw new IllegalArgumentException("列名为空");
        }
        if (str4 == null || str4.length() == 0) {
            // This checks the value parameter, not the column name.
            throw new IllegalArgumentException("值为空");
        }
        return (Dataset<T>) a.a(dataset.toDF(), str, str2, str4, str3, str2 + "(" + str + "," + str4 + ")");
    }

    /**
     * Groups by {@code str} and aggregates {@code str3} with {@code str2}. It then keeps the
     * groups whose aggregate satisfies the comparison {@code str4} against threshold {@code d}.
     * Finally it returns the original rows (original columns only) that belong to those groups.
     *
     * @throws NullPointerException     if {@code dataset} is {@code null}
     * @throws IllegalArgumentException if a required parameter is blank
     */
    @InsightComponent(name = "分组过滤", description = "将某些列进行分组后聚合，再加上某些条件对聚合后的结果加以过滤")
    public static <T> Dataset<T> groupFilter(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "分组列", description = "用于分组的列，多个列用分号隔开") String str,
            @InsightComponentArg(name = "聚合函数类型", description = "聚合函数类型", defaultValue = "count", items = "min;max;avg;sum;count;distinct_count;distinct_sum;approx_count_distinct") String str2,
            @InsightComponentArg(name = "聚合列名", description = "聚合的列名") String str3,
            @InsightComponentArg(name = "比较", description = "数学比较", defaultValue = "大于", items = "大于;不小于;小于;不大于;等于") String str4,
            @InsightComponentArg(name = "阈值", description = "用于过滤的阈值") double d) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        if (isBlank(str2)) {
            throw new IllegalArgumentException("agg method 为空");
        }
        if (isBlank(str)) {
            throw new IllegalArgumentException("group column 为空");
        }
        if (isBlank(str3)) {
            throw new IllegalArgumentException("func column 为空");
        }

        // All steps below only build a lazy Spark plan; nothing runs until the caller acts on it,
        // so caching/timing around them (as previously done) had no effect.
        Dataset<Row> aggregated = aggregate(dataset, Consts.GROUPBY, str, str2, str3).toDF();

        Column joinCondition = null;
        for (String key : str.split(Consts.DELIMITER)) {
            if (key.length() == 0) {
                continue;
            }
            Column keyMatches = dataset.col(key).equalTo(aggregated.col(key));
            joinCondition = joinCondition == null ? keyMatches : joinCondition.and(keyMatches);
        }
        if (joinCondition == null) {
            throw new IllegalArgumentException("group column 为空");
        }

        // Name that Spark gives to the aggregate column produced above.
        String aggregateColumnName;
        if (Consts.DISTINCT_COUNT.equals(str2)) {
            aggregateColumnName = "count(DISTINCT " + str3 + ")";
        } else if (Consts.DISTINCT_SUM.equals(str2)) {
            aggregateColumnName = "sum(DISTINCT " + str3 + ")";
        } else {
            aggregateColumnName = str2 + "(" + str3 + ")";
        }
        // `a` here is the package-local helper class, not a local variable.
        Dataset<Row> filtered = a.a(aggregated, aggregateColumnName, Consts.a.b(str4), d);
        logger.debug("groupFilter plan built for groups [{}] on {}", str, aggregateColumnName);

        Column[] originalColumns = toDatasetColumns(dataset, dataset.columns());
        return asTyped(dataset.join(filtered, joinCondition, "right").select(originalColumns));
    }

    /**
     * Logs the dataset's row count through the Spark session's logger. This triggers a Spark job.
     *
     * @return the input dataset, unchanged
     * @throws NullPointerException if {@code dataset} is {@code null}
     */
    @InsightComponent(name = Consts.COUNT, description = Consts.COUNT)
    public static <T> Dataset<T> countRow(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        SparkContextBuilder.getSession().log().info("数据集行数为： " + dataset.count());
        return dataset;
    }

    /**
     * Keeps rows whose time column, formatted with {@code str4}, lies within [{@code str2},
     * {@code str3}]. A blank bound is open. When both bounds are blank the input is returned.
     *
     * <p>{@code date_format} yields strings, so the comparison is lexicographic. It is only
     * chronological for formats such as {@code yyyy-MM-dd}.
     *
     * @throws NullPointerException if the dataset, time column or format is blank
     */
    @InsightComponent(name = "时间过滤", description = "时间过滤")
    public static <T> Dataset<T> filterDate(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "时间列", description = "用于比较的时间列") String str,
            @InsightComponentArg(name = "起始时间", description = "起始时间") String str2,
            @InsightComponentArg(name = "终止时间", description = "终止时间") String str3,
            @InsightComponentArg(name = "时间格式", description = "时间格式，例如yyyy-MM-dd") String str4) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        if (isBlank(str)) {
            throw new NullPointerException("时间列为空");
        }
        if (isBlank(str4)) {
            throw new NullPointerException("时间格式为空");
        }
        boolean noStart = isBlank(str2);
        boolean noEnd = isBlank(str3);
        if (noStart && noEnd) {
            return dataset;
        }
        Column formatted = functions.date_format(dataset.col(str), str4);
        Column condition;
        if (noStart) {
            condition = formatted.leq(functions.date_format(functions.lit(str3), str4));
        } else if (noEnd) {
            condition = formatted.geq(functions.date_format(functions.lit(str2), str4));
        } else {
            condition = formatted.geq(functions.date_format(functions.lit(str2), str4))
                    .and(formatted.leq(functions.date_format(functions.lit(str3), str4)));
        }
        return dataset.filter(condition);
    }

    /**
     * Removes duplicate rows.
     *
     * @throws NullPointerException if {@code dataset} is {@code null}
     */
    @InsightComponent(name = "去重", description = "去重")
    public static <T> Dataset<T> distinct(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        return dataset.distinct();
    }

    /** Returns whether {@code value} is {@code null} or contains only whitespace. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().length() == 0;
    }

    /**
     * Checkpoints the dataset under {@code <str>/checkpoint}.
     *
     * <p>Side effect: sets the checkpoint directory of the whole SparkContext.
     *
     * @throws NullPointerException if the dataset or path is blank
     */
    @InsightComponent(name = "checkpoint", description = "checkpoint")
    public static <T> Dataset<T> checkpoint(
            @InsightComponentArg(externalInput = true, name = "数据集", description = "数据集") Dataset<T> dataset,
            @InsightComponentArg(name = "路径", description = "路径", defaultValue = "${MISC_FOLDER}") String str) {
        if (dataset == null) {
            throw new NullPointerException("数据集为空");
        }
        // Without this check a null path silently became "null/checkpoint".
        requireNonBlank(str, "路径为空");
        SparkContextBuilder.getSession().sparkContext().setCheckpointDir(str + "/checkpoint");
        return dataset.checkpoint();
    }

    /**
     * Re-labels an untyped Spark result as {@code Dataset<T>}.
     *
     * <p>Transformations such as {@code withColumn}, {@code toDF} and {@code agg} return
     * {@code Dataset<Row>}, while the component signatures promise {@code Dataset<T>}. Because of
     * erasure this cast is unchecked and performs no runtime conversion. Callers should treat the
     * element type as {@code Row}.
     */
    @SuppressWarnings("unchecked")
    private static <T> Dataset<T> asTyped(Dataset<?> frame) {
        return (Dataset<T>) frame;
    }
}
