package org.apache.hudi.integ.testsuite.dag.nodes;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator;
import org.apache.hudi.integ.testsuite.reader.SparkBasedReader;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.mutable.Buffer;

/**
 * Base DAG node that validates a dataset against the input batches written under the test suite
 * writer's {@code inputBasePath}.
 *
 * <p>The input batches are read as Avro from {@code inputBasePath + "/*}{@code /*"}. Rows are
 * deduplicated per {@code partitionPath + "+" + recordKey}, keeping the row with the larger
 * {@link SchemaUtils#SOURCE_ORDERING_FIELD}. Rows flagged by {@code _hoodie_is_deleted} are then
 * dropped. Every remaining input row must appear in the dataset returned by
 * {@link #getDatasetToValidate}.
 *
 * <p>Two optional steps are driven by the node config:
 * <ul>
 *   <li>The configured Hive table is validated the same way.</li>
 *   <li>The input batches are deleted afterwards.</li>
 * </ul>
 */
public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {

    /** Hudi meta columns present in the Hive table but absent from the generated input rows. */
    private static final String[] HOODIE_META_COLUMNS = {
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name"
    };

    public BaseValidateDatasetNode(DeltaConfig.Config config) {
        this.config = config;
    }

    /** Returns the logger of the concrete validation node. */
    public abstract Logger getLogger();

    /**
     * Returns the dataset whose contents are validated against the input data.
     *
     * @param sparkSession     active Spark session
     * @param executionContext context of the running DAG
     * @param structType       schema of the (deduplicated) input data
     * @return the dataset to compare with the input rows
     */
    public abstract Dataset<Row> getDatasetToValidate(SparkSession sparkSession, ExecutionContext executionContext, StructType structType);

    /**
     * Validates that every deduplicated, non-deleted input row is present in
     * {@link #getDatasetToValidate}. Optionally validates the Hive table and deletes the input.
     *
     * @throws AssertionError if either dataset is empty or input rows are missing from the output
     */
    @Override
    public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
        SparkSession session = SparkSession.builder().sparkContext(executionContext.getJsc().sc()).getOrCreate();
        String inputBasePath = executionContext.getHoodieTestSuiteWriter().getCfg().inputBasePath;
        String inputPathGlob = inputBasePath + "/*/*";
        log.warn("Validation using data from input path " + inputPathGlob);
        if (log.isDebugEnabled()) {
            logInputBatches(executionContext, inputBasePath);
        }

        Dataset<Row> inputDf = getInputDf(executionContext, session, inputPathGlob);
        Dataset<Row> datasetToValidate = getDatasetToValidate(session, executionContext, inputDf.schema());
        long inputCount = inputDf.count();
        long outputCount = datasetToValidate.count();
        log.debug("Input count: " + inputCount + "; output count: " + outputCount);
        // Every input row must be found in the output: input \ (input ∩ output) has to be empty.
        Dataset<Row> intersectionDf = inputDf.intersect(datasetToValidate);
        if (outputCount == 0 || inputCount == 0 || inputDf.except(intersectionDf).count() != 0) {
            log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
            throw new AssertionError("Hudi contents does not match contents input data. ");
        }

        if (this.config.isValidateHive()) {
            validateHiveTable(executionContext, session, inputDf, inputCount);
        }
        if (this.config.isDeleteInputData()) {
            deleteInputBatches(executionContext, inputBasePath);
        }
    }

    /** Logs, at DEBUG level, every entry directly under the input base path. */
    private void logInputBatches(ExecutionContext executionContext, String inputBasePath) throws IOException {
        Path basePath = new Path(inputBasePath);
        FileStatus[] batches = basePath.getFileSystem(executionContext.getHoodieTestSuiteWriter().getConfiguration())
            .listStatus(basePath);
        log.debug("fileStatuses length: " + batches.length);
        for (FileStatus batch : batches) {
            log.debug("Listing all Micro batches to be validated :: " + batch.getPath().toString());
        }
    }

    /**
     * Validates the configured Hive table ({@code HIVE_DATABASE}.{@code HIVE_TABLE}) against the input.
     *
     * @throws AssertionError if the Hive table is empty or is missing input rows
     */
    private void validateHiveTable(ExecutionContext executionContext, SparkSession session,
                                   Dataset<Row> inputDf, long inputCount) {
        String database = executionContext.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
        String tableName = executionContext.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
        log.warn("Validating hive table with db : " + database + " and table : " + tableName);
        Dataset<Row> hiveDf = session.sql("SELECT * FROM " + database + "." + tableName).drop(HOODIE_META_COLUMNS);
        Dataset<Row> intersectionDf = inputDf.intersect(hiveDf);
        // Count the Hive rows themselves; re-counting the Hudi dataset here would never detect
        // an empty Hive table and would log a misleading output count.
        long hiveCount = hiveDf.count();
        log.warn("Input count: " + inputCount + "; output count: " + hiveCount);
        if (hiveCount == 0 || inputDf.except(intersectionDf).count() != 0) {
            log.error("Data set validation failed for COW hive table. Total count in hudi " + hiveCount + ", input df count " + inputCount);
            throw new AssertionError("Hudi hive table contents does not match contents input data. ");
        }
    }

    /** Recursively deletes every entry directly under the input base path. */
    private void deleteInputBatches(ExecutionContext executionContext, String inputBasePath) throws IOException {
        Path basePath = new Path(inputBasePath);
        FileSystem fs = basePath.getFileSystem(executionContext.getHoodieTestSuiteWriter().getConfiguration());
        for (FileStatus batch : fs.listStatus(basePath)) {
            log.debug("Micro batch to be deleted " + batch.getPath().toString());
            fs.delete(batch.getPath(), true);
        }
    }

    /**
     * Reads the Avro input and reduces it to the latest version of each record.
     *
     * <p>Rows are grouped by {@code partitionPath + "+" + recordKey}. Within a group, the row with
     * the larger {@link SchemaUtils#SOURCE_ORDERING_FIELD} wins; on ties the later row wins. Rows
     * with {@code _hoodie_is_deleted} set are then removed.
     *
     * <p>The explicit {@link MapFunction}/{@link ReduceFunction} casts are required. Without them,
     * Java cannot choose between these and the {@code scala.Function1}/{@code Function2} overloads
     * of the Dataset API.
     */
    private Dataset<Row> getInputDf(ExecutionContext executionContext, SparkSession sparkSession, String inputPath) {
        String recordKeyField = executionContext.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
        String partitionPathField = executionContext.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
        Dataset<Row> rawInputDf = sparkSession.read().format(SparkBasedReader.SPARK_AVRO_FORMAT).load(inputPath);
        return rawInputDf
            .groupByKey(
                (MapFunction<Row, String>) row -> row.getAs(partitionPathField) + "+" + row.getAs(recordKeyField),
                Encoders.STRING())
            .reduceGroups((ReduceFunction<Row>) (left, right) -> {
                int leftOrdering = left.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
                int rightOrdering = right.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
                return leftOrdering > rightOrdering ? left : right;
            })
            .map((MapFunction<Tuple2<String, Row>, Row>) keyAndRow -> keyAndRow._2(), getEncoder(rawInputDf.schema()))
            // NOTE(review): SQL `!= true` also drops rows where _hoodie_is_deleted is null;
            // confirm the generator always populates the field.
            .filter("_hoodie_is_deleted != true");
    }

    /** Builds a resolved and bound Row encoder for {@code schema}. */
    private ExpressionEncoder<Row> getEncoder(StructType schema) {
        List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
            .map(attribute -> attribute.toAttribute())
            .collect(Collectors.toList());
        return RowEncoder.apply(schema)
            .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(), SimpleAnalyzer$.MODULE$);
    }
}
