package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.net.URL;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionBase;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.class */
public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDeltaStreamerSchemaEvolutionBase {
    @Override // org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionBase, org.apache.hudi.utilities.testutils.UtilitiesTestBase
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
        TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.resetTargetSchema();
    }

    protected static Stream<Arguments> testArgs() {
        Stream.Builder builder = Stream.builder();
        for (Boolean bool : new Boolean[]{true}) {
            for (Boolean bool2 : new Boolean[]{false, true}) {
                for (Boolean bool3 : new Boolean[]{false, true}) {
                    for (Boolean bool4 : new Boolean[]{false, true}) {
                        for (Boolean bool5 : new Boolean[]{false, true}) {
                            for (Boolean bool6 : new Boolean[]{false, true}) {
                                for (String str : new String[]{"COPY_ON_WRITE", "MERGE_ON_READ"}) {
                                    if (!bool5.booleanValue() || str.equals("MERGE_ON_READ")) {
                                        builder.add(Arguments.of(new Object[]{str, bool6, false, bool, bool4, bool5, bool3, bool2}));
                                    }
                                }
                            }
                            builder.add(Arguments.of(new Object[]{"MERGE_ON_READ", false, true, bool, bool4, bool5, bool3, bool2}));
                        }
                    }
                }
            }
        }
        return builder.build();
    }

    protected static Stream<Arguments> testReorderedColumn() {
        Stream.Builder builder = Stream.builder();
        for (Boolean bool : new Boolean[]{true}) {
            for (Boolean bool2 : new Boolean[]{false, true}) {
                for (Boolean bool3 : new Boolean[]{false, true}) {
                    for (String str : new String[]{"COPY_ON_WRITE", "MERGE_ON_READ"}) {
                        builder.add(Arguments.of(new Object[]{str, bool, bool3, bool2}));
                    }
                }
            }
        }
        return builder.build();
    }

    protected static Stream<Arguments> testParamsWithSchemaTransformer() {
        Stream.Builder builder = Stream.builder();
        for (Boolean bool : new Boolean[]{false, true}) {
            for (Boolean bool2 : new Boolean[]{false, true}) {
                for (Boolean bool3 : new Boolean[]{true}) {
                    for (Boolean bool4 : new Boolean[]{false, true}) {
                        for (Boolean bool5 : new Boolean[]{false, true}) {
                            for (String str : new String[]{"COPY_ON_WRITE", "MERGE_ON_READ"}) {
                                builder.add(Arguments.of(new Object[]{str, bool3, bool5, bool4, bool, bool2}));
                            }
                        }
                    }
                }
            }
        }
        return builder.build();
    }

    @MethodSource({"testArgs"})
    @ParameterizedTest
    public void testBase(String str, Boolean bool, Boolean bool2, Boolean bool3, Boolean bool4, Boolean bool5, Boolean bool6, Boolean bool7) throws Exception {
        this.tableType = str;
        this.shouldCluster = bool;
        this.shouldCompact = bool2;
        this.rowWriterEnable = bool3;
        this.addFilegroups = bool4;
        this.multiLogFiles = bool5;
        this.useKafkaSource = bool6.booleanValue();
        if (bool6.booleanValue()) {
            this.useSchemaProvider = true;
        }
        this.useTransformer = true;
        boolean equals = str.equals("COPY_ON_WRITE");
        StringBuilder append = new StringBuilder().append(basePath).append("parquetFilesDfs");
        int i = testNum + 1;
        testNum = i;
        PARQUET_SOURCE_ROOT = append.append(i).toString();
        this.tableBasePath = basePath + "test_parquet_table" + testNum;
        this.deltaStreamer = new HoodieDeltaStreamer(getDeltaStreamerConfig(bool7.booleanValue()), jsc);
        addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath()), true);
        this.deltaStreamer.sync();
        int i2 = 6;
        int i3 = 3;
        assertRecordCount(6);
        assertFileNumber(3, equals);
        if (bool5.booleanValue()) {
            addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath()), false);
            this.deltaStreamer.sync();
            assertRecordCount(6);
            assertFileNumber(3, false);
        }
        if (bool4.booleanValue()) {
            addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/newFileGroupsTestEverything.json").getPath()), false);
            this.deltaStreamer.sync();
            i2 = 6 + 3;
            i3 = 3 + 3;
            assertRecordCount(i2);
            assertFileNumber(i3, equals);
        }
        Dataset json = sparkSession.read().json(String.class.getResource("/data/schema-evolution/endTestEverything.json").getPath());
        Dataset withColumn = json.withColumn("tip_history", json.col("tip_history").cast(DataTypes.createArrayType(DataTypes.LongType)));
        Dataset withColumn2 = withColumn.withColumn("fare", withColumn.col("fare").cast(DataTypes.createStructType(new StructField[]{new StructField("amount", DataTypes.StringType, true, Metadata.empty()), new StructField("currency", DataTypes.StringType, true, Metadata.empty()), new StructField("zextra_col_nested", DataTypes.StringType, true, Metadata.empty())})));
        Dataset withColumn3 = withColumn2.withColumn("begin_lat", withColumn2.col("begin_lat").cast(DataTypes.DoubleType));
        Dataset withColumn4 = withColumn3.withColumn("end_lat", withColumn3.col("end_lat").cast(DataTypes.StringType));
        Dataset withColumn5 = withColumn4.withColumn("distance_in_meters", withColumn4.col("distance_in_meters").cast(DataTypes.FloatType));
        try {
            addData(withColumn5.withColumn("seconds_since_epoch", withColumn5.col("seconds_since_epoch").cast(DataTypes.StringType)), false);
            this.deltaStreamer.sync();
            Assertions.assertTrue(bool7.booleanValue());
            if (bool.booleanValue()) {
                assertBaseFileOnlyNumber(3);
            } else if (bool2.booleanValue() || equals) {
                assertBaseFileOnlyNumber(i3);
            } else {
                assertFileNumber(i3 + 2, false);
            }
            assertRecordCount(i2);
            Dataset<Row> load = sparkSession.read().format("hudi").load(this.tableBasePath);
            load.show(100, false);
            load.cache();
            assertDataType(load, "tip_history", DataTypes.createArrayType(DataTypes.LongType));
            assertDataType(load, "fare", DataTypes.createStructType(new StructField[]{new StructField("amount", DataTypes.StringType, true, Metadata.empty()), new StructField("currency", DataTypes.StringType, true, Metadata.empty()), new StructField("extra_col_struct", DataTypes.LongType, true, Metadata.empty()), new StructField("zextra_col_nested", DataTypes.StringType, true, Metadata.empty())}));
            assertDataType(load, "begin_lat", DataTypes.DoubleType);
            assertDataType(load, "end_lat", DataTypes.StringType);
            assertDataType(load, "distance_in_meters", DataTypes.FloatType);
            assertDataType(load, "seconds_since_epoch", DataTypes.StringType);
            assertCondition(load, "zextra_col = 'yes'", 2);
            assertCondition(load, "_extra_col = 'yes'", 2);
            assertCondition(load, "fare.zextra_col_nested = 'yes'", 2);
            assertCondition(load, "size(zcomplex_array) > 0", 2);
            assertCondition(load, "extra_col_regular is NULL", 2);
            assertCondition(load, "fare.extra_col_struct is NULL", 2);
        } catch (SchemaCompatibilityException e) {
            Assertions.assertTrue(e.getMessage().contains("Incoming batch schema is not compatible with the table's one"));
            Assertions.assertFalse(bool7.booleanValue());
        }
    }

    @MethodSource({"testReorderedColumn"})
    @ParameterizedTest
    public void testReorderingColumn(String str, Boolean bool, Boolean bool2, Boolean bool3) throws Exception {
        this.tableType = str;
        this.rowWriterEnable = bool;
        this.useKafkaSource = bool2.booleanValue();
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = true;
        if (bool2.booleanValue()) {
            this.useSchemaProvider = true;
        }
        boolean equals = str.equals("COPY_ON_WRITE");
        StringBuilder append = new StringBuilder().append(basePath).append("parquetFilesDfs");
        int i = testNum + 1;
        testNum = i;
        PARQUET_SOURCE_ROOT = append.append(i).toString();
        this.tableBasePath = basePath + "test_parquet_table" + testNum;
        Dataset<Row> json = sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath());
        resetTopicAndDeltaStreamer(bool3);
        addData(json, true);
        this.deltaStreamer.sync();
        assertRecordCount(6);
        assertFileNumber(3, equals);
        if (str.equals("MERGE_ON_READ")) {
            addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath()), false);
            this.deltaStreamer.sync();
            assertRecordCount(6);
            assertFileNumber(3, false);
        }
        assertRecordCount(6);
        resetTopicAndDeltaStreamer(bool3);
        HoodieStreamer.Config config = this.deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(config);
        HoodieInstant hoodieInstant = (HoodieInstant) metaClient.getActiveTimeline().lastInstant().get();
        addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath()).drop("rider").withColumn("rider", functions.lit("rider-003")), false);
        this.deltaStreamer.sync();
        metaClient.reloadActiveTimeline();
        Assertions.assertTrue(((Schema) UtilHelpers.getLatestTableSchema(jsc, fs, config.targetBasePath, metaClient).get()).getField("rider").schema().getTypes().stream().anyMatch(schema -> {
            return schema.getType().equals(Schema.Type.STRING);
        }));
        Assertions.assertTrue(((HoodieInstant) metaClient.reloadActiveTimeline().lastInstant().get()).compareTo(hoodieInstant) > 0);
    }

    @MethodSource({"testParamsWithSchemaTransformer"})
    @ParameterizedTest
    public void testDroppedColumn(String str, Boolean bool, Boolean bool2, Boolean bool3, Boolean bool4, Boolean bool5) throws Exception {
        this.tableType = str;
        this.rowWriterEnable = bool;
        this.useKafkaSource = bool2.booleanValue();
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = bool4.booleanValue();
        if (bool2.booleanValue() || bool5.booleanValue()) {
            this.useSchemaProvider = true;
        }
        boolean equals = str.equals("COPY_ON_WRITE");
        StringBuilder append = new StringBuilder().append(basePath).append("parquetFilesDfs");
        int i = testNum + 1;
        testNum = i;
        PARQUET_SOURCE_ROOT = append.append(i).toString();
        this.tableBasePath = basePath + "test_parquet_table" + testNum;
        Dataset<Row> json = sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath());
        resetTopicAndDeltaStreamer(bool3);
        addData(json, true);
        this.deltaStreamer.sync();
        assertRecordCount(6);
        assertFileNumber(3, equals);
        if (str.equals("MERGE_ON_READ")) {
            addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath()), false);
            this.deltaStreamer.sync();
            assertRecordCount(6);
            assertFileNumber(3, false);
        }
        if (bool5.booleanValue()) {
            TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.setTargetSchema(TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema);
        }
        resetTopicAndDeltaStreamer(bool3);
        HoodieStreamer.Config config = this.deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(config);
        HoodieInstant hoodieInstant = (HoodieInstant) metaClient.getActiveTimeline().lastInstant().get();
        try {
            addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath()).drop("rider"), true);
            this.deltaStreamer.sync();
            Assertions.assertTrue(bool3.booleanValue() || bool5.booleanValue());
            metaClient.reloadActiveTimeline();
            Assertions.assertTrue(((Schema) UtilHelpers.getLatestTableSchema(jsc, fs, config.targetBasePath, metaClient).get()).getField("rider").schema().getTypes().stream().anyMatch(schema -> {
                return schema.getType().equals(Schema.Type.STRING);
            }));
            Assertions.assertTrue(((HoodieInstant) metaClient.reloadActiveTimeline().lastInstant().get()).compareTo(hoodieInstant) > 0);
        } catch (SchemaCompatibilityException e) {
            Assertions.assertFalse(bool3.booleanValue() || bool5.booleanValue());
            Assertions.assertTrue(e.getMessage().contains("Incoming batch schema is not compatible with the table's one"));
            Assertions.assertFalse(bool3.booleanValue());
        }
    }

    @MethodSource({"testParamsWithSchemaTransformer"})
    @ParameterizedTest
    public void testTypePromotion(String str, Boolean bool, Boolean bool2, Boolean bool3, Boolean bool4, Boolean bool5) throws Exception {
        this.tableType = str;
        this.rowWriterEnable = bool;
        this.useKafkaSource = bool2.booleanValue();
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = bool4.booleanValue();
        if (bool2.booleanValue() || bool5.booleanValue()) {
            this.useSchemaProvider = true;
        }
        boolean equals = str.equals("COPY_ON_WRITE");
        StringBuilder append = new StringBuilder().append(basePath).append("parquetFilesDfs");
        int i = testNum + 1;
        testNum = i;
        PARQUET_SOURCE_ROOT = append.append(i).toString();
        this.tableBasePath = basePath + "test_parquet_table" + testNum;
        Dataset<Row> json = sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath());
        resetTopicAndDeltaStreamer(bool3);
        addData(json, true);
        this.deltaStreamer.sync();
        assertRecordCount(6);
        assertFileNumber(3, equals);
        if (str.equals("MERGE_ON_READ")) {
            addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath()), false);
            this.deltaStreamer.sync();
            assertRecordCount(6);
            assertFileNumber(3, false);
        }
        if (bool5.booleanValue()) {
            TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.setTargetSchema(TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema);
        }
        resetTopicAndDeltaStreamer(bool3);
        HoodieStreamer.Config config = this.deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(config);
        HoodieInstant hoodieInstant = (HoodieInstant) metaClient.getActiveTimeline().lastInstant().get();
        Dataset json2 = sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath());
        try {
            addData(json2.withColumn("distance_in_meters", json2.col("distance_in_meters").cast(DataTypes.DoubleType)), true);
            this.deltaStreamer.sync();
            Assertions.assertFalse(bool5.booleanValue());
            metaClient.reloadActiveTimeline();
            Option latestTableSchema = UtilHelpers.getLatestTableSchema(jsc, fs, config.targetBasePath, metaClient);
            Assertions.assertTrue(((Schema) latestTableSchema.get()).getField("distance_in_meters").schema().getTypes().stream().anyMatch(schema -> {
                return schema.getType().equals(Schema.Type.DOUBLE);
            }), ((Schema) latestTableSchema.get()).getField("distance_in_meters").schema().toString());
            Assertions.assertTrue(((HoodieInstant) metaClient.reloadActiveTimeline().lastInstant().get()).compareTo(hoodieInstant) > 0);
        } catch (Exception e) {
            Assertions.assertTrue(bool5.booleanValue());
            if (bool2.booleanValue()) {
                Assertions.assertTrue(containsErrorMessage(e, "Incoming batch schema is not compatible with the table's one", "cannot support rewrite value for schema type: \"long\" since the old schema type is: \"double\""), e.getMessage());
            } else {
                Assertions.assertTrue(containsErrorMessage(e, "Incoming batch schema is not compatible with the table's one", "org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong", "cannot support rewrite value for schema type: \"long\" since the old schema type is: \"double\""), e.getMessage());
            }
        }
    }

    @MethodSource({"testParamsWithSchemaTransformer"})
    @ParameterizedTest
    public void testTypeDemotion(String str, Boolean bool, Boolean bool2, Boolean bool3, Boolean bool4, Boolean bool5) throws Exception {
        this.tableType = str;
        this.rowWriterEnable = bool;
        this.useKafkaSource = bool2.booleanValue();
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;
        this.useTransformer = bool4.booleanValue();
        if (bool2.booleanValue() || bool5.booleanValue()) {
            this.useSchemaProvider = true;
        }
        boolean equals = str.equals("COPY_ON_WRITE");
        StringBuilder append = new StringBuilder().append(basePath).append("parquetFilesDfs");
        int i = testNum + 1;
        testNum = i;
        PARQUET_SOURCE_ROOT = append.append(i).toString();
        this.tableBasePath = basePath + "test_parquet_table" + testNum;
        Dataset<Row> json = sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath());
        resetTopicAndDeltaStreamer(bool3);
        addData(json, true);
        this.deltaStreamer.sync();
        assertRecordCount(6);
        assertFileNumber(3, equals);
        if (str.equals("MERGE_ON_READ")) {
            addData(sparkSession.read().json(String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath()), false);
            this.deltaStreamer.sync();
            assertRecordCount(6);
            assertFileNumber(3, false);
        }
        if (bool5.booleanValue()) {
            TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.setTargetSchema(TestHoodieDeltaStreamerSchemaEvolutionBase.TestSchemaProvider.sourceSchema);
        }
        resetTopicAndDeltaStreamer(bool3);
        HoodieStreamer.Config config = this.deltaStreamer.getConfig();
        HoodieTableMetaClient metaClient = getMetaClient(config);
        HoodieInstant hoodieInstant = (HoodieInstant) metaClient.getActiveTimeline().lastInstant().get();
        Dataset json2 = sparkSession.read().json(String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath());
        addData(json2.withColumn("current_ts", json2.col("current_ts").cast(DataTypes.IntegerType)), true);
        this.deltaStreamer.sync();
        metaClient.reloadActiveTimeline();
        Assertions.assertTrue(((Schema) UtilHelpers.getLatestTableSchema(jsc, fs, config.targetBasePath, metaClient).get()).getField("current_ts").schema().getTypes().stream().anyMatch(schema -> {
            return schema.getType().equals(Schema.Type.LONG);
        }));
        Assertions.assertTrue(((HoodieInstant) metaClient.reloadActiveTimeline().lastInstant().get()).compareTo(hoodieInstant) > 0);
    }

    private static HoodieTableMetaClient getMetaClient(HoodieStreamer.Config config) {
        return HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(config.targetBasePath).setPayloadClassName(config.payloadClassName).build();
    }

    private void resetTopicAndDeltaStreamer(Boolean bool) throws IOException {
        StringBuilder append = new StringBuilder().append("topic");
        int i = testNum + 1;
        testNum = i;
        topicName = append.append(i).toString();
        if (this.deltaStreamer != null) {
            this.deltaStreamer.shutdownGracefully();
        }
        String[] strArr = this.useTransformer ? new String[]{TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()} : new String[0];
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.setProperty("hoodie.streamer.checkpoint.force.skip", "true");
        HoodieDeltaStreamer.Config deltaStreamerConfig = getDeltaStreamerConfig(strArr, bool.booleanValue(), typedProperties);
        deltaStreamerConfig.checkpoint = "0";
        this.deltaStreamer = new HoodieDeltaStreamer(deltaStreamerConfig, jsc);
    }

    private boolean containsErrorMessage(Throwable th, String... strArr) {
        while (th != null) {
            for (String str : strArr) {
                if (th.getMessage().contains(str)) {
                    return true;
                }
            }
            th = th.getCause();
        }
        return false;
    }

    protected void assertDataType(Dataset<Row> dataset, String str, DataType dataType) {
        Assertions.assertEquals(dataType, dataset.select(str, new String[0]).schema().fields()[0].dataType());
    }

    protected void assertCondition(Dataset<Row> dataset, String str, int i) {
        Assertions.assertEquals(i, dataset.filter(str).count());
    }
}
