package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.class */
public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStreamerTestBase {
    // Names of Kafka topics that addKafkaData has already created; static, so shared by all instances.
    protected static Set<String> createdTopicNames = new HashSet();
    // Written to "hoodie.datasource.write.table.type" and passed to makeConfig.
    protected String tableType;
    // Base path of the table under test; used when building the config and when reading the table back in assertions.
    protected String tableBasePath;
    // Used to derive the error table name and base path (tableName + "ERROR").
    protected String tableName;
    // When true, inline clustering is enabled in the generated config.
    protected Boolean shouldCluster;
    // When true, inline compaction is enabled; when false, compaction is force-disabled on the config.
    protected Boolean shouldCompact;
    // Written to "hoodie.datasource.write.row.writer.enable".
    protected Boolean rowWriterEnable;
    // When true, adds one to the commit threshold used for inline compaction/clustering.
    protected Boolean addFilegroups;
    // When true, adds one to the commit threshold used for inline compaction/clustering.
    protected Boolean multiLogFiles;
    // Passed to makeConfig; when true, addData also publishes the dataset schema to TestSchemaProvider.
    protected Boolean useSchemaProvider;
    // Forwarded to prepareParquetDFSSource when the parquet source is used.
    protected Boolean hasTransformer;
    // Selects "parquet" (true) or "avro" (false) as the log file data block format.
    protected Boolean useParquetLogBlock;
    // Source schema file forwarded to prepareParquetDFSSource.
    protected String sourceSchemaFile;
    // Target schema file forwarded to prepareParquetDFSSource.
    protected String targetSchemaFile;
    // When true, data is produced to Kafka and read with AvroKafkaSource; otherwise ParquetDFSSource is used.
    protected boolean useKafkaSource;
    // When true, error table properties are added to the generated config.
    protected boolean withErrorTable;
    // When true, getDeltaStreamerConfig(boolean) configures TestIdentityTransformer.
    protected boolean useTransformer;
    // Not referenced in this class; presumably read by subclasses — TODO confirm.
    protected boolean userProvidedSchema;
    // Not referenced in this class; presumably holds the streamer created by subclasses — TODO confirm.
    protected HoodieStreamer deltaStreamer;

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase$TestErrorTable.class */
    /**
     * Error table writer used by tests: instead of writing anywhere, it buffers the error RDDs it
     * receives in static state and records what was "committed" per key so tests can inspect it.
     */
    public static class TestErrorTable extends BaseErrorTableWriter {
        /** Error RDDs received since the last call to {@link #upsertAndCommit}. */
        public static List<JavaRDD> errorEvents = new ArrayList<>();
        /** Outcome of each {@link #upsertAndCommit} call, keyed by its first argument. */
        public static Map<String, Option<JavaRDD>> commited = new HashMap<>();

        public TestErrorTable(HoodieStreamer.Config config,
                              SparkSession sparkSession,
                              TypedProperties typedProperties,
                              HoodieSparkEngineContext hoodieSparkEngineContext,
                              FileSystem fileSystem) {
            super(config, sparkSession, typedProperties, hoodieSparkEngineContext, fileSystem);
        }

        /** Buffers the given RDD until the next {@link #upsertAndCommit} call. */
        public void addErrorEvents(JavaRDD javaRDD) {
            errorEvents.add(javaRDD);
        }

        /**
         * Records the union of all buffered error RDDs under {@code str} (or an empty option when
         * nothing was buffered) and, if anything was buffered, starts a fresh buffer.
         *
         * @param str    key under which the result is stored in {@link #commited}
         * @param option ignored
         * @return always {@code true}
         */
        public boolean upsertAndCommit(String str, Option option) {
            if (!errorEvents.isEmpty()) {
                JavaRDD combined = errorEvents.get(0);
                for (JavaRDD next : errorEvents.subList(1, errorEvents.size())) {
                    combined = combined.union(next);
                }
                commited.put(str, Option.of(combined));
                // Replace (rather than clear) the buffer, matching the reset done in setupTest.
                errorEvents = new ArrayList<>();
            } else {
                commited.put(str, Option.empty());
            }
            return true;
        }

        /** Always returns an empty option; both arguments are ignored. */
        public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String str, Option option) {
            return Option.empty();
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase$TestErrorTableV1.class */
    /**
     * Variant of {@link TestErrorTable} whose constructor takes an additional ingestion-metrics
     * option. The extra argument is not used; everything else is delegated to the parent.
     */
    public static class TestErrorTableV1 extends TestErrorTable {
        public TestErrorTableV1(HoodieStreamer.Config config,
                                SparkSession sparkSession,
                                TypedProperties typedProperties,
                                HoodieSparkEngineContext hoodieSparkEngineContext,
                                FileSystem fileSystem,
                                Option<HoodieIngestionMetrics> option) {
            super(config, sparkSession, typedProperties, hoodieSparkEngineContext, fileSystem);
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase$TestSchemaProvider.class */
    /**
     * Schema provider whose schemas are held in static fields so that tests can swap them
     * between writes without rebuilding the provider.
     */
    public static class TestSchemaProvider extends SchemaProvider {
        /** Schema returned by {@link #getSourceSchema()}. */
        public static Schema sourceSchema;
        /** Optional override for {@link #getTargetSchema()}; {@code null} means "use the source schema". */
        public static Schema targetSchema = null;

        public TestSchemaProvider(TypedProperties typedProperties, JavaSparkContext javaSparkContext) {
            super(typedProperties, javaSparkContext);
        }

        /** Returns the current value of {@link #sourceSchema}. */
        public Schema getSourceSchema() {
            return sourceSchema;
        }

        /** Returns the explicit target schema when one is set, otherwise the source schema. */
        public Schema getTargetSchema() {
            if (targetSchema == null) {
                return sourceSchema;
            }
            return targetSchema;
        }

        /** Installs an explicit target schema. */
        public static void setTargetSchema(Schema schema) {
            targetSchema = schema;
        }

        /** Removes any explicit target schema so the source schema is used again. */
        public static void resetTargetSchema() {
            targetSchema = null;
        }
    }

    /** Makes {@link TestSchemaProvider} the default schema provider class for the tests in this class. */
    @BeforeAll
    public static void initKafka() {
        final String providerClassName = TestSchemaProvider.class.getName();
        defaultSchemaProviderClassName = providerClassName;
    }

    /**
     * Runs the parent setup, then resets the static error-table state and the per-test flags,
     * and picks the Kafka topic name for this test.
     */
    @Override // org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase
    @BeforeEach
    public void setupTest() {
        super.setupTest();

        // Static state of the test error table must not leak between tests.
        TestErrorTable.commited = new HashMap<>();
        TestErrorTable.errorEvents = new ArrayList<>();

        // Per-test switches start disabled; schema file names start empty.
        this.useSchemaProvider = Boolean.FALSE;
        this.hasTransformer = Boolean.FALSE;
        this.withErrorTable = false;
        this.useParquetLogBlock = Boolean.FALSE;
        this.sourceSchemaFile = "";
        this.targetSchemaFile = "";

        topicName = "topic" + testNum;

        // On Spark 3.3 and later, turn off the nested-column vectorized parquet reader.
        final boolean sparkAtLeast33 = HoodieSparkUtils.gteqSpark3_3();
        if (sparkAtLeast33) {
            sparkSession.conf().set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false");
        }
    }

    /**
     * Runs the parent teardown and then clears any explicit target schema left on
     * {@link TestSchemaProvider} by the test.
     *
     * @throws Exception if the parent teardown fails
     */
    @Override // org.apache.hudi.utilities.testutils.UtilitiesTestBase
    @AfterEach
    public void teardown() throws Exception {
        super.teardown();
        // Static provider state would otherwise carry over into the next test.
        TestSchemaProvider.resetTargetSchema();
    }

    /** Sets the default schema provider class back to {@link FilebasedSchemaProvider}. */
    @AfterAll
    static void teardownAll() {
        final String providerClassName = FilebasedSchemaProvider.class.getName();
        defaultSchemaProviderClassName = providerClassName;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the streamer config with the "set null for missing columns" option enabled.
     *
     * @return the config produced by {@link #getDeltaStreamerConfig(boolean)} called with {@code true}
     * @throws IOException propagated from the delegated overload
     */
    public HoodieDeltaStreamer.Config getDeltaStreamerConfig() throws IOException {
        final boolean setNullForMissingColumns = true;
        return getDeltaStreamerConfig(setNullForMissingColumns);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the streamer config, configuring the identity transformer when
     * {@code useTransformer} is set and no transformer otherwise.
     *
     * @param z value written for the "set null for missing columns" write option
     * @return the resulting streamer config
     * @throws IOException propagated from the delegated overload
     */
    public HoodieDeltaStreamer.Config getDeltaStreamerConfig(boolean z) throws IOException {
        final String[] transformerClassNames;
        if (this.useTransformer) {
            transformerClassNames = new String[]{TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()};
        } else {
            transformerClassNames = new String[0];
        }
        return getDeltaStreamerConfig(transformerClassNames, z);
    }

    /**
     * Builds the streamer config starting from an empty set of extra properties.
     *
     * @param strArr transformer class names to configure
     * @param z      value written for the "set null for missing columns" write option
     * @return the resulting streamer config
     * @throws IOException propagated from the delegated overload
     */
    protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] strArr, boolean z) throws IOException {
        final TypedProperties emptyProps = new TypedProperties();
        return getDeltaStreamerConfig(strArr, z, emptyProps);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Populates {@code typedProperties} from this test's flags, prepares the source properties
     * file for either the Kafka or the parquet source, and builds the matching streamer config.
     *
     * @param strArr          transformer class names to configure
     * @param z               value written for the "set null for missing columns" write option
     * @param typedProperties properties to extend; mutated in place
     * @return an UPSERT streamer config whose compaction is force-disabled unless
     *         {@code shouldCompact} is set
     * @throws IOException declared for the source preparation helpers
     */
    public HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] strArr, boolean z, TypedProperties typedProperties) throws IOException {
        // Basic write settings.
        typedProperties.setProperty("hoodie.datasource.write.table.type", this.tableType);
        typedProperties.setProperty("hoodie.datasource.write.row.writer.enable", this.rowWriterEnable.toString());
        typedProperties.setProperty(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS().key(), Boolean.toString(z));
        final String logBlockFormat = this.useParquetLogBlock ? "parquet" : "avro";
        typedProperties.setProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logBlockFormat);
        typedProperties.setProperty("hoodie.parquet.small.file.limit", "0");

        // Commit threshold for inline table services: 2, plus one for each enabled extra write step.
        int commitThreshold = 2;
        if (this.addFilegroups) {
            commitThreshold++;
        }
        if (this.multiLogFiles) {
            commitThreshold++;
        }

        // Inline compaction.
        typedProperties.setProperty(HoodieCompactionConfig.INLINE_COMPACT.key(), this.shouldCompact.toString());
        if (this.shouldCompact) {
            typedProperties.setProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), Integer.toString(commitThreshold));
        }

        // Inline clustering.
        if (this.shouldCluster) {
            typedProperties.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
            typedProperties.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), Integer.toString(commitThreshold));
            typedProperties.setProperty(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), "_row_key");
        }

        // Error table, backed by the in-memory TestErrorTable writer.
        if (this.withErrorTable) {
            final String errorTableName = this.tableName + "ERROR";
            typedProperties.setProperty(HoodieErrorTableConfig.ERROR_TABLE_ENABLED.key(), "true");
            typedProperties.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(), "true");
            typedProperties.setProperty(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), "true");
            typedProperties.setProperty(HoodieErrorTableConfig.ERROR_TARGET_TABLE.key(), errorTableName);
            typedProperties.setProperty(HoodieErrorTableConfig.ERROR_TABLE_BASE_PATH.key(), basePath + errorTableName);
            typedProperties.setProperty(HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS.key(), TestErrorTable.class.getName());
            typedProperties.setProperty("hoodie.base.path", this.tableBasePath);
        }

        final List<String> transformerClassNames = new ArrayList<>();
        Collections.addAll(transformerClassNames, strArr);

        // Write the source-specific properties file; the config itself differs only in the
        // source class and the properties file name.
        final String sourceClassName;
        final String propsFileName;
        if (this.useKafkaSource) {
            propsFileName = "test-avro-kafka-dfs-source.properties";
            prepareAvroKafkaDFSSource(propsFileName, null, topicName, "partition_path", typedProperties);
            sourceClassName = AvroKafkaSource.class.getName();
        } else {
            propsFileName = "test-parquet-dfs-source.properties";
            prepareParquetDFSSource(false, this.hasTransformer.booleanValue(), this.sourceSchemaFile, this.targetSchemaFile, propsFileName, PARQUET_SOURCE_ROOT, false, "partition_path", "", typedProperties, false);
            sourceClassName = ParquetDFSSource.class.getName();
        }

        final HoodieDeltaStreamer.Config config = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(this.tableBasePath, WriteOperationType.UPSERT, sourceClassName, transformerClassNames, propsFileName, false, this.useSchemaProvider.booleanValue(), 100000, false, null, this.tableType, "timestamp", null);
        config.forceDisableCompaction = !this.shouldCompact;
        return config;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Publishes the dataset's schema to {@link TestSchemaProvider} when a schema provider is in
     * use, then writes the dataset to whichever source (Kafka or parquet) the test is using.
     *
     * @param dataset rows to add to the source
     * @param bool    forwarded to the source-specific writer; when true and the error table is
     *                enabled, a target schema with a non-nullable {@code _row_key} is also installed
     */
    public void addData(Dataset<Row> dataset, Boolean bool) {
        if (this.useSchemaProvider) {
            final StructType incomingSchema = dataset.schema();
            TestSchemaProvider.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(incomingSchema, "hoodie_source", "hoodie.source");
            if (this.withErrorTable && bool) {
                TestSchemaProvider.setTargetSchema(
                    AvroConversionUtils.convertStructTypeToAvroSchema(
                        TestHoodieSparkUtils.getSchemaColumnNotNullable(incomingSchema, "_row_key"), "idk", "idk"));
            }
        }

        if (!this.useKafkaSource) {
            addParquetData(dataset, bool);
            return;
        }
        addKafkaData(dataset, bool);
    }

    /**
     * Writes the dataset as parquet under {@code PARQUET_SOURCE_ROOT}.
     *
     * @param dataset rows to write
     * @param bool    when true the existing contents are overwritten, otherwise rows are appended
     */
    protected void addParquetData(Dataset<Row> dataset, Boolean bool) {
        final SaveMode saveMode;
        if (bool) {
            saveMode = SaveMode.Overwrite;
        } else {
            saveMode = SaveMode.Append;
        }
        dataset.write().format("parquet").mode(saveMode).save(PARQUET_SOURCE_ROOT);
    }

    /**
     * Converts the dataset to Avro records and sends each one to partition 0 of the current
     * Kafka topic under the fixed key {@code "key"}.
     *
     * @param dataset rows to publish
     * @param bool    when true, the topic is created first unless it is already recorded in
     *                {@code createdTopicNames}
     */
    protected void addKafkaData(Dataset<Row> dataset, Boolean bool) {
        if (bool && !createdTopicNames.contains(topicName)) {
            this.testUtils.createTopic(topicName);
            // Record the topic only after it was created successfully.
            createdTopicNames.add(topicName);
        }
        List<GenericRecord> records = HoodieSparkUtils.createRdd(dataset, "hoodie_source", "hoodie.source", false, Option.empty()).toJavaRDD().collect();
        // try-with-resources closes the producer on both the normal and the exceptional path.
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(getProducerProperties())) {
            for (GenericRecord record : records) {
                producer.send(new ProducerRecord<>(topicName, 0, "key", HoodieAvroUtils.avroToBytes(record)));
            }
        }
    }

    /**
     * Builds the Kafka producer configuration used by {@link #addKafkaData}: string keys,
     * byte-array values, and the broker address supplied by the test utilities.
     *
     * @return a new {@link Properties} instance on every call
     */
    protected Properties getProducerProperties() {
        final Properties producerProps = new Properties();
        final String byteArraySerializer = ByteArraySerializer.class.getName();
        final String stringSerializer = StringSerializer.class.getName();

        producerProps.put("bootstrap.servers", this.testUtils.brokerAddress());
        producerProps.put("value.serializer", byteArraySerializer);
        // NOTE(review): "value.deserializer" is not a producer setting; kept as-is to preserve behavior.
        producerProps.put("value.deserializer", byteArraySerializer);
        producerProps.put("key.serializer", stringSerializer);
        producerProps.put("auto.register.schemas", "false");
        producerProps.put("acks", "all");
        return producerProps;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asserts the number of files backing the table.
     *
     * @param i expected count
     * @param z when true, delegates to {@link #assertBaseFileOnlyNumber(int)}; otherwise counts
     *          distinct (commit time, file name) pairs read from the table
     */
    public void assertFileNumber(int i, boolean z) {
        if (z) {
            assertBaseFileOnlyNumber(i);
            return;
        }
        final long distinctCommitFilePairs = sparkSession.read()
            .format("hudi")
            .load(this.tableBasePath)
            .select("_hoodie_commit_time", "_hoodie_file_name")
            .distinct()
            .count();
        Assertions.assertEquals(i, distinctCommitFilePairs);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asserts that every record's file name ends in {@code .parquet} and that the number of
     * distinct file names equals {@code i}.
     *
     * @param i expected number of distinct file names
     */
    public void assertBaseFileOnlyNumber(int i) {
        final Dataset<Row> fileNames = sparkSession.read()
            .format("hudi")
            .load(this.tableBasePath)
            .select("_hoodie_file_name");
        fileNames.createOrReplaceTempView("assertFileNumberPostCompactCluster");

        // All rows must survive the ".parquet" filter.
        final long totalRows = fileNames.count();
        final long parquetRows = sparkSession.sql("select * from assertFileNumberPostCompactCluster where _hoodie_file_name like '%.parquet'").count();
        Assertions.assertEquals(totalRows, parquetRows);

        Assertions.assertEquals(i, fileNames.distinct().count());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Clears the SQL context cache and asserts the number of records read from the table.
     *
     * @param i expected record count
     */
    public void assertRecordCount(int i) {
        sqlContext.clearCache();
        final long actualCount = sqlContext.read()
            .format("org.apache.hudi")
            .load(this.tableBasePath)
            .count();
        Assertions.assertEquals(i, actualCount);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the fare struct with both the {@code amount} and {@code currency} fields.
     *
     * @param dataType type of the {@code amount} field
     * @return the struct built by {@link #createFareStruct(DataType, Boolean)} called with {@code false}
     */
    public StructType createFareStruct(DataType dataType) {
        return createFareStruct(dataType, Boolean.FALSE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the fare struct: a nullable {@code amount} field of the given type and, unless
     * {@code bool} is true, a nullable string {@code currency} field.
     *
     * @param dataType type of the {@code amount} field
     * @param bool     when true, the {@code currency} field is left out
     * @return the resulting struct type
     */
    public StructType createFareStruct(DataType dataType, Boolean bool) {
        if (bool) {
            return DataTypes.createStructType(new StructField[]{
                new StructField("amount", dataType, true, Metadata.empty())
            });
        }
        return DataTypes.createStructType(new StructField[]{
            new StructField("amount", dataType, true, Metadata.empty()),
            new StructField("currency", DataTypes.StringType, true, Metadata.empty())
        });
    }
}
