package org.apache.kylin.engine.spark.builder;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.engine.spark.metadata.ColumnDesc;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.spark.dict.NGlobalDictionary;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.KylinFunctions$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructType;
import scala.Predef$;
import scala.Serializable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;

/* compiled from: CubeTableEncoder.scala */
/* loaded from: input_file:WEB-INF/lib/kylin-spark-engine-4.0.1.jar:org/apache/kylin/engine/spark/builder/CubeTableEncoder$$anonfun$encodeTable$2.class */
/**
 * Decompiled closure from {@code CubeTableEncoder.scala} ({@code encodeTable}); it is applied
 * once per {@link ColumnDesc}.
 *
 * <p>For each column it appends a {@code dict_encode} column (named with
 * {@code CubeBuilderHelper.ENCODE_SUFFIX}) to the dataset held in {@code partitionedDs$1},
 * repartitioning that dataset first. When skew detection is enabled in the segment's Kylin
 * config and skewed values are found in a sample of {@code ds$1}, the skewed values are written
 * to a file with Kryo and the dataset is repartitioned on a {@code scatter_skew_data} column
 * instead of on the raw column value.
 *
 * <p>The captured {@code partitionedDs$1} and {@code repartitionSizeAfterEncode$1} references
 * are mutated by {@link #apply(ColumnDesc)}.
 */
public final class CubeTableEncoder$$anonfun$encodeTable$2 extends AbstractFunction1<ColumnDesc, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    /** Dataset that is sampled for skew detection. */
    private final Dataset ds$1;
    /** Segment providing the project name and Kylin configuration. */
    private final SegmentInfo seg$1;
    /** Job id; used as a path component of the skewed-data file. */
    private final String jobId$1;
    /** Schema used to resolve the column's field index and name. */
    private final StructType structType$1;
    /** Mutable holder of the dataset being progressively repartitioned and encoded. */
    private final ObjectRef partitionedDs$1;
    /** Lower bound used when deriving the repartition size from the dictionary bucket size. */
    private final long minBucketSize$1;
    /** Mutable holder tracking the largest repartition size computed so far. */
    private final IntRef repartitionSizeAfterEncode$1;

    /**
     * Encodes one column and updates the captured dataset and repartition-size holders.
     *
     * @param columnDesc the column to dictionary-encode
     */
    /* JADX WARN: Type inference failed for: r1v32, types: [org.apache.spark.sql.Dataset, T] */
    /* JADX WARN: Type inference failed for: r1v78, types: [org.apache.spark.sql.Dataset, T] */
    public final void apply(ColumnDesc columnDesc) {
        int bucketSize = new NGlobalDictionary(this.seg$1.project(), columnDesc.tableAliasName(), columnDesc.columnName(), this.seg$1.kylinconf().getHdfsWorkingDirectory()).getBucketSizeOrDefault(this.seg$1.kylinconf().getGlobalDictV2MinHashPartitions());
        // Smallest multiple of bucketSize that is strictly greater than minBucketSize.
        int repartitionNum = (int) (((this.minBucketSize$1 / bucketSize) + 1) * bucketSize);
        if (repartitionNum > this.repartitionSizeAfterEncode$1.elem) {
            this.repartitionSizeAfterEncode$1.elem = repartitionNum;
        }

        String columnName = NSparkCubingUtil.convertFromDot(columnDesc.identity());
        int fieldIndex = this.structType$1.fieldIndex(columnName);
        String dictParams = Predef$.MODULE$.refArrayOps(new String[]{this.seg$1.project(), columnDesc.tableAliasName(), columnDesc.columnName(), this.seg$1.kylinconf().getHdfsWorkingDirectory()}).mkString(NSparkCubingUtil.SEPARATOR);
        String encodedName = this.structType$1.apply(fieldIndex).name().concat(CubeBuilderHelper$.MODULE$.ENCODE_SUFFIX());
        Column encodeCol = KylinFunctions$.MODULE$.dict_encode(functions$.MODULE$.col(columnName).cast(StringType$.MODULE$), functions$.MODULE$.lit(dictParams), functions$.MODULE$.lit(BoxesRunTime.boxToInteger(bucketSize)).cast(StringType$.MODULE$)).as(encodedName);
        // Columns derived from the current schema of the partitioned dataset; the encoded column is appended to these.
        Seq existingCols = (Seq) ((Dataset) this.partitionedDs$1.elem).schema().map(new CubeTableEncoder$$anonfun$encodeTable$2$$anonfun$1(this), Seq$.MODULE$.canBuildFrom());

        if (this.seg$1.kylinconf().detectDataSkewInDictEncodingEnabled()) {
            Column stringCol = functions$.MODULE$.col(columnName).cast(StringType$.MODULE$);
            Dataset sampled = this.ds$1.select(Predef$.MODULE$.wrapRefArray(new Column[]{stringCol})).sample(this.seg$1.kylinconf().sampleRateInEncodingSkewDetection()).cache();
            Object2LongOpenHashMap skewedValues = new Object2LongOpenHashMap();
            Path skewPath;
            try {
                long sampleCount = sampled.count();
                skewPath = new Path(this.seg$1.kylinconf().getJobTmpDir(this.seg$1.project()) + "/" + this.jobId$1 + "/skewed_data/" + columnDesc.identity());
                // Keep values whose sampled frequency exceeds the configured share of the sample, then hand
                // each (value, encoded value) row to the nested closure together with skewedValues
                // (presumably it fills the map - the closure body is not visible here).
                Predef$.MODULE$.refArrayOps((Object[]) sampled.groupBy(columnName, Predef$.MODULE$.wrapRefArray(new String[0])).agg(functions$.MODULE$.count(functions$.MODULE$.lit(BoxesRunTime.boxToInteger(1))).alias("count_value"), Predef$.MODULE$.wrapRefArray(new Column[0])).filter(functions$.MODULE$.col("count_value").$greater(BoxesRunTime.boxToDouble(sampleCount * this.seg$1.kylinconf().skewPercentageThreshHold()))).repartition(repartitionNum, Predef$.MODULE$.wrapRefArray(new Column[]{stringCol})).select((Seq) scala.collection.mutable.Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{stringCol, encodeCol}))).collect()).foreach(new CubeTableEncoder$$anonfun$encodeTable$2$$anonfun$apply$1(this, skewedValues));
            } finally {
                // Always release the cached sample, even when the detection query fails.
                sampled.unpersist();
            }

            if (skewedValues.size() > 0) {
                Kryo kryo = new Kryo();
                FileSystem fileSystem = skewPath.getFileSystem(new Configuration());
                if (fileSystem.exists(skewPath)) {
                    fileSystem.delete(skewPath, true);
                }
                Output output = new Output(fileSystem.create(skewPath));
                try {
                    kryo.writeClassAndObject(output, skewedValues);
                } finally {
                    // Close the underlying file stream even if serialization throws.
                    output.close();
                }

                String scatterName = "scatter_skew_data_" + columnDesc.columnName();
                Column scatterCol = KylinFunctions$.MODULE$.scatter_skew_data(stringCol, functions$.MODULE$.lit(skewPath.toString())).alias(scatterName);
                // Same dict_encode call as above, with the skewed-data path appended to the parameter string.
                encodeCol = KylinFunctions$.MODULE$.dict_encode(functions$.MODULE$.col(columnName).cast(StringType$.MODULE$), functions$.MODULE$.lit(Predef$.MODULE$.refArrayOps(new String[]{this.seg$1.project(), columnDesc.tableAliasName(), columnDesc.columnName(), this.seg$1.kylinconf().getHdfsWorkingDirectory(), skewPath.toString()}).mkString(NSparkCubingUtil.SEPARATOR)), functions$.MODULE$.lit(BoxesRunTime.boxToInteger(bucketSize)).cast(StringType$.MODULE$)).alias(encodedName);
                // Repartition on the scatter column; the final select omits it again and adds the encoded column.
                this.partitionedDs$1.elem = ((Dataset) this.partitionedDs$1.elem).select((Seq) existingCols.$plus$plus(scala.collection.mutable.Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{scatterCol})), Seq$.MODULE$.canBuildFrom())).repartition(repartitionNum, Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(scatterName)})).select((Seq) existingCols.$plus$plus(scala.collection.mutable.Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{encodeCol})), Seq$.MODULE$.canBuildFrom()));
                return;
            }
        }

        // No skew handling applied: repartition directly on the string-cast column value.
        this.partitionedDs$1.elem = ((Dataset) this.partitionedDs$1.elem).repartition(repartitionNum, Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(columnName).cast(StringType$.MODULE$)})).select((Seq) existingCols.$plus$plus(scala.collection.mutable.Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{encodeCol})), Seq$.MODULE$.canBuildFrom()));
    }

    /**
     * Bridge for {@code scala.Function1}: delegates to {@link #apply(ColumnDesc)} and returns
     * {@link BoxedUnit#UNIT}.
     */
    @Override // scala.Function1
    /* renamed from: apply */
    public final /* bridge */ /* synthetic */ Object mo7602apply(Object obj) {
        apply((ColumnDesc) obj);
        return BoxedUnit.UNIT;
    }

    /**
     * Captures the enclosing method's state.
     *
     * @param dataset dataset sampled for skew detection
     * @param segmentInfo segment supplying project and configuration
     * @param str job id
     * @param structType schema used to resolve column fields
     * @param objectRef holder of the dataset being encoded (mutated by {@code apply})
     * @param j minimum bucket size
     * @param intRef holder of the maximum repartition size seen (mutated by {@code apply})
     */
    public CubeTableEncoder$$anonfun$encodeTable$2(Dataset dataset, SegmentInfo segmentInfo, String str, StructType structType, ObjectRef objectRef, long j, IntRef intRef) {
        this.ds$1 = dataset;
        this.seg$1 = segmentInfo;
        this.jobId$1 = str;
        this.structType$1 = structType;
        this.partitionedDs$1 = objectRef;
        this.minBucketSize$1 = j;
        this.repartitionSizeAfterEncode$1 = intRef;
    }
}
