package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("functional")
/* Functional tests for HoodieMultiTableDeltaStreamer: config validation plus multi-table ingestion from Kafka and Parquet sources. */
public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBase {
    // Logger for this test class; the negative tests use it to record the exception they expect.
    private static final Logger LOG = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);

    /**
     * Factory helpers that assemble {@link HoodieMultiTableDeltaStreamer.Config} instances
     * for the tests in this class.
     */
    static class TestHelpers {

        /**
         * Builds a config that always sets a schema provider and uses the
         * {@code multi_table_dataset} base path prefix.
         *
         * @param str  props file name, appended to {@code dfsBasePath}
         * @param str2 value for {@code configFolder}
         * @param str3 value for {@code sourceClassName}
         * @param z    value for {@code enableHiveSync}
         * @param z2   value for {@code enableMetaSync}
         * @param cls  schema provider class; {@code null} selects {@link FilebasedSchemaProvider}
         * @return the populated config
         */
        static HoodieMultiTableDeltaStreamer.Config getConfig(String str, String str2, String str3, boolean z, boolean z2, Class<?> cls) {
            return getConfig(str, str2, str3, z, z2, true, "multi_table_dataset", cls);
        }

        /**
         * Builds a COPY_ON_WRITE config with target table name {@code dummy_table} and
         * ordering field {@code timestamp}.
         *
         * @param str  props file name, appended to {@code dfsBasePath}
         * @param str2 value for {@code configFolder}
         * @param str3 value for {@code sourceClassName}
         * @param z    value for {@code enableHiveSync}
         * @param z2   value for {@code enableMetaSync}
         * @param z3   when {@code true}, a schema provider class name is set on the config
         * @param str4 folder name appended to {@code dfsBasePath} to form the base path prefix
         * @param cls  schema provider class; {@code null} selects {@link FilebasedSchemaProvider}
         * @return the populated config
         */
        static HoodieMultiTableDeltaStreamer.Config getConfig(String str, String str2, String str3, boolean z, boolean z2, boolean z3, String str4, Class<?> cls) {
            HoodieMultiTableDeltaStreamer.Config cfg = new HoodieMultiTableDeltaStreamer.Config();
            final String root = TestHoodieMultiTableDeltaStreamer.dfsBasePath;

            // Locations.
            cfg.configFolder = str2;
            cfg.basePathPrefix = root + "/" + str4;
            cfg.propsFilePath = root + "/" + str;

            // Table and source settings.
            cfg.targetTableName = "dummy_table";
            cfg.tableType = "COPY_ON_WRITE";
            cfg.sourceClassName = str3;
            cfg.sourceOrderingField = "timestamp";

            if (z3) {
                Class<?> providerClass = (cls == null) ? FilebasedSchemaProvider.class : cls;
                cfg.schemaProviderClassName = providerClass.getName();
            }

            // Sync toggles.
            cfg.enableHiveSync = z;
            cfg.enableMetaSync = z2;
            return cfg;
        }
    }

    /**
     * Constructing the streamer from {@code test-invalid-hive-sync-source1.properties} with
     * hive sync and meta sync enabled must throw a {@link HoodieException} whose message
     * reports the missing meta sync table field.
     */
    @Test
    public void testInvalidHiveSyncProps() throws IOException {
        String sourceClass = TestDataSource.class.getName();
        HoodieMultiTableDeltaStreamer.Config cfg =
            TestHelpers.getConfig("test-invalid-hive-sync-source1.properties", dfsBasePath + "/config", sourceClass, true, true, null);

        HoodieException failure = Assertions.assertThrows(
            HoodieException.class,
            () -> new HoodieMultiTableDeltaStreamer(cfg, jsc),
            "Should fail when hive sync table not provided with enableHiveSync flag");

        LOG.debug("Expected error when creating table execution objects", failure);
        String message = failure.getMessage();
        Assertions.assertTrue(message.contains("Meta sync table field not provided!"));
    }

    /**
     * Constructing the streamer with {@code test-invalid-props.properties} as the common props
     * file must throw an {@link IllegalArgumentException} asking for a valid common config path.
     */
    @Test
    public void testInvalidPropsFilePath() throws IOException {
        String sourceClass = TestDataSource.class.getName();
        HoodieMultiTableDeltaStreamer.Config cfg =
            TestHelpers.getConfig("test-invalid-props.properties", dfsBasePath + "/config", sourceClass, true, true, null);

        IllegalArgumentException failure = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> new HoodieMultiTableDeltaStreamer(cfg, jsc),
            "Should fail when invalid props file is provided");

        LOG.debug("Expected error when creating table execution objects", failure);
        String message = failure.getMessage();
        Assertions.assertTrue(message.contains("Please provide valid common config file path!"));
    }

    /**
     * Constructing the streamer from {@code test-invalid-table-config.properties} must throw an
     * {@link IllegalArgumentException} asking for a valid table config file path.
     */
    @Test
    public void testInvalidTableConfigFilePath() throws IOException {
        String sourceClass = TestDataSource.class.getName();
        HoodieMultiTableDeltaStreamer.Config cfg =
            TestHelpers.getConfig("test-invalid-table-config.properties", dfsBasePath + "/config", sourceClass, true, true, null);

        IllegalArgumentException failure = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> new HoodieMultiTableDeltaStreamer(cfg, jsc),
            "Should fail when invalid table config props file path is provided");

        LOG.debug("Expected error when creating table execution objects", failure);
        String message = failure.getMessage();
        Assertions.assertTrue(message.contains("Please provide valid table config file path!"));
    }

    /**
     * Builds a streamer from {@code test-source1.properties} with {@link SchemaRegistryProvider}
     * as the schema provider and checks the derived config and properties of the two resulting
     * table execution contexts.
     */
    @Test
    public void testCustomConfigProps() throws IOException {
        HoodieMultiTableDeltaStreamer.Config config = TestHelpers.getConfig("test-source1.properties", dfsBasePath + "/config", TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(config, this.jsc);
        List<TableExecutionContext> contexts = streamer.getTableExecutionContexts();

        // Assert the count before indexing so an unexpected number of tables is reported as an
        // assertion failure rather than an IndexOutOfBoundsException from get(1).
        Assertions.assertEquals(2, contexts.size());

        TableExecutionContext secondContext = contexts.get(1);
        TypedProperties secondProps = secondContext.getProperties();
        Assertions.assertEquals(dfsBasePath + "/multi_table_dataset/uber_db/dummy_table_uber", secondContext.getConfig().targetBasePath);
        Assertions.assertEquals("uber_db.dummy_table_uber", secondContext.getConfig().targetTableName);
        Assertions.assertEquals("topic1", secondProps.getString("hoodie.deltastreamer.source.kafka.topic"));
        Assertions.assertEquals("_row_key", secondProps.getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
        Assertions.assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), secondProps.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()));
        Assertions.assertEquals("uber_hive_dummy_table", secondProps.getString("hoodie.datasource.hive_sync.table"));
        Assertions.assertEquals("http://localhost:8081/subjects/random-value/versions/latest", secondProps.getString("hoodie.deltastreamer.schemaprovider.registry.url"));

        // The first context is expected to carry a different registry URL than the second.
        Assertions.assertEquals("http://localhost:8081/subjects/topic2-value/versions/latest", contexts.get(0).getProperties().getString("hoodie.deltastreamer.schemaprovider.registry.url"));
    }

    /**
     * Currently disabled. Expects streamer construction from {@code test-source1.properties}
     * with hive sync and meta sync enabled to fail with a message asking for valid table
     * config arguments.
     */
    @Disabled
    @Test
    public void testInvalidIngestionProps() {
        Exception failure = Assertions.assertThrows(
            Exception.class,
            () -> {
                HoodieMultiTableDeltaStreamer.Config cfg =
                    TestHelpers.getConfig("test-source1.properties", dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
                new HoodieMultiTableDeltaStreamer(cfg, jsc);
            },
            "Creation of execution object should fail without kafka topic");

        String message = failure.getMessage();
        LOG.debug("Creation of execution object failed with error: " + message, failure);
        Assertions.assertTrue(message.contains("Please provide valid table config arguments!"));
    }

    /**
     * Ingests two Kafka topics into two tables through {@link HoodieMultiTableDeltaStreamer}.
     *
     * <p>Round one sends 5 and 10 inserts to the two topics, syncs, and checks the record counts
     * of both target tables. Round two sends the same number of updates, syncs with a freshly
     * constructed streamer that reuses the same per-table properties, and checks that both tables
     * succeeded and that the record counts are still 5 and 10.
     */
    @Test
    public void testMultiTableExecutionWithKafkaSource() throws IOException {
        // testNum is advanced once here and once more at the end of the test.
        String topicName1 = "topic" + testNum++;
        String topicName2 = "topic" + testNum;
        testUtils.createTopic(topicName1, 2);
        testUtils.createTopic(topicName2, 2);

        // Both record schemas share the same field list and differ only in the record name.
        String commonFields = ",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        String tripUberSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\"" + commonFields;
        String shortTripSchema = "{\"type\":\"record\",\"name\":\"shortTripRec\"" + commonFields;

        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        testUtils.sendMessages(topicName1, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, tripUberSchema)));
        testUtils.sendMessages(topicName2, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, shortTripSchema)));

        HoodieMultiTableDeltaStreamer.Config config = TestHelpers.getConfig("test-source1.properties", dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false, null);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(config, this.jsc);
        List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();

        // Point the second context at topicName2 and the uber schema files.
        TypedProperties properties = executionContexts.get(1).getProperties();
        properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
        properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
        properties.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
        properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2);
        executionContexts.get(1).setProperties(properties);

        // Point the first context at topicName1 and the short-trip schema files.
        TypedProperties properties1 = executionContexts.get(0).getProperties();
        properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
        properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
        properties1.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
        properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1);
        executionContexts.get(0).setProperties(properties1);

        String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
        String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;

        // Round one: inserts only.
        streamer.sync();
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, targetBasePath1, this.sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, targetBasePath2, this.sqlContext);

        // Round two: updates, synced through a new streamer that reuses the same properties.
        testUtils.sendMessages(topicName1, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, tripUberSchema)));
        testUtils.sendMessages(topicName2, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, shortTripSchema)));
        HoodieMultiTableDeltaStreamer secondStreamer = new HoodieMultiTableDeltaStreamer(config, this.jsc);
        List<TableExecutionContext> secondContexts = secondStreamer.getTableExecutionContexts();
        secondContexts.get(1).setProperties(properties);
        secondContexts.get(0).setProperties(properties1);
        secondStreamer.sync();

        Assertions.assertEquals(2, secondStreamer.getSuccessTables().size());
        Assertions.assertTrue(secondStreamer.getFailedTables().isEmpty());
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, targetBasePath1, this.sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, targetBasePath2, this.sqlContext);
        testNum++;
    }

    /**
     * Ingests two Parquet source folders into two tables and verifies the record counts after
     * an initial sync (10 and 5 records) and after each of three further rounds that add a
     * randomly sized Parquet file to each folder.
     */
    @Test
    public void testMultiTableExecutionWithParquetSource() throws IOException {
        String firstSourceRoot = dfsBasePath + "/parquetSrcPath1/";
        prepareParquetDFSFiles(10, firstSourceRoot);
        String secondSourceRoot = dfsBasePath + "/parquetSrcPath2/";
        prepareParquetDFSFiles(5, secondSourceRoot);

        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
            populateCommonPropsAndWriteToFile(), dfsBasePath + "/config", ParquetDFSSource.class.getName(),
            false, false, false, "multi_table_parquet", null);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);

        List<TableExecutionContext> contexts = streamer.getTableExecutionContexts();
        ingestPerParquetSourceProps(contexts, Arrays.asList(firstSourceRoot, secondSourceRoot));
        String firstTablePath = contexts.get(0).getConfig().targetBasePath;
        String secondTablePath = contexts.get(1).getConfig().targetBasePath;

        // Initial round: counts match the files prepared above.
        syncAndVerify(streamer, firstTablePath, secondTablePath, 10L, 5L);

        int expectedFirst = 10;
        int expectedSecond = 5;
        // Follow-up rounds write files named 2.parquet, 3.parquet and 4.parquet into each folder.
        for (int fileNumber = 2; fileNumber <= 4; fileNumber++) {
            String fileName = fileNumber + ".parquet";
            int addedFirst = 10 + RANDOM.nextInt(100);
            int addedSecond = 15 + RANDOM.nextInt(100);
            prepareParquetDFSFiles(addedFirst, firstSourceRoot, fileName, false, null, null);
            prepareParquetDFSFiles(addedSecond, secondSourceRoot, fileName, false, null, null);
            expectedFirst += addedFirst;
            expectedSecond += addedSecond;
            syncAndVerify(streamer, firstTablePath, secondTablePath, expectedFirst, expectedSecond);
        }
    }

    /**
     * Checks the key generator class configured for each table execution context built from
     * {@code test-source1.properties}: the table named {@code dummy_table_short_trip} must use
     * {@code TestTableLevelGenerator}, every other table must use {@code TestGenerator}.
     */
    @Test
    public void testTableLevelProperties() throws IOException {
        HoodieMultiTableDeltaStreamer.Config config = TestHelpers.getConfig("test-source1.properties", dfsBasePath + "/config", TestDataSource.class.getName(), false, false, null);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(config, this.jsc);
        streamer.getTableExecutionContexts().forEach(tableExecutionContext -> {
            // Plain string comparison replaces the decompiled hashCode/boolean switch, which
            // assigned -1 to a boolean and switched on it, and therefore did not compile.
            String expectedKeyGenerator = tableExecutionContext.getTableName().equals("dummy_table_short_trip")
                ? TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName()
                : TestHoodieDeltaStreamer.TestGenerator.class.getName();
            Assertions.assertEquals(expectedKeyGenerator, tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()));
        });
    }

    /**
     * Fills a fresh {@link TypedProperties} via {@code populateCommonProps} and saves it under
     * {@code dfsBasePath}.
     *
     * @return the name of the saved properties file, without the {@code dfsBasePath} prefix
     * @throws IOException declared for failures while populating or saving the properties
     */
    private String populateCommonPropsAndWriteToFile() throws IOException {
        TypedProperties commonProps = new TypedProperties();
        populateCommonProps(commonProps, dfsBasePath);
        String targetPath = dfsBasePath + "/test-parquet-dfs-source.properties";
        UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, targetPath);
        return "test-parquet-dfs-source.properties";
    }

    /**
     * Creates the per-table properties used for a Parquet DFS source.
     *
     * @param str value stored under {@code hoodie.deltastreamer.source.dfs.root}
     * @return properties holding the include file, record key field, partition path field and
     *         the given source root
     */
    private TypedProperties getParquetProps(String str) {
        TypedProperties parquetProps = new TypedProperties();
        parquetProps.setProperty("include", "base.properties");
        parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        // The source root is the only entry that varies between tables.
        parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", str);
        return parquetProps;
    }

    /**
     * Copies the Parquet source properties for each source root into the execution context at
     * the same position.
     *
     * @param list  table execution contexts to update
     * @param list2 source roots; the entry at index {@code i} is applied to {@code list.get(i)}
     */
    private void ingestPerParquetSourceProps(List<TableExecutionContext> list, List<String> list2) {
        for (int index = 0; index < list2.size(); index++) {
            TableExecutionContext context = list.get(index);
            TypedProperties merged = context.getProperties();
            // Copy every entry as a string property on top of the context's existing properties.
            getParquetProps(list2.get(index)).forEach((key, value) -> merged.setProperty(key.toString(), value.toString()));
            context.setProperties(merged);
        }
    }

    /**
     * Runs one sync on the given streamer, then asserts the record count of both target tables.
     *
     * @param hoodieMultiTableDeltaStreamer streamer to sync
     * @param str  base path of the first table
     * @param str2 base path of the second table
     * @param j    expected record count at {@code str}
     * @param j2   expected record count at {@code str2}
     */
    private void syncAndVerify(HoodieMultiTableDeltaStreamer hoodieMultiTableDeltaStreamer, String str, String str2, long j, long j2) {
        hoodieMultiTableDeltaStreamer.sync();

        // First table is checked before the second.
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(j, str, sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(j2, str2, sqlContext);
    }
}
