package org.apache.hudi.utilities.deltastreamer;

import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests {@link SourceFormatAdapter#fetchNewDataInRowFormat} with
 * {@code HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES} enabled, for both a row-based source and a
 * JSON source. Input/expected data files and schemas come from
 * {@link SanitizationTestUtils#provideDataFiles()}.
 */
public class TestSourceFormatAdapter {
    protected static SparkSession spark;
    protected static JavaSparkContext jsc;
    private static final String DUMMY_CHECKPOINT = "dummy_checkpoint";
    private TestRowDataSource testRowDataSource;
    private TestJsonDataSource testJsonDataSource;

    /** Schema provider that always returns the source schema supplied at construction time. */
    public static class BasicSchemaProvider extends SchemaProvider {
        private final Schema schema;

        public BasicSchemaProvider(Schema schema) {
            this(null, null, schema);
        }

        public BasicSchemaProvider(TypedProperties props, JavaSparkContext jssc, Schema schema) {
            super(props, jssc);
            this.schema = schema;
        }

        @Override
        public Schema getSourceSchema() {
            return this.schema;
        }
    }

    /** JSON source that ignores the checkpoint and limit and always returns the batch it was built with. */
    public static class TestJsonDataSource extends Source<JavaRDD<String>> {
        private final InputBatch<JavaRDD<String>> batch;

        public TestJsonDataSource(TypedProperties props, JavaSparkContext jssc, SparkSession sparkSession,
                                  SchemaProvider schemaProvider, InputBatch<JavaRDD<String>> batch) {
            super(props, jssc, sparkSession, schemaProvider, Source.SourceType.JSON);
            this.batch = batch;
        }

        @Override
        protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpoint, long sourceLimit) {
            return this.batch;
        }
    }

    /**
     * Row source that ignores the checkpoint and limit and always returns the dataset and checkpoint
     * of the batch it was built with.
     */
    public static class TestRowDataSource extends RowSource {
        private final InputBatch<Dataset<Row>> batch;

        public TestRowDataSource(TypedProperties props, JavaSparkContext jssc, SparkSession sparkSession,
                                 SchemaProvider schemaProvider, InputBatch<Dataset<Row>> batch) {
            super(props, jssc, sparkSession, schemaProvider);
            this.batch = batch;
        }

        @Override
        protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
            return Pair.of(this.batch.getBatch(), this.batch.getCheckpointForNextBatch());
        }
    }

    /** Creates a local Spark session (Kryo serializer) shared by all tests in this class. */
    @BeforeAll
    public static void start() {
        spark = SparkSession.builder()
                .master("local[*]")
                .appName(TestSourceFormatAdapter.class.getName())
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();
        jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    }

    /** Stops the shared Spark context and session. */
    @AfterAll
    public static void shutdown() {
        // Null guards: if start() failed, an NPE here would hide the original setup failure.
        if (jsc != null) {
            jsc.close();
        }
        if (spark != null) {
            spark.close();
        }
    }

    /** Drops per-test source instances so nothing leaks between parameterized invocations. */
    @AfterEach
    public void teardown() {
        this.testRowDataSource = null;
        this.testJsonDataSource = null;
    }

    private void setupRowSource(Dataset<Row> dataset, TypedProperties props, SchemaProvider schemaProvider) {
        InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(dataset), DUMMY_CHECKPOINT, schemaProvider);
        this.testRowDataSource = new TestRowDataSource(props, jsc, spark, schemaProvider, batch);
    }

    private void setupJsonSource(JavaRDD<String> jsonRdd, Schema schema) {
        BasicSchemaProvider schemaProvider = new BasicSchemaProvider(schema);
        InputBatch<JavaRDD<String>> batch = new InputBatch<>(Option.of(jsonRdd), DUMMY_CHECKPOINT, schemaProvider);
        this.testJsonDataSource = new TestJsonDataSource(new TypedProperties(), jsc, spark, schemaProvider, batch);
    }

    /**
     * Builds properties that enable schema field-name sanitization and set the invalid-character mask
     * to {@code "__"}.
     */
    private static TypedProperties sanitizationProps() {
        TypedProperties props = new TypedProperties();
        props.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
        props.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
        return props;
    }

    /**
     * Reads {@code jsonRdd} with {@code inputSchema} into a row source and fetches it through a
     * {@link SourceFormatAdapter} configured for sanitization.
     *
     * @param jsonRdd        JSON lines to load
     * @param inputSchema    schema used to parse the JSON lines
     * @param schemaProvider schema provider attached to the source batch (may be {@code null})
     * @return the batch produced by {@code fetchNewDataInRowFormat}
     */
    private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> jsonRdd, StructType inputSchema,
                                                  SchemaProvider schemaProvider) {
        TypedProperties props = sanitizationProps();
        setupRowSource(spark.read().schema(inputSchema).json(jsonRdd), props, schemaProvider);
        return new SourceFormatAdapter(this.testRowDataSource, Option.empty(), Option.of(props))
                .fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
    }

    /**
     * Wraps {@code jsonRdd} in a JSON source whose schema provider returns {@code schema} converted to
     * Avro, and fetches it in row format through a {@link SourceFormatAdapter} configured for
     * sanitization.
     *
     * @param jsonRdd JSON lines to serve from the source
     * @param schema  Spark schema converted to the Avro schema exposed by the source's schema provider
     * @return the batch produced by {@code fetchNewDataInRowFormat}
     */
    private InputBatch<Dataset<Row>> fetchJsonData(JavaRDD<String> jsonRdd, StructType schema) {
        TypedProperties props = sanitizationProps();
        setupJsonSource(jsonRdd, SchemaConverters.toAvroType(schema, false, "record", ""));
        return new SourceFormatAdapter(this.testJsonDataSource, Option.empty(), Option.of(props))
                .fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
    }

    /**
     * Asserts that {@code inputBatch} holds exactly two rows with {@code expectedSchema}, whose JSON
     * form matches the lines of {@code expectedDataFile}. When the batch carries a
     * {@link RowBasedSchemaProvider}, also checks that its source schema is the Avro form of
     * {@code expectedSchema}.
     */
    private void verifySanitization(InputBatch<Dataset<Row>> inputBatch, String expectedDataFile,
                                    StructType expectedSchema) {
        JavaRDD<String> expectedJson = jsc.textFile(expectedDataFile);
        Assertions.assertTrue(inputBatch.getBatch().isPresent());
        Dataset<Row> actual = inputBatch.getBatch().get();
        Assertions.assertEquals(2, actual.collectAsList().size());
        Assertions.assertEquals(expectedSchema, actual.schema());
        if (inputBatch.getSchemaProvider() instanceof RowBasedSchemaProvider) {
            Assertions.assertEquals(
                    AvroConversionUtils.convertStructTypeToAvroSchema(expectedSchema, "hoodie_source", "hoodie.source"),
                    inputBatch.getSchemaProvider().getSourceSchema());
        }
        Assertions.assertEquals(expectedJson.collect(), actual.toJSON().collectAsList());
    }

    /** Row source path: verified both with a {@code NullSchemaProvider} and with no schema provider. */
    @MethodSource({"provideDataFiles"})
    @ParameterizedTest
    public void testRowSanitization(String inputDataFile, String expectedDataFile,
                                    StructType inputSchema, StructType expectedSchema) {
        JavaRDD<String> inputJson = jsc.textFile(inputDataFile);
        verifySanitization(fetchRowData(inputJson, inputSchema, new InputBatch.NullSchemaProvider()),
                expectedDataFile, expectedSchema);
        verifySanitization(fetchRowData(inputJson, inputSchema, null), expectedDataFile, expectedSchema);
    }

    /**
     * JSON source path. The JSON source's schema provider is built from {@code expectedSchema}, not
     * {@code inputSchema}, so {@code inputSchema} is intentionally unused here.
     */
    @MethodSource({"provideDataFiles"})
    @ParameterizedTest
    public void testJsonSanitization(String inputDataFile, String expectedDataFile,
                                     StructType inputSchema, StructType expectedSchema) {
        verifySanitization(fetchJsonData(jsc.textFile(inputDataFile), expectedSchema),
                expectedDataFile, expectedSchema);
    }

    private static Stream<Arguments> provideDataFiles() {
        return SanitizationTestUtils.provideDataFiles();
    }
}
