package org.apache.kylin.engine.spark.builder.v3dict;

import io.delta.tables.DeltaTable;
import io.delta.tables.DeltaTable$;
import java.nio.file.Paths;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.dict.NBucketDictionary;
import org.apache.spark.dict.NGlobalDictMetaInfo;
import org.apache.spark.dict.NGlobalDictionaryV2;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkInternalAgent$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Project;
import org.apache.spark.sql.catalyst.plans.logical.Window;
import org.apache.spark.sql.expressions.Window$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Enumeration;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.mutable.ListBuffer;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import util.retry.blocking.RetryStrategy;
import util.retry.blocking.RetryStrategy$;

/* compiled from: DictionaryBuilder.scala */
/* loaded from: input_file:org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder$.class */
public final class DictionaryBuilder$ implements Logging {
    public static DictionaryBuilder$ MODULE$;
    private final Function0<RetryStrategy> retryStrategy;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // Eagerly instantiates the singleton when the class is loaded; the private
    // constructor is what stores the new instance into MODULE$.
    static {
        new DictionaryBuilder$();
    }

    // ---------------------------------------------------------------------
    // Spark `Logging` trait plumbing.
    // Every method in this section simply forwards to the matching static
    // `Logging.<name>$` helper, passing this instance as the receiver.
    // ---------------------------------------------------------------------

    /** Forwards to {@code Logging.logName$}. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Forwards to {@code Logging.log$}, which yields the SLF4J logger for this object. */
    public Logger log() {
        return Logging.log$(this);
    }

    // Message-only variants: the message is supplied as a Function0 thunk.

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    // Variants that additionally carry a Throwable.

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /** Forwards to {@code Logging.isTraceEnabled$}. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    // Logger initialisation hooks, all delegated to the Logging trait helpers.

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    /** Scala default-argument accessor for the second parameter of {@code initializeLogIfNecessary}. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Getter for the transient logger field backing the Logging trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the transient logger field backing the Logging trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /**
     * Returns the retry strategy thunk held by this singleton.
     *
     * @return the {@code Function0<RetryStrategy>} assigned once in the constructor
     */
    public Function0<RetryStrategy> retryStrategy() {
        return this.retryStrategy;
    }

    /**
     * Entry point for building the global dictionary over a logical plan.
     *
     * <p>The plan is first passed through {@link #transformCountDistinct}, and the
     * result is then rewritten with the {@code buildGlobalDict} rule class.
     *
     * @param str          string handed unchanged to the rewrite rule
     *                     (presumably the project name; verify against the rule class)
     * @param sparkSession active Spark session
     * @param logicalPlan  plan to rewrite
     * @return the rewritten plan
     */
    public LogicalPlan buildGlobalDict(String str, SparkSession sparkSession, LogicalPlan logicalPlan) {
        LogicalPlan preparedPlan = transformCountDistinct(sparkSession, logicalPlan);
        DictionaryBuilder$$anonfun$buildGlobalDict$1 dictRule =
                new DictionaryBuilder$$anonfun$buildGlobalDict$1(sparkSession, str);
        return preparedPlan.transform(dictRule);
    }

    /**
     * Turns a {@code Project(Project(Window(child)))} plan into a plan that yields only
     * the dictionary entries missing from the stored Delta dictionary.
     *
     * <p>For a matching plan, the window's input is left-anti-joined against the stored
     * dictionary on the context expression cast to string. Each remaining row gets
     * {@code dict_value = row_number() + <current dictionary row count>}. Any other
     * plan shape is returned untouched.
     *
     * @param sparkSession      active Spark session
     * @param dictionaryContext supplies the dictionary location and the key expression
     * @param logicalPlan       plan to inspect
     * @return the incremental-dictionary plan, or {@code logicalPlan} itself when the shape does not match
     */
    private LogicalPlan transformerDictPlan(SparkSession sparkSession, DictionaryContext dictionaryContext, LogicalPlan logicalPlan) {
        DeltaTable dictTable = DeltaTable$.MODULE$.forPath(getDictionaryPath(dictionaryContext));
        // The table is opened and counted before the shape check, so this work
        // happens even for plans that end up being returned unchanged.
        long existingEntries = dictTable.toDF().count();

        if (!(logicalPlan instanceof Project)) {
            return logicalPlan;
        }
        LogicalPlan innerProject = ((Project) logicalPlan).child();
        if (!(innerProject instanceof Project)) {
            return logicalPlan;
        }
        LogicalPlan windowNode = ((Project) innerProject).child();
        if (!(windowNode instanceof Window)) {
            return logicalPlan;
        }

        LogicalPlan windowInput = ((Window) windowNode).child();
        String keyExpr = dictionaryContext.expr();

        Dataset<Row> candidates = SparkInternalAgent$.MODULE$.getDataFrame(sparkSession, windowInput);
        // NOTE(review): the filter column is named after the dictionary table's first
        // output attribute, so the window input must expose a column of that name — confirm.
        String dictKeyName = ((NamedExpression) SparkInternalAgent$.MODULE$.getLogicalPlan(dictTable.toDF()).output().head()).name();
        Dataset<Row> nonNullCandidates = candidates.filter(functions$.MODULE$.col(dictKeyName).isNotNull());

        // Join condition: source key (as string) equals the dictionary's first output attribute.
        Dataset<Row> storedDict = dictTable.toDF();
        Expression sourceKey = functions$.MODULE$.col(keyExpr).cast(StringType$.MODULE$).expr();
        Expression storedKey = (Expression) SparkInternalAgent$.MODULE$.getLogicalPlan(dictTable.toDF()).output().head();
        Column keyMatches = SparkInternalAgent$.MODULE$.createColumn(new EqualTo(sourceKey, storedKey));
        Dataset<Row> unseenKeys = nonNullCandidates.join(storedDict, keyMatches, "left_anti");

        Column dictKey = functions$.MODULE$.col(keyExpr).cast(StringType$.MODULE$).as("dict_key");
        // New values continue after the rows already stored in the dictionary.
        Column dictValue = functions$.MODULE$.row_number()
                .over(Window$.MODULE$.orderBy(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(keyExpr)})))
                .$plus(functions$.MODULE$.lit(BoxesRunTime.boxToLong(existingEntries)))
                .cast(LongType$.MODULE$)
                .as("dict_value");
        Dataset<Row> increment = unseenKeys.select(Predef$.MODULE$.wrapRefArray(new Column[]{dictKey, dictValue}));

        return SparkInternalAgent$.MODULE$.getLogicalPlan(increment);
    }

    /**
     * Picks the dictionary build mode by probing, in order:
     * <ol>
     *   <li>a dictionary at the current V3 path: {@code V3APPEND};</li>
     *   <li>a dictionary at the original V3 path: {@code V3UPGRADE};</li>
     *   <li>V2-to-V3 conversion enabled in config and a V2 dictionary present: {@code V2UPGRADE};</li>
     *   <li>otherwise: {@code V3INIT}.</li>
     * </ol>
     *
     * @param dictionaryContext identifies the dictionary being built
     * @return the selected {@code DictBuildMode} value
     */
    private Enumeration.Value chooseDictBuildMode(DictionaryContext dictionaryContext) {
        if (isExistsV3Dict(dictionaryContext)) {
            return DictBuildMode$.MODULE$.V3APPEND();
        }
        if (isExistsOriginalV3Dict(dictionaryContext)) {
            return DictBuildMode$.MODULE$.V3UPGRADE();
        }
        // The V2 probe only runs when conversion is enabled (short-circuit &&).
        boolean convertFromV2 = KylinConfig.getInstanceFromEnv().isConvertV3DictEnable()
                && isExistsV2Dict(dictionaryContext);
        if (convertFromV2) {
            return DictBuildMode$.MODULE$.V2UPGRADE();
        }
        return DictBuildMode$.MODULE$.V3INIT();
    }

    /**
     * Builds or extends the V3 dictionary according to the mode chosen by
     * {@link #chooseDictBuildMode}.
     *
     * <ul>
     *   <li>{@code V3INIT}: save the plan's data frame as the initial dictionary.</li>
     *   <li>{@code V3APPEND}: merge the increment into the existing dictionary.</li>
     *   <li>{@code V3UPGRADE}: seed from the original V3 dictionary, then merge.</li>
     *   <li>{@code V2UPGRADE}: seed from the V2 dictionary, then merge.</li>
     * </ul>
     *
     * @param sparkSession      active Spark session
     * @param logicalPlan       plan producing the dictionary input
     * @param dictionaryContext identifies the dictionary being built
     * @throws MatchError if the chosen mode is none of the four values above
     */
    public void org$apache$kylin$engine$spark$builder$v3dict$DictionaryBuilder$$incrementBuildDict(SparkSession sparkSession, LogicalPlan logicalPlan, DictionaryContext dictionaryContext) {
        final Enumeration.Value mode = chooseDictBuildMode(dictionaryContext);
        logInfo(() -> "V3 Dict build mode is " + mode);

        if (java.util.Objects.equals(DictBuildMode$.MODULE$.V3INIT(), mode)) {
            Dataset<Row> initialDict = SparkInternalAgent$.MODULE$.getDataFrame(sparkSession, logicalPlan);
            initAndSaveDictDF(initialDict, dictionaryContext);
        } else if (java.util.Objects.equals(DictBuildMode$.MODULE$.V3APPEND(), mode)) {
            mergeIncrementDict(sparkSession, dictionaryContext, logicalPlan);
        } else if (java.util.Objects.equals(DictBuildMode$.MODULE$.V3UPGRADE(), mode)) {
            Dataset<Row> legacyV3 = upgradeFromOriginalV3(sparkSession, dictionaryContext);
            initAndSaveDictDF(legacyV3, dictionaryContext);
            mergeIncrementDict(sparkSession, dictionaryContext, logicalPlan);
        } else if (java.util.Objects.equals(DictBuildMode$.MODULE$.V2UPGRADE(), mode)) {
            Dataset<Row> legacyV2 = upgradeFromV2(sparkSession, dictionaryContext);
            initAndSaveDictDF(legacyV2, dictionaryContext);
            mergeIncrementDict(sparkSession, dictionaryContext, logicalPlan);
        } else {
            throw new MatchError(mode);
        }
    }

    /**
     * Writes the given data frame to the dictionary path in Delta format,
     * overwriting whatever is already stored there.
     *
     * @param dataset           dictionary rows to persist
     * @param dictionaryContext identifies the dictionary and therefore the target path
     */
    private void initAndSaveDictDF(Dataset<Row> dataset, DictionaryContext dictionaryContext) {
        final String targetPath = getDictionaryPath(dictionaryContext);
        logInfo(() -> "Save dict values into path " + targetPath + ".");
        dataset.write()
                .mode(SaveMode.Overwrite)
                .format("delta")
                .save(targetPath);
    }

    /**
     * Computes the incremental dictionary for the plan and merges it into the stored
     * Delta dictionary, inserting every source row the merge condition does not match.
     *
     * @param sparkSession      active Spark session
     * @param dictionaryContext identifies the dictionary being extended
     * @param logicalPlan       plan producing the dictionary input
     */
    private void mergeIncrementDict(SparkSession sparkSession, DictionaryContext dictionaryContext, LogicalPlan logicalPlan) {
        LogicalPlan incrementPlan = transformerDictPlan(sparkSession, dictionaryContext, logicalPlan);
        Dataset<Row> increment = SparkInternalAgent$.MODULE$.getDataFrame(sparkSession, incrementPlan);

        final String targetPath = getDictionaryPath(dictionaryContext);
        logInfo(() -> "increment build global dict " + targetPath);

        DeltaTable storedDict = DeltaTable$.MODULE$.forPath(targetPath);
        // NOTE(review): because of the "dict_value != dict_value" clause, a source row
        // whose key AND value both equal an existing row counts as not matched and
        // would be inserted again — confirm this is intended.
        storedDict.alias("dict")
                .merge(increment.alias("incre_dict"), "incre_dict.dict_key = dict.dict_key and incre_dict.dict_value != dict.dict_value")
                .whenNotMatched()
                .insertAll()
                .execute();
    }

    /**
     * Reports whether a V2 global dictionary exists for the context's column,
     * judged by whether its meta info is non-null. The outcome is logged either way.
     *
     * @param dictionaryContext supplies project, database, table and column names
     * @return {@code true} when the V2 dictionary's meta info is present
     */
    private boolean isExistsV2Dict(DictionaryContext dictionaryContext) {
        String qualifiedTable = dictionaryContext.dbName() + "." + dictionaryContext.tableName();
        final NGlobalDictionaryV2 v2Dict = new NGlobalDictionaryV2(
                dictionaryContext.project(),
                qualifiedTable,
                dictionaryContext.columnName(),
                KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());

        final boolean present = v2Dict.getMetaInfo() != null;
        logInfo(() -> (present ? "Exists V2 dict " : "Not exists V2 dict ") + v2Dict.getResourceDir());
        return present;
    }

    /**
     * Checks on the working file system whether the current V3 dictionary path exists.
     *
     * @param dictionaryContext identifies the dictionary
     * @return {@code true} if the path from {@link #getDictionaryPath} exists
     */
    private boolean isExistsV3Dict(DictionaryContext dictionaryContext) {
        org.apache.hadoop.fs.FileSystem workingFs = HadoopUtil.getWorkingFileSystem();
        Path dictLocation = new Path(getDictionaryPath(dictionaryContext));
        return workingFs.exists(dictLocation);
    }

    /**
     * Checks on the working file system whether the original-layout V3 dictionary path exists.
     *
     * @param dictionaryContext identifies the dictionary
     * @return {@code true} if the path from {@link #getOriginalDictionaryPath} exists
     */
    private boolean isExistsOriginalV3Dict(DictionaryContext dictionaryContext) {
        org.apache.hadoop.fs.FileSystem workingFs = HadoopUtil.getWorkingFileSystem();
        Path legacyLocation = new Path(getOriginalDictionaryPath(dictionaryContext));
        return workingFs.exists(legacyLocation);
    }

    /**
     * Opens the Delta table at the original-layout V3 dictionary path.
     *
     * @param dictionaryContext identifies the dictionary
     * @return the table's contents as a data frame
     */
    private Dataset<Row> fetchExistsOriginalV3Dict(DictionaryContext dictionaryContext) {
        String legacyPath = getOriginalDictionaryPath(dictionaryContext);
        DeltaTable legacyDict = DeltaTable$.MODULE$.forPath(legacyPath);
        return legacyDict.toDF();
    }

    /**
     * Applies a {@code PreCountDistinctTransformer} bound to the session to the plan.
     *
     * @param sparkSession active Spark session
     * @param logicalPlan  plan to transform
     * @return the transformer's output plan
     */
    private LogicalPlan transformCountDistinct(SparkSession sparkSession, LogicalPlan logicalPlan) {
        PreCountDistinctTransformer transformer = new PreCountDistinctTransformer(sparkSession);
        return transformer.apply(logicalPlan);
    }

    /**
     * Converts an existing V2 global dictionary into a {@code (dict_key, dict_value)} data frame.
     *
     * <p>When the V2 dictionary has meta info, one task per bucket id in
     * {@code 0..bucketSize} (inclusive) loads that bucket and emits its entries.
     * When the meta info is missing, an empty data set with the same schema is returned.
     *
     * <p>The null check runs before anything dereferences the meta info. Previously
     * the log message called {@code metaInfo.getDictCount()} first, which throws a
     * {@code NullPointerException} for a missing dictionary whenever INFO logging is
     * enabled, and the dictionary was broadcast even when it was never used.
     *
     * @param sparkSession      active Spark session
     * @param dictionaryContext supplies project, database, table and column names
     * @return the V2 entries as rows, or an empty data set if no V2 meta info exists
     */
    private Dataset<Row> upgradeFromV2(SparkSession sparkSession, DictionaryContext dictionaryContext) {
        NGlobalDictionaryV2 nGlobalDictionaryV2 = new NGlobalDictionaryV2(dictionaryContext.project(), new StringBuilder(1).append(dictionaryContext.dbName()).append(".").append(dictionaryContext.tableName()).toString(), dictionaryContext.columnName(), KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
        NGlobalDictMetaInfo metaInfo = nGlobalDictionaryV2.getMetaInfo();
        StructType structType = new StructType(new StructField[]{new StructField("dict_key", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new StructField("dict_value", LongType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())});

        // Guard first: nothing below may touch metaInfo when the V2 dictionary is absent.
        if (metaInfo == null) {
            return sparkSession.emptyDataset(RowEncoder$.MODULE$.apply(structType));
        }

        logInfo(() -> {
            return new StringBuilder(20).append("Exists V2 dict ").append(nGlobalDictionaryV2.getResourceDir()).append(" num ").append(metaInfo.getDictCount()).toString();
        });
        // Broadcast only when there is something to read, so executors can load buckets.
        Broadcast broadcast = sparkSession.sparkContext().broadcast(nGlobalDictionaryV2, ClassTag$.MODULE$.apply(NGlobalDictionaryV2.class));
        return sparkSession.createDataset(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), metaInfo.getBucketSize()), sparkSession.implicits().newIntEncoder()).flatMap(obj -> {
            return $anonfun$upgradeFromV2$2(broadcast, BoxesRunTime.unboxToInt(obj));
        }, RowEncoder$.MODULE$.apply(structType));
    }

    /**
     * Loads the original-layout V3 dictionary if it exists.
     *
     * @param sparkSession      active Spark session, used only for the empty fallback
     * @param dictionaryContext identifies the dictionary
     * @return the original dictionary's data frame, or {@code sparkSession.emptyDataFrame()} when absent
     */
    private Dataset<Row> upgradeFromOriginalV3(SparkSession sparkSession, DictionaryContext dictionaryContext) {
        if (!isExistsOriginalV3Dict(dictionaryContext)) {
            return sparkSession.emptyDataFrame();
        }
        return fetchExistsOriginalV3Dict(dictionaryContext);
    }

    /**
     * Builds the original-layout V3 dictionary location: the HDFS working directory
     * followed by a Hadoop {@code Path} composed from project, {@code /dict/global_dict_v3},
     * table name and column name. The database name is not part of this layout.
     *
     * <p>NOTE(review): the middle component starts with '/', so how Hadoop resolves it
     * against the project parent decides whether the project segment survives — confirm
     * against the Hadoop {@code Path} documentation before changing anything here.
     *
     * @param dictionaryContext supplies project, table and column names
     * @return the concatenated location string
     */
    private String getOriginalDictionaryPath(DictionaryContext dictionaryContext) {
        String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
        String project = dictionaryContext.project();
        Path tableAndColumn = new Path(new Path(dictionaryContext.tableName()), dictionaryContext.columnName());
        Path underDictRoot = new Path("/dict/global_dict_v3", tableAndColumn);
        Path relative = new Path(project, underDictRoot);
        return workingDir + relative;
    }

    /**
     * Builds the current V3 dictionary location: the HDFS working directory followed by
     * the joined segments project, {@code /dict/global_dict_v3}, database, table and column.
     *
     * <p>NOTE(review): the segments are joined with {@code java.nio.file.Paths}, which uses
     * the default file system's separator rather than Hadoop's — confirm this is acceptable
     * on every platform the job runs on.
     *
     * @param dictionaryContext supplies project, database, table and column names
     * @return the concatenated location string
     */
    public String getDictionaryPath(DictionaryContext dictionaryContext) {
        String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
        java.nio.file.Path relative = Paths.get(
                dictionaryContext.project(),
                "/dict/global_dict_v3",
                dictionaryContext.dbName(),
                dictionaryContext.tableName(),
                dictionaryContext.columnName());
        return workingDir + relative;
    }

    /**
     * Converts a column reference's back-tick identity with
     * {@code NSparkCubingUtil.convertFromDot}.
     *
     * @param tblColRef column reference
     * @return the converted identifier string
     */
    public String wrapCol(TblColRef tblColRef) {
        String backTickIdentity = tblColRef.getBackTickIdentity();
        return NSparkCubingUtil.convertFromDot(backTickIdentity);
    }

    /**
     * Synthetic body of the per-bucket flatMap in {@code upgradeFromV2}: loads one bucket
     * of the broadcast V2 dictionary and emits a row (key, long value) for every entry
     * of its absolute dictionary map.
     *
     * @param broadcast broadcast handle whose value is the {@code NGlobalDictionaryV2}
     * @param i         bucket id to load
     * @return iterator over the bucket's entries as rows
     */
    public static final /* synthetic */ Iterator $anonfun$upgradeFromV2$2(Broadcast broadcast, int i) {
        NGlobalDictionaryV2 v2Dict = (NGlobalDictionaryV2) broadcast.value();
        NBucketDictionary bucket = v2Dict.loadBucketDictionary(i);
        ListBuffer rows = new ListBuffer();
        bucket.getAbsoluteDictMap().object2LongEntrySet().forEach(entry -> {
            Row row = Row$.MODULE$.fromTuple(new Tuple2(entry.getKey(), BoxesRunTime.boxToLong(entry.getLongValue())));
            rows.append(Predef$.MODULE$.wrapRefArray(new Row[]{row}));
        });
        return rows.iterator();
    }

    /**
     * Private singleton constructor, invoked once from the static initializer.
     * Publishes the instance, runs the Logging trait initialiser, and creates the
     * retry strategy via {@code RetryStrategy.fixedBackOff} with a 10-second duration
     * and the integer argument 5 (presumably the maximum number of retries; confirm
     * against the retry library).
     */
    private DictionaryBuilder$() {
        // Publish the singleton before anything else touches MODULE$.
        MODULE$ = this;
        Logging.$init$(this);
        this.retryStrategy = RetryStrategy$.MODULE$.fixedBackOff(new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds(), 5);
    }
}
