package org.apache.hudi.utilities.functional;

import java.util.Arrays;
import java.util.List;
import junit.framework.Assert;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.exception.HoodieTransformException;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Functional tests for {@link ErrorTableAwareChainedTransformer}.
 *
 * <p>Covers two areas: how a transformer chain behaves when the error table is enabled,
 * and the validation performed when the chain is built from a list of transformer
 * configuration strings.
 */
@Tag("functional")
public class TestErrorTableAwareChainedTransformer extends SparkClientFunctionalTestHarness {

    /** Name of the single data column in the test dataset. */
    private static final String FOO_COLUMN = "foo";

    /**
     * With the error table enabled, a chain of the error-event handler followed by the
     * integer cast must yield exactly the data column plus the corrupt-record column,
     * with the corrupt-record column populated on both rows and the data column cast to int.
     */
    @Test
    public void testForErrorTableConfig() {
        Dataset<Row> input = getTestDataset();
        ErrorTableAwareChainedTransformer chain = new ErrorTableAwareChainedTransformer(
                Arrays.asList(getErrorEventHandlerTransformer(), getCastFooToIntTransformer()));

        Dataset<Row> transformed = chain.apply(jsc(), spark(), input, getErrorTableEnabledProperties());
        List<Row> rows = transformed.collectAsList();

        Assertions.assertArrayEquals(
                new String[]{FOO_COLUMN, BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME},
                transformed.columns());
        Assert.assertEquals(2L,
                transformed.filter(new Column(BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNotNull()).count());
        Assert.assertEquals(100, rows.get(0).getInt(0));
        Assert.assertEquals(200, rows.get(1).getInt(0));
    }

    /**
     * With the error table enabled, applying a chain in which one transformer drops the
     * corrupt-record column is expected to throw {@link HoodieValidationException}.
     */
    @Test
    public void testForErrorRecordColumn() {
        Dataset<Row> input = getTestDataset();
        TypedProperties props = getErrorTableEnabledProperties();
        ErrorTableAwareChainedTransformer chain = new ErrorTableAwareChainedTransformer(Arrays.asList(
                getErrorEventHandlerTransformer(),
                getErrorRecordColumnDropTransformer(),
                getCastFooToIntTransformer()));

        Assertions.assertThrows(HoodieValidationException.class,
                () -> chain.apply(jsc(), spark(), input, props));
    }

    /**
     * Builds a two-row dataset with a single non-nullable string column {@code foo}
     * holding the values {@code "100"} and {@code "200"}.
     */
    private Dataset<Row> getTestDataset() {
        List<Row> rows = Arrays.asList(RowFactory.create("100"), RowFactory.create("200"));
        StructField fooField = DataTypes.createStructField(FOO_COLUMN, DataTypes.StringType, false);
        return spark().sqlContext().createDataFrame(rows, DataTypes.createStructType(new StructField[]{fooField}));
    }

    /** Returns properties in which only the error-table-enabled flag is set, to {@code "true"}. */
    private TypedProperties getErrorTableEnabledProperties() {
        TypedProperties props = new TypedProperties();
        props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(), "true");
        return props;
    }

    /**
     * Returns a transformer that, when the error table is enabled in the supplied properties,
     * writes the corrupt-record column: {@code "true"} where the column is null, otherwise the
     * existing value. When the error table is disabled the dataset is returned unchanged.
     */
    private Transformer getErrorEventHandlerTransformer() {
        return (javaSparkContext, sparkSession, rowDataset, properties) -> {
            boolean errorTableEnabled = properties.getBoolean(
                    HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(),
                    HoodieErrorTableConfig.ERROR_TABLE_ENABLED.defaultValue());
            if (!errorTableEnabled) {
                return rowDataset;
            }
            Column corruptRecord = new Column(BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
            return rowDataset.withColumn(
                    BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME,
                    functions.when(corruptRecord.isNull(), functions.lit("true")).otherwise(corruptRecord));
        };
    }

    /** Returns a transformer that keeps only the {@code foo} column, dropping every other column. */
    private Transformer getErrorRecordColumnDropTransformer() {
        return (javaSparkContext, sparkSession, rowDataset, properties) -> rowDataset.select(FOO_COLUMN);
    }

    /** Returns a transformer that casts the {@code foo} column to an integer in place. */
    private Transformer getCastFooToIntTransformer() {
        return (javaSparkContext, sparkSession, rowDataset, properties) ->
                rowDataset.withColumn(FOO_COLUMN, rowDataset.col(FOO_COLUMN).cast(DataTypes.IntegerType));
    }

    /**
     * Constructing the chain from each of these comma-separated configurations is expected to
     * throw {@link HoodieTransformException}. The cases are: an empty identifier before the colon,
     * the same identifier used twice, a double colon after the identifier, and a mix of an
     * un-prefixed and a prefixed transformer.
     *
     * @param str comma-separated transformer configuration under test
     */
    @ParameterizedTest
    @ValueSource(strings = {
            ":org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
            "T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
            "T1::org.apache.hudi.utilities.transform.FlatteningTransformer",
            "org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer"
    })
    public void testErrorTableAwareChainedTransformerValidationFails(String str) {
        Assertions.assertThrows(HoodieTransformException.class,
                () -> new ErrorTableAwareChainedTransformer(Arrays.asList(str.split(",")), Option::empty));
    }

    /**
     * Constructing the chain from each of these comma-separated configurations is expected to
     * succeed. The cases are: distinct identifiers in either order, arbitrary distinct
     * identifiers, and two un-prefixed transformers.
     *
     * @param str comma-separated transformer configuration under test
     */
    @ParameterizedTest
    @ValueSource(strings = {
            "T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
            "T2:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
            "abc:org.apache.hudi.utilities.transform.FlatteningTransformer,def:org.apache.hudi.utilities.transform.FlatteningTransformer",
            "org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.FlatteningTransformer"
    })
    public void testErrorTableAwareChainedTransformerValidationPasses(String str) {
        List<String> transformerConfigs = Arrays.asList(str.split(","));
        Assert.assertNotNull(new ErrorTableAwareChainedTransformer(transformerConfigs, Option::empty));
    }
}
