package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.DFSPropertiesConfiguration;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.UtilitiesTestBase;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer.class */
public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
    // Properties file that initClass() writes under dfsBasePath; the default props file used by TestHelpers.makeConfig.
    private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
    // Properties file that initClass() writes with the key generator class set to the literal "invalid".
    private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
    // NOTE(review): presumably the props file for the parquet DFS source tests; it is not written in the visible part of this file.
    private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
    // Passed to prepareParquetDFSFiles(...) from initClass().
    private static final int PARQUET_NUM_RECORDS = 5;
    // Shared pseudo-random generator backing DistanceUDF.
    private static final Random RANDOM = new Random();
    // NOTE(review): evaluated at class-initialisation time; if dfsBasePath is only assigned inside
    // UtilitiesTestBase.initClass(...), this constant captures "null/parquetFiles" - confirm.
    private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
    // Logger used by the tests and by the nested TestHelpers.
    private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
    // NOTE(review): presumably a running counter for parquet-source test tables; not used in the visible part of this file.
    private static int parquetTestNum = 1;

    /* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer$DistanceUDF.class */
    public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {
        public Double call(Double d, Double d2, Double d3, Double d4) {
            return Double.valueOf(TestHoodieDeltaStreamer.RANDOM.nextDouble());
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer$DropAllTransformer.class */
    public static class DropAllTransformer implements Transformer {
        public Dataset apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            System.out.println("DropAllTransformer called !!");
            return sparkSession.createDataFrame(javaSparkContext.emptyRDD(), dataset.schema());
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer$DummyAvroPayload.class */
    /**
     * Payload class that extends {@link OverwriteWithLatestAvroPayload} without overriding anything.
     * Its class name is passed as the payload class in the payload-class-update tests of this file.
     */
    public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
        /** Forwards both arguments unchanged to the superclass constructor. */
        public DummyAvroPayload(GenericRecord genericRecord, Comparable comparable) {
            super(genericRecord, comparable);
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer$DummySchemaProvider.class */
    public static class DummySchemaProvider extends SchemaProvider {
        public DummySchemaProvider(TypedProperties typedProperties, JavaSparkContext javaSparkContext) {
            super(typedProperties, javaSparkContext);
        }

        public Schema getSourceSchema() {
            return Schema.create(Schema.Type.NULL);
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer$TestGenerator.class */
    /**
     * Key generator that extends {@link SimpleKeyGenerator} without overriding anything.
     * initClass() registers its class name as the key generator in test-source.properties.
     */
    public static class TestGenerator extends SimpleKeyGenerator {
        /** Forwards the properties unchanged to the superclass constructor. */
        public TestGenerator(TypedProperties typedProperties) {
            super(typedProperties);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer$TestHelpers.class */
    /**
     * Static helpers shared by the tests in this class: builders for
     * {@link HoodieDeltaStreamer.Config} objects and assertions against a Hudi table
     * (record counts, timeline commit counts, commit checkpoint metadata).
     */
    public static class TestHelpers {
        TestHelpers() {
        }

        /** Builds a default config whose transformer is {@link DropAllTransformer}. */
        static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, HoodieDeltaStreamer.Operation operation) {
            return makeConfig(basePath, operation, DropAllTransformer.class.getName());
        }

        /** Builds a default config whose transformer is {@link TripsWithDistanceTransformer}. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, HoodieDeltaStreamer.Operation operation) {
            return makeConfig(basePath, operation, TripsWithDistanceTransformer.class.getName());
        }

        /** Builds a config with the given transformer, the default test-source props file and hive sync disabled. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, HoodieDeltaStreamer.Operation operation, String transformerClassName) {
            return makeConfig(basePath, operation, transformerClassName, TestHoodieDeltaStreamer.PROPS_FILENAME_TEST_SOURCE, false);
        }

        /** Builds a config with a file-based schema provider, no payload class override and the default table type. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, HoodieDeltaStreamer.Operation operation, String transformerClassName, String propsFilename, boolean enableHiveSync) {
            return makeConfig(basePath, operation, transformerClassName, propsFilename, enableHiveSync, true, false, null, null);
        }

        /** Builds a config that reads from {@link TestDataSource} with a source limit of 1000. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, HoodieDeltaStreamer.Operation operation, String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String payloadClassName, String tableType) {
            return makeConfig(basePath, operation, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync, useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType);
        }

        /**
         * Fully parameterised config builder that every other {@code makeConfig} overload delegates to.
         *
         * @param basePath target table base path
         * @param operation write operation to perform
         * @param sourceClassName fully qualified source class name
         * @param transformerClassName fully qualified transformer class name
         * @param propsFilename properties file name, resolved under {@code dfsBasePath}
         * @param enableHiveSync whether hive sync is enabled
         * @param useSchemaProviderClass when true, {@link FilebasedSchemaProvider} is set as schema provider
         * @param sourceLimit value assigned to the config's source limit
         * @param updatePayloadClass when true, {@code payloadClassName} is applied to the config
         * @param payloadClassName payload class name; only used when {@code updatePayloadClass} is true
         * @param tableType table type name; {@code null} selects {@code "COPY_ON_WRITE"}
         * @return the populated config
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, HoodieDeltaStreamer.Operation operation, String sourceClassName, String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) {
            HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
            cfg.targetBasePath = basePath;
            cfg.targetTableName = "hoodie_trips";
            cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
            cfg.sourceClassName = sourceClassName;
            cfg.transformerClassName = transformerClassName;
            cfg.operation = operation;
            cfg.enableHiveSync = enableHiveSync;
            cfg.sourceOrderingField = "timestamp";
            cfg.propsFilePath = UtilitiesTestBase.dfsBasePath + "/" + propsFilename;
            cfg.sourceLimit = sourceLimit;
            if (updatePayloadClass) {
                cfg.payloadClassName = payloadClassName;
            }
            if (useSchemaProviderClass) {
                cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
            }
            return cfg;
        }

        /**
         * Builds a config for a downstream table that reads incrementally from another Hudi table
         * through {@link HoodieIncrSource}.
         *
         * @param srcBasePath base path of the upstream Hudi table to read from
         * @param basePath target base path of the downstream table
         * @param operation write operation to perform
         * @param addReadLatestOnMissingCkpt value written for the {@code read_latest_on_missing_ckpt} option
         * @param schemaProviderClassName schema provider class name, or {@code null} to leave it unset
         * @return the populated config
         */
        static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, HoodieDeltaStreamer.Operation operation, boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
            HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
            cfg.targetBasePath = basePath;
            cfg.targetTableName = "hoodie_trips_copy";
            cfg.tableType = "COPY_ON_WRITE";
            cfg.sourceClassName = HoodieIncrSource.class.getName();
            cfg.operation = operation;
            cfg.sourceOrderingField = "timestamp";
            cfg.propsFilePath = UtilitiesTestBase.dfsBasePath + "/test-downstream-source.properties";
            cfg.sourceLimit = 1000L;
            if (schemaProviderClassName != null) {
                cfg.schemaProviderClassName = schemaProviderClassName;
            }
            // Typed list instead of the raw ArrayList used previously.
            List<String> overrides = new ArrayList<>();
            overrides.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
            overrides.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
            overrides.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
            cfg.configs = overrides;
            return cfg;
        }

        /** Asserts that loading {@code tablePath} through the Hudi data source yields exactly {@code expected} rows. */
        static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
            Assert.assertEquals(expected, sqlContext.read().format("org.apache.hudi").load(tablePath).count());
        }

        /** Returns one row per {@code _hoodie_commit_time} with its record count, sorted by commit time. */
        static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
            return sqlContext.read().format("org.apache.hudi").load(tablePath)
                    .groupBy("_hoodie_commit_time")
                    .count()
                    .sort("_hoodie_commit_time")
                    .collectAsList();
        }

        /** Asserts the number of rows whose {@code haversine_distance} column is not null. */
        static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
            // createOrReplaceTempView is the non-deprecated equivalent of registerTempTable.
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            Assert.assertEquals(expected, sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count());
        }

        /** Asserts the number of rows whose {@code haversine_distance} column equals {@code 1.0}. */
        static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            Assert.assertEquals(expected, sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count());
        }

        /** Asserts that the completed part of the table's commit timeline holds at least {@code minExpected} instants. */
        static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
            HoodieTimeline completed = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
            assertAtLeastNInstants(minExpected, metaClient, completed);
        }

        /** Asserts that the completed part of the table's delta-commit timeline holds at least {@code minExpected} instants. */
        static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
            HoodieTimeline completed = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
            assertAtLeastNInstants(minExpected, metaClient, completed);
        }

        /** Logs the whole active timeline, then asserts {@code timeline} contains at least {@code minExpected} instants. */
        private static void assertAtLeastNInstants(int minExpected, HoodieTableMetaClient metaClient, HoodieTimeline timeline) {
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
            int actual = (int) timeline.getInstants().count();
            Assert.assertTrue("Got=" + actual + ", exp >=" + minExpected, minExpected <= actual);
        }

        /**
         * Asserts the number of completed commits and the checkpoint stored in the latest one.
         *
         * @param expected expected value under {@code HoodieDeltaStreamer.CHECKPOINT_KEY} in the last commit's metadata
         * @param tablePath table base path
         * @param fs file system whose configuration is used to open the table
         * @param totalCommits expected number of completed instants on the commits timeline
         * @return timestamp of the last completed instant
         * @throws IOException if the commit metadata cannot be deserialised
         */
        static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits) throws IOException {
            HoodieTimeline timeline = new HoodieTableMetaClient(fs.getConf(), tablePath).getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            HoodieInstant lastInstant = timeline.lastInstant().get();
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
            Assert.assertEquals(totalCommits, timeline.countInstants());
            Assert.assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
            return lastInstant.getTimestamp();
        }

        /**
         * Polls {@code condition} every three seconds on a background thread until it returns true.
         * Anything the condition throws (including assertion errors) is logged and treated as
         * "not yet satisfied".
         *
         * <p>The background executor is always shut down on exit and the poller stops when it is
         * interrupted, so a timed-out wait no longer leaves a thread polling forever.
         *
         * @param condition predicate to poll; it is always invoked with {@code true}
         * @param timeoutInSecs maximum time to wait, in seconds
         * @throws Exception a {@code TimeoutException} when the condition does not hold in time, or
         *     whatever {@code Future.get} propagates
         */
        static void waitTillCondition(Function<Boolean, Boolean> condition, long timeoutInSecs) throws Exception {
            ExecutorService poller = Executors.newSingleThreadExecutor();
            try {
                Future<Boolean> outcome = poller.submit(() -> {
                    boolean satisfied = false;
                    while (!satisfied) {
                        try {
                            Thread.sleep(3000L);
                            satisfied = condition.apply(true);
                        } catch (InterruptedException interrupted) {
                            // Cancelled by shutdownNow(): restore the flag and stop polling.
                            Thread.currentThread().interrupt();
                            return false;
                        } catch (Throwable th) {
                            LOG.warn("Got error :", th);
                            satisfied = false;
                            if (Thread.currentThread().isInterrupted()) {
                                return false;
                            }
                        }
                    }
                    return true;
                });
                outcome.get(timeoutInSecs, TimeUnit.SECONDS);
            } finally {
                poller.shutdownNow();
            }
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/TestHoodieDeltaStreamer$TripsWithDistanceTransformer.class */
    public static class TripsWithDistanceTransformer implements Transformer {
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            dataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
            return dataset.withColumn("haversine_distance", functions.callUDF("distance_udf", new Column[]{functions.col("begin_lat"), functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")}));
        }
    }

    /**
     * One-time fixture setup: initialises the base test services, copies the shared config and
     * schema files to DFS, writes the three properties files used by the tests (source, downstream
     * source, invalid key generator) and prepares the parquet source files.
     */
    @BeforeClass
    public static void initClass() throws Exception {
        UtilitiesTestBase.initClass(true);

        // Stage the shared resources under dfsBasePath, keeping their file names.
        String[] sharedResources = {"base.properties", "sql-transformer.properties", "source.avsc", "target.avsc"};
        for (String resource : sharedResources) {
            UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + resource, dfs, dfsBasePath + "/" + resource);
        }

        final String sourceSchemaFile = dfsBasePath + "/source.avsc";
        final String targetSchemaFile = dfsBasePath + "/target.avsc";

        // Properties for the primary test source, including hive sync settings.
        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty("include", "sql-transformer.properties");
        sourceProps.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
        sourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        sourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", sourceSchemaFile);
        sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaFile);
        sourceProps.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
        sourceProps.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1");
        sourceProps.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), "hive_trips");
        sourceProps.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "datestr");
        sourceProps.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getName());
        UtilitiesTestBase.Helpers.savePropsToDFS(sourceProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);

        // Properties for the downstream pipeline; both schema files point at target.avsc.
        TypedProperties downstreamProps = new TypedProperties();
        downstreamProps.setProperty("include", "base.properties");
        downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", targetSchemaFile);
        downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaFile);
        UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath + "/test-downstream-source.properties");

        // Properties with a key generator class name that cannot be loaded.
        TypedProperties invalidProps = new TypedProperties();
        invalidProps.setProperty("include", "sql-transformer.properties");
        invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
        invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", sourceSchemaFile);
        invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaFile);
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);

        prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
    }

    /** One-time teardown; delegates entirely to {@code UtilitiesTestBase.cleanupClass()}. */
    @AfterClass
    public static void cleanupClass() throws Exception {
        UtilitiesTestBase.cleanupClass();
    }

    /** Per-test setup; delegates entirely to the base class implementation. */
    @Override // org.apache.hudi.utilities.UtilitiesTestBase
    @Before
    public void setup() throws Exception {
        super.setup();
    }

    /** Per-test teardown; delegates entirely to the base class implementation. */
    @Override // org.apache.hudi.utilities.UtilitiesTestBase
    @After
    public void teardown() throws Exception {
        super.teardown();
    }

    /**
     * Loads the test-source properties file from DFS and checks a value pulled in through the
     * {@code include} chain plus two values written directly by initClass().
     */
    @Test
    public void testProps() {
        final Path propsPath = new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
        final DFSPropertiesConfiguration loader = new DFSPropertiesConfiguration(dfs, propsPath);
        final TypedProperties props = loader.getConfig();

        Assert.assertEquals(2L, props.getInteger("hoodie.upsert.shuffle.parallelism"));
        Assert.assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
        Assert.assertEquals("org.apache.hudi.utilities.TestHoodieDeltaStreamer$TestGenerator", props.getString("hoodie.datasource.write.keygenerator.class"));
    }

    /**
     * Syncing with the properties file that names an unloadable key generator class must fail
     * with an {@link IOException} whose message mentions the key generator.
     */
    @Test
    public void testPropsWithInvalidKeyGenerator() throws Exception {
        try {
            final String tableBasePath = dfsBasePath + "/test_table";
            final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(
                    tableBasePath,
                    HoodieDeltaStreamer.Operation.BULK_INSERT,
                    TripsWithDistanceTransformer.class.getName(),
                    PROPS_FILENAME_TEST_INVALID,
                    false);
            final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc);
            deltaStreamer.sync();
            Assert.fail("Should error out when setting the key generator class property to an invalid value");
        } catch (IOException expected) {
            LOG.error("Expected error during getting the key generator", expected);
            final String message = expected.getMessage();
            Assert.assertTrue(message.contains("Could not load key generator class"));
        }
    }

    /**
     * Pointing the delta streamer at an existing directory that is not a Hudi table must raise
     * {@link TableNotFoundException}.
     */
    @Test
    public void testTableCreation() throws Exception {
        try {
            final String notATablePath = dfsBasePath + "/not_a_table";
            dfs.mkdirs(new Path(notATablePath));
            final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dfsBasePath + "/not_a_table", HoodieDeltaStreamer.Operation.BULK_INSERT);
            final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc);
            deltaStreamer.sync();
            Assert.fail("Should error out when pointed out at a dir thats not a table");
        } catch (TableNotFoundException expected) {
            LOG.error("Expected error during table creation", expected);
        }
    }

    /**
     * Runs three syncs against one table - a bulk insert, a no-op sync with a zero source limit,
     * and an upsert - checking record counts and the commit checkpoint after each.
     */
    @Test
    public void testBulkInsertsAndUpserts() throws Exception {
        final String tableBasePath = dfsBasePath + "/test_table";
        final String parquetGlob = tableBasePath + "/*/*.parquet";

        // Initial bulk insert.
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.BULK_INSERT);
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, parquetGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, parquetGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

        // Source limit of zero: counts and checkpoint are expected to stay the same.
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, parquetGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, parquetGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

        // Upsert with a larger source limit.
        cfg.sourceLimit = 2000L;
        cfg.operation = HoodieDeltaStreamer.Operation.UPSERT;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1950L, parquetGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1950L, parquetGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);

        List<Row> perCommit = TestHelpers.countsPerCommit(parquetGlob, this.sqlContext);
        long totalAcrossCommits = perCommit.stream().mapToLong(entry -> entry.getLong(1)).sum();
        Assert.assertEquals(1950L, totalAcrossCommits);
    }

    /** Runs the continuous-mode upsert scenario on a COPY_ON_WRITE table under the "continuous_cow" directory. */
    @Test
    public void testUpsertsCOWContinuousMode() throws Exception {
        testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
    }

    /** Runs the continuous-mode upsert scenario on a MERGE_ON_READ table under the "continuous_mor" directory. */
    @Test
    public void testUpsertsMORContinuousMode() throws Exception {
        testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
    }

    /**
     * Starts a delta streamer in continuous mode on a background thread and waits (up to 180
     * seconds) until the table shows the expected number of commits and records, then shuts the
     * streamer down.
     *
     * <p>The streamer is stopped and the background executor is shut down even when the wait
     * fails, so a timed-out run does not leave a continuous ingest running behind later tests.
     *
     * @param tableType table type under test
     * @param tempDir directory name, under {@code dfsBasePath}, that holds the table
     * @throws Exception if the wait times out or the background sync fails
     */
    private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
        final String tableBasePath = dfsBasePath + "/" + tempDir;
        final int totalRecords = 3000;
        // Commit threshold for this scenario; kept local rather than borrowed from PARQUET_NUM_RECORDS,
        // which only happens to have the same value.
        final int minCommits = 5;

        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.UPSERT);
        cfg.continuousMode = true;
        cfg.tableType = tableType.name();
        cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords));
        cfg.configs.add(String.format("%s=false", "hoodie.clean.automatic"));
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc);

        ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> syncFuture = syncExecutor.submit(() -> {
                try {
                    deltaStreamer.sync();
                } catch (Exception e) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            });
            try {
                TestHelpers.waitTillCondition(ignored -> {
                    if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
                        TestHelpers.assertAtleastNDeltaCommits(minCommits, tableBasePath, dfs);
                        TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
                    } else {
                        TestHelpers.assertAtleastNCompactionCommits(minCommits, tableBasePath, dfs);
                    }
                    TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath + "/*/*.parquet", this.sqlContext);
                    TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath + "/*/*.parquet", this.sqlContext);
                    return true;
                }, 180L);
            } finally {
                // Always stop the continuous ingest, including when the wait timed out.
                deltaStreamer.shutdownGracefully();
            }
            syncFuture.get();
        } finally {
            syncExecutor.shutdownNow();
        }
    }

    /**
     * Two-step pipeline test: an upstream table is written with the SQL-based transformer and hive
     * sync enabled, and a downstream table is fed from it through the Hudi incremental source.
     * After each sync the record counts, distance counts and commit checkpoints of the affected
     * table are verified; the hive table state is checked at the end.
     */
    @Test
    public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
        final String upstreamPath = dfsBasePath + "/test_table2";
        final String downstreamPath = dfsBasePath + "/test_downstream_table2";
        final String upstreamGlob = upstreamPath + "/*/*.parquet";
        final String downstreamGlob = downstreamPath + "/*/*.parquet";
        final HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(upstreamPath, "hive_trips");

        // Step 1: bulk insert into the upstream table.
        HoodieDeltaStreamer.Config upstreamCfg = TestHelpers.makeConfig(upstreamPath, HoodieDeltaStreamer.Operation.BULK_INSERT, SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
        new HoodieDeltaStreamer(upstreamCfg, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, upstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, upstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, upstreamGlob, this.sqlContext);
        String firstUpstreamInstant = TestHelpers.assertCommitMetadata("00000", upstreamPath, dfs, 1);

        // Step 2: incremental pull into the downstream table; its checkpoint is the upstream instant.
        HoodieDeltaStreamer.Config downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, HoodieDeltaStreamer.Operation.BULK_INSERT, true, null);
        new HoodieDeltaStreamer(downstreamCfg, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata(firstUpstreamInstant, downstreamPath, dfs, 1);

        // Step 3: upstream sync with a zero source limit; nothing is expected to change.
        upstreamCfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(upstreamCfg, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, upstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, upstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, upstreamGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", upstreamPath, dfs, 1);

        // Step 4: downstream sync with the dummy schema provider; nothing is expected to change.
        HoodieDeltaStreamer.Config downstreamDummyCfg = TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, HoodieDeltaStreamer.Operation.BULK_INSERT, true, DummySchemaProvider.class.getName());
        new HoodieDeltaStreamer(downstreamDummyCfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata(firstUpstreamInstant, downstreamPath, dfs, 1);

        // Step 5: upsert into the upstream table.
        upstreamCfg.sourceLimit = 2000L;
        upstreamCfg.operation = HoodieDeltaStreamer.Operation.UPSERT;
        new HoodieDeltaStreamer(upstreamCfg, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1950L, upstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(1950L, upstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1950L, upstreamGlob, this.sqlContext);
        String secondUpstreamInstant = TestHelpers.assertCommitMetadata("00001", upstreamPath, dfs, 2);
        long upstreamTotal = TestHelpers.countsPerCommit(upstreamGlob, this.sqlContext).stream()
                .mapToLong(entry -> entry.getLong(1))
                .sum();
        Assert.assertEquals(1950L, upstreamTotal);

        // Step 6: incremental upsert into the downstream table.
        HoodieDeltaStreamer.Config downstreamUpsertCfg = TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, HoodieDeltaStreamer.Operation.UPSERT, false, null);
        downstreamUpsertCfg.sourceLimit = 2000L;
        new HoodieDeltaStreamer(downstreamUpsertCfg, this.jsc).sync();
        TestHelpers.assertRecordCount(2000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCount(2000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(2000L, downstreamGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata(secondUpstreamInstant, downstreamPath, dfs, 2);
        long downstreamTotal = TestHelpers.countsPerCommit(downstreamGlob, this.sqlContext).stream()
                .mapToLong(entry -> entry.getLong(1))
                .sum();
        Assert.assertEquals(2000L, downstreamTotal);

        // Finally verify what hive sync recorded for the upstream table.
        HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
        Assert.assertTrue("Table " + hiveSyncConfig.tableName + " should exist", hiveClient.doesTableExist(hiveSyncConfig.tableName));
        Assert.assertEquals("Table partitions should match the number of partitions we wrote", 1L, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size());
        Assert.assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", secondUpstreamInstant, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
    }

    /**
     * Syncing with the SQL-based transformer but without a schema provider class must fail with
     * a {@link HoodieException} asking for a valid schema provider.
     */
    @Test
    public void testNullSchemaProvider() throws Exception {
        try {
            final String tableBasePath = dfsBasePath + "/test_table";
            final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(
                    tableBasePath,
                    HoodieDeltaStreamer.Operation.BULK_INSERT,
                    SqlQueryBasedTransformer.class.getName(),
                    PROPS_FILENAME_TEST_SOURCE,
                    true,
                    false,
                    false,
                    null,
                    null);
            final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc, dfs, hiveServer.getHiveConf());
            deltaStreamer.sync();
            Assert.fail("Should error out when schema provider is not provided");
        } catch (HoodieException expected) {
            LOG.error("Expected error during reading data from source ", expected);
            final String message = expected.getMessage();
            Assert.assertTrue(message.contains("Please provide a valid schema provider class!"));
        }
    }

    /**
     * Writes a MERGE_ON_READ table, then constructs a second delta streamer with
     * {@link DummyAvroPayload} as payload class and checks that {@code hoodie.properties} now
     * records that class under {@code hoodie.compaction.payload.class}.
     */
    @Test
    public void testPayloadClassUpdate() throws Exception {
        final String tableBasePath = dfsBasePath + "/test_dataset_mor";
        HoodieDeltaStreamer.Config initialCfg = TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.BULK_INSERT, SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, true, false, null, "MERGE_ON_READ");
        new HoodieDeltaStreamer(initialCfg, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath + "/*/*.parquet", this.sqlContext);

        HoodieDeltaStreamer.Config payloadCfg = TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.BULK_INSERT, SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
        // Constructed but never synced: the test only inspects hoodie.properties afterwards.
        new HoodieDeltaStreamer(payloadCfg, this.jsc, dfs, hiveServer.getHiveConf());

        Properties tableProps = new Properties();
        FileSystem fs = FSUtils.getFs(payloadCfg.targetBasePath, this.jsc.hadoopConfiguration());
        // try-with-resources closes the stream exactly once, replacing the hand-rolled close logic.
        try (FSDataInputStream in = fs.open(new Path(tableBasePath + "/.hoodie/hoodie.properties"))) {
            tableProps.load(in);
        }
        // Expected value first, so a failure message reports the two sides correctly.
        Assert.assertEquals(DummyAvroPayload.class.getName(), tableProps.getProperty("hoodie.compaction.payload.class"));
    }

    /**
     * Writes a table with the default (COPY_ON_WRITE) table type, then constructs a second delta
     * streamer with {@link DummyAvroPayload} as payload class and checks that
     * {@code hoodie.properties} does not contain {@code hoodie.compaction.payload.class}.
     */
    @Test
    public void testPayloadClassUpdateWithCOWTable() throws Exception {
        final String tableBasePath = dfsBasePath + "/test_dataset_cow";
        HoodieDeltaStreamer.Config initialCfg = TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.BULK_INSERT, SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, true, false, null, null);
        new HoodieDeltaStreamer(initialCfg, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath + "/*/*.parquet", this.sqlContext);

        HoodieDeltaStreamer.Config payloadCfg = TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.BULK_INSERT, SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true, true, true, DummyAvroPayload.class.getName(), null);
        // Constructed but never synced: the test only inspects hoodie.properties afterwards.
        new HoodieDeltaStreamer(payloadCfg, this.jsc, dfs, hiveServer.getHiveConf());

        Properties tableProps = new Properties();
        FileSystem fs = FSUtils.getFs(payloadCfg.targetBasePath, this.jsc.hadoopConfiguration());
        // try-with-resources closes the stream exactly once, replacing the hand-rolled close logic.
        try (FSDataInputStream in = fs.open(new Path(tableBasePath + "/.hoodie/hoodie.properties"))) {
            tableProps.load(in);
        }
        Assert.assertFalse(tableProps.containsKey("hoodie.compaction.payload.class"));
    }

    /**
     * Exercises the {@code filterDupes} option of {@link HoodieDeltaStreamer} on the table
     * under {@code test_dupes_table}.
     *
     * <p>Sequence:
     * <ol>
     *   <li>Bulk insert; expect 1000 records and commit metadata {@code "00000"}.</li>
     *   <li>Upsert with {@code filterDupes = true} and {@code sourceLimit = 2000}; expect 2000
     *       records in total, 1000 for each of the two commits.</li>
     *   <li>Upsert using {@code TestHelpers.makeDropAllConfig} with automatic cleaning disabled;
     *       expect a newer completed commit whose partition-to-write-stats map is empty.</li>
     * </ol>
     *
     * @throws Exception if any sync or timeline lookup fails
     */
    @Test
    public void testFilterDupes() throws Exception {
        final String tablePath = dfsBasePath + "/test_dupes_table";
        final String parquetGlob = tablePath + "/*/*.parquet";

        // Step 1: initial bulk insert.
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tablePath, HoodieDeltaStreamer.Operation.BULK_INSERT);
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, parquetGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tablePath, dfs, 1);

        // Step 2: same config object, switched to an upsert with duplicate filtering.
        cfg.filterDupes = true;
        cfg.sourceLimit = 2000L;
        cfg.operation = HoodieDeltaStreamer.Operation.UPSERT;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(2000L, parquetGlob, this.sqlContext);
        TestHelpers.assertCommitMetadata("00001", tablePath, dfs, 2);
        List<Row> perCommitCounts = TestHelpers.countsPerCommit(parquetGlob, this.sqlContext);
        for (int idx = 0; idx < 2; idx++) {
            Assert.assertEquals(1000L, perCommitCounts.get(idx).getLong(1));
        }

        // Remember the latest completed commit before the final run.
        HoodieTableMetaClient metaBefore = new HoodieTableMetaClient(this.jsc.hadoopConfiguration(), tablePath, true);
        HoodieInstant previousCommit = (HoodieInstant) metaBefore.getCommitsTimeline().filterCompletedInstants().lastInstant().get();

        // Step 3: upsert with the drop-all config and automatic cleaning turned off.
        HoodieDeltaStreamer.Config dropAllCfg = TestHelpers.makeDropAllConfig(tablePath, HoodieDeltaStreamer.Operation.UPSERT);
        dropAllCfg.filterDupes = true;
        dropAllCfg.sourceLimit = 2000L;
        dropAllCfg.operation = HoodieDeltaStreamer.Operation.UPSERT;
        dropAllCfg.configs.add(String.format("%s=false", "hoodie.clean.automatic"));
        new HoodieDeltaStreamer(dropAllCfg, this.jsc).sync();

        // A fresh meta client is created after the sync to look up the newest commit.
        HoodieTableMetaClient metaAfter = new HoodieTableMetaClient(this.jsc.hadoopConfiguration(), tablePath, true);
        HoodieInstant newestCommit = (HoodieInstant) metaAfter.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        Assert.assertTrue(HoodieTimeline.compareTimestamps(newestCommit.getTimestamp(), previousCommit.getTimestamp(), HoodieTimeline.GREATER));

        byte[] rawMetadata = (byte[]) metaAfter.getActiveTimeline().getInstantDetails(newestCommit).get();
        HoodieCommitMetadata newestMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(rawMetadata, HoodieCommitMetadata.class);
        System.out.println("New Commit Metadata=" + newestMetadata);
        Assert.assertTrue(newestMetadata.getPartitionToWriteStats().isEmpty());
    }

    /**
     * Fetches one batch from {@link DistributedTestDataSource} configured with a maximum of
     * 1000 unique records, a single source partition and RocksDB-backed key generation, then
     * asserts the batch contains exactly 1000 records.
     */
    @Test
    public void testDistributedTestDataSource() {
        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000");
        sourceProps.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1");
        sourceProps.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");

        DistributedTestDataSource source = new DistributedTestDataSource(sourceProps, this.jsc, this.sparkSession, null);
        InputBatch batch = source.fetchNext(Option.empty(), 10000000L);

        // Cache the RDD before counting it.
        JavaRDD records = (JavaRDD) batch.getBatch().get();
        records.cache();
        Assert.assertEquals(1000L, records.count());
    }

    /**
     * Generates insert records with {@link HoodieTestDataGenerator} (commit time {@code "000"})
     * and saves them as {@code 1.parquet} under {@code PARQUET_SOURCE_ROOT}.
     *
     * @param i number of insert records requested from the generator
     * @throws IOException if writing the parquet file fails
     */
    private static void prepareParquetDFSFiles(int i) throws IOException {
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        Path parquetFile = new Path(PARQUET_SOURCE_ROOT + "/1.parquet");
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInserts("000", Integer.valueOf(i)), generator),
            parquetFile);
    }

    /**
     * Writes the properties file {@code PROPS_FILENAME_TEST_PARQUET} under {@code dfsBasePath}
     * for running the delta streamer against the parquet source root.
     *
     * <p>The file always includes {@code base.properties}, the record key field
     * {@code _row_key}, the partition path field {@code not_there} and the DFS source root.
     * Schema file properties are added only when {@code z} is set.
     *
     * @param z  when {@code true}, register {@code source.avsc} as the source schema file
     * @param z2 when {@code true} (and {@code z} is also {@code true}), additionally register
     *           {@code target.avsc} as the target schema file
     * @throws IOException if saving the properties to DFS fails
     */
    private void prepareParquetDFSSource(boolean z, boolean z2) throws IOException {
        TypedProperties parquetProps = new TypedProperties();
        parquetProps.setProperty("include", "base.properties");
        parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
        if (z) {
            parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
            if (z2) {
                // The target schema gets its own key; writing it under the source key again
                // would silently replace source.avsc set just above.
                parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
            }
        }
        parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
        UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
    }

    /**
     * Shared driver for the parquet DFS source tests: prepares the source properties, runs an
     * INSERT sync with {@link ParquetDFSSource} into a fresh {@code test_parquet_table<N>}
     * directory, and asserts that 5 records were written. The {@code parquetTestNum} counter is
     * incremented after the assertion so each invocation gets its own table path.
     *
     * @param z   forwarded to {@code prepareParquetDFSSource} as its first flag and to
     *            {@code TestHelpers.makeConfig}
     * @param str class name handed to {@code TestHelpers.makeConfig} (the callers pass a
     *            transformer class name), or {@code null}; non-null also enables the second
     *            flag of {@code prepareParquetDFSSource}
     * @throws Exception if preparing the source or syncing fails
     */
    private void testParquetDFSSource(boolean z, String str) throws Exception {
        final boolean transformerGiven = str != null;
        prepareParquetDFSSource(z, transformerGiven);

        final String tablePath = dfsBasePath + "/test_parquet_table" + parquetTestNum;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tablePath, HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(), str, PROPS_FILENAME_TEST_PARQUET, false, z, 100000, false, null, null);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, this.jsc);
        streamer.sync();

        TestHelpers.assertRecordCount(5L, tablePath + "/*/*.parquet", this.sqlContext);
        parquetTestNum++;
    }

    /**
     * Runs the parquet DFS source scenario with the schema flag off and no transformer class.
     *
     * @throws Exception if the underlying scenario fails
     */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        final String transformerClassName = null;
        testParquetDFSSource(false, transformerClassName);
    }

    /**
     * Runs the parquet DFS source scenario with the schema flag off and
     * {@code TripsWithDistanceTransformer} as the transformer class.
     *
     * @throws Exception if the underlying scenario fails
     */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
        final String transformerClassName = TripsWithDistanceTransformer.class.getName();
        testParquetDFSSource(false, transformerClassName);
    }

    /**
     * Runs the parquet DFS source scenario with the schema flag on (a source schema file is
     * configured) and no transformer class.
     *
     * @throws Exception if the underlying scenario fails
     */
    @Test
    public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
        final String transformerClassName = null;
        testParquetDFSSource(true, transformerClassName);
    }

    /**
     * Runs the parquet DFS source scenario with the schema flag on and
     * {@code TripsWithDistanceTransformer} as the transformer class.
     *
     * @throws Exception if the underlying scenario fails
     */
    @Test
    public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
        final String transformerClassName = TripsWithDistanceTransformer.class.getName();
        testParquetDFSSource(true, transformerClassName);
    }
}
