package org.apache.hudi.utilities.deltastreamer;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkRecordMerger;
import org.apache.hudi.HoodieSparkUtils$;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.SqlSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.class */
public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
    // SLF4J logger shared by the test methods in this class.
    private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamer.class);

    /**
     * Four-argument Spark UDF that ignores its inputs and returns the next double drawn from
     * {@code HoodieDeltaStreamerTestBase.RANDOM}.
     */
    public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {
        /**
         * @param d unused
         * @param d2 unused
         * @param d3 unused
         * @param d4 unused
         * @return the value produced by {@code RANDOM.nextDouble()}
         */
        public Double call(Double d, Double d2, Double d3, Double d4) {
            double randomDistance = HoodieDeltaStreamerTestBase.RANDOM.nextDouble();
            return randomDistance;
        }
    }

    /**
     * Transformer that drops every incoming row: it prints a marker line to stdout and returns an
     * empty DataFrame built with the schema of the input dataset.
     */
    public static class DropAllTransformer implements Transformer {
        /**
         * @param javaSparkContext context used to create the empty RDD
         * @param sparkSession session used to build the resulting DataFrame
         * @param dataset incoming batch; only its schema is read
         * @param typedProperties unused
         * @return an empty dataset carrying the schema of {@code dataset}
         */
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            System.out.println("DropAllTransformer called !!");
            // Typed explicitly so the result is a Dataset<Row> rather than a raw Dataset.
            JavaRDD<Row> noRows = javaSparkContext.emptyRDD();
            return sparkSession.createDataFrame(noRows, dataset.schema());
        }
    }

    /**
     * Payload class that adds nothing to {@link OverwriteWithLatestAvroPayload}; it only provides
     * a distinct class name.
     */
    public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
        /** Forwards both arguments unchanged to the superclass constructor. */
        public DummyAvroPayload(GenericRecord genericRecord, Comparable comparable) {
            super(genericRecord, comparable);
        }
    }

    /**
     * {@link FilebasedSchemaProvider} variant whose {@link #getTargetSchema()} always returns
     * {@code null}.
     */
    public static class TestFileBasedSchemaProviderNullTargetSchema extends FilebasedSchemaProvider {
        /** Forwards both arguments unchanged to the superclass constructor. */
        public TestFileBasedSchemaProviderNullTargetSchema(TypedProperties typedProperties, JavaSparkContext javaSparkContext) {
            super(typedProperties, javaSparkContext);
        }

        /**
         * @return always {@code null}
         */
        public Schema getTargetSchema() {
            return null;
        }
    }

    /**
     * Key generator that adds nothing to {@link SimpleKeyGenerator}. {@code testProps} expects
     * this class name as the configured key generator in {@code test-source.properties}.
     */
    public static class TestGenerator extends SimpleKeyGenerator {
        /** Forwards the properties unchanged to the superclass constructor. */
        public TestGenerator(TypedProperties typedProperties) {
            super(typedProperties);
        }
    }

    /** Transformer that returns the incoming dataset untouched. */
    public static class TestIdentityTransformer implements Transformer {
        /**
         * @param javaSparkContext unused
         * @param sparkSession unused
         * @param dataset incoming batch
         * @param typedProperties unused
         * @return the same {@code dataset} instance that was passed in
         */
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            return dataset;
        }
    }

    /** Transformer that keeps only the rows whose {@code partition_path} is {@code 2016/03/15}. */
    public static class TestSpecificPartitionTransformer implements Transformer {
        /**
         * @param javaSparkContext unused
         * @param sparkSession unused
         * @param dataset incoming batch
         * @param typedProperties unused
         * @return {@code dataset} filtered down to the single fixed partition
         */
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            String partitionPredicate = "partition_path == '2016/03/15'";
            return dataset.filter(partitionPredicate);
        }
    }

    /**
     * {@link DeltaSync} subclass that exposes
     * {@code getLatestCommitMetadataWithValidCheckpointInfo} through a public override which
     * simply delegates to the superclass.
     */
    class TestStreamSync extends DeltaSync {
        /** Forwards every argument unchanged to the {@link DeltaSync} constructor. */
        public TestStreamSync(HoodieDeltaStreamer.Config config, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties typedProperties, JavaSparkContext javaSparkContext, FileSystem fileSystem, Configuration configuration, Function<SparkRDDWriteClient, Boolean> function) throws IOException {
            super(config, sparkSession, schemaProvider, typedProperties, javaSparkContext, fileSystem, configuration, function);
        }

        /**
         * @param hoodieTimeline timeline handed to the superclass implementation
         * @return whatever the superclass implementation returns for {@code hoodieTimeline}
         * @throws IOException if the superclass implementation throws it
         */
        public Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline hoodieTimeline) throws IOException {
            return super.getLatestCommitMetadataWithValidCheckpointInfo(hoodieTimeline);
        }
    }

    /**
     * Key generator that adds nothing to {@link SimpleKeyGenerator}; it only provides a distinct
     * class name.
     */
    public static class TestTableLevelGenerator extends SimpleKeyGenerator {
        /** Forwards the properties unchanged to the superclass constructor. */
        public TestTableLevelGenerator(TypedProperties typedProperties) {
            super(typedProperties);
        }
    }

    /**
     * Transformer that registers {@link DistanceUDF} as {@code distance_udf} and appends a
     * {@code haversine_distance} column computed by calling that UDF on the begin/end latitude
     * and longitude columns.
     */
    public static class TripsWithDistanceTransformer implements Transformer {
        /**
         * @param javaSparkContext unused
         * @param sparkSession unused
         * @param dataset incoming batch; must contain {@code begin_lat}, {@code end_lat},
         *                {@code begin_lon} and {@code end_lon}
         * @param typedProperties unused
         * @return {@code dataset} with the additional {@code haversine_distance} column
         */
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            dataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
            // The fourth argument is the end longitude; the previous code passed end_lat twice.
            Column distance = functions.callUDF(
                "distance_udf",
                functions.col("begin_lat"),
                functions.col("end_lat"),
                functions.col("begin_lon"),
                functions.col("end_lon"));
            return dataset.withColumn("haversine_distance", distance);
        }
    }

    /**
     * Transformer that appends an {@code evoluted_optional_union_field} column holding a copy of
     * the {@code rider} column.
     */
    public static class TripsWithEvolvedOptionalFieldTransformer implements Transformer {
        /**
         * @param javaSparkContext unused
         * @param sparkSession unused
         * @param dataset incoming batch
         * @param typedProperties unused
         * @return {@code dataset} with the additional column
         */
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            Column riderColumn = functions.col("rider");
            return dataset.withColumn("evoluted_optional_union_field", riderColumn);
        }
    }

    /**
     * Adds the Spark record merger settings when the record type is {@code SPARK}; does nothing
     * for any other record type.
     *
     * <p>The settings are the record merger implementation class and the {@code parquet} log
     * block format. Each one is appended to {@code list} as {@code key=value} and all of them
     * are also copied into {@code hudiOpts}.
     *
     * @param hoodieRecordType record type the caller is testing with
     * @param list mutable list of {@code key=value} config strings to append to
     */
    private void addRecordMerger(HoodieRecord.HoodieRecordType hoodieRecordType, List<String> list) {
        if (hoodieRecordType != HoodieRecord.HoodieRecordType.SPARK) {
            return;
        }
        Map<String, String> mergerOpts = new HashMap<>();
        mergerOpts.put(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieSparkRecordMerger.class.getName());
        mergerOpts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
        for (Map.Entry<String, String> opt : mergerOpts.entrySet()) {
            list.add(String.format("%s=%s", opt.getKey(), opt.getValue()));
        }
        this.hudiOpts.putAll(mergerOpts);
    }

    /**
     * Convenience overload that delegates to the five-argument variant with
     * {@link WriteOperationType#INSERT} as the write operation.
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String str, int i, String str2, HoodieRecord.HoodieRecordType hoodieRecordType) throws IOException {
        return initialHoodieDeltaStreamer(str, i, str2, hoodieRecordType, WriteOperationType.INSERT);
    }

    /**
     * Convenience overload that delegates to the six-argument variant with an empty set of
     * extra config strings.
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String str, int i, String str2, HoodieRecord.HoodieRecordType hoodieRecordType, WriteOperationType writeOperationType) throws IOException {
        return initialHoodieDeltaStreamer(str, i, str2, hoodieRecordType, writeOperationType, Collections.emptySet());
    }

    /**
     * Builds a continuous-mode, copy-on-write delta streamer.
     *
     * <p>The config produced by {@code TestHelpers.makeConfig} is extended, in this order, with
     * the record merger options, the async-services configs, the multi-writer configs and the
     * caller-supplied extras.
     *
     * @param str first argument passed to {@code TestHelpers.makeConfig}
     * @param i first argument passed to {@code getAsyncServicesConfigs}
     * @param str2 fifth argument passed to {@code getAsyncServicesConfigs}
     * @param hoodieRecordType record type used to decide whether merger options are added
     * @param writeOperationType write operation passed to {@code TestHelpers.makeConfig}
     * @param set extra config strings appended last
     * @return a new streamer bound to the shared {@code jsc}
     * @throws IOException declared by the original signature
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String str, int i, String str2, HoodieRecord.HoodieRecordType hoodieRecordType, WriteOperationType writeOperationType, Set<String> set) throws IOException {
        HoodieDeltaStreamer.Config streamerConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, writeOperationType);
        addRecordMerger(hoodieRecordType, streamerConfig.configs);
        streamerConfig.continuousMode = true;
        streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();
        streamerConfig.configs.addAll(getAsyncServicesConfigs(i, "false", "", "", str2, ""));
        streamerConfig.configs.addAll(getAllMultiWriterConfigs());
        for (String extraConfig : set) {
            streamerConfig.configs.add(extraConfig);
        }
        return new HoodieDeltaStreamer(streamerConfig, jsc);
    }

    /**
     * Convenience overload that delegates to the six-argument variant, passing {@code null} for
     * its fifth ({@code Boolean}) argument.
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String str, String str2, Boolean bool, String str3, HoodieRecord.HoodieRecordType hoodieRecordType) {
        return initialHoodieClusteringJob(str, str2, bool, str3, null, hoodieRecordType);
    }

    /**
     * Convenience overload that delegates to the six-argument variant, passing {@code null} for
     * its fifth ({@code Boolean}) argument and {@code AVRO} as the record type.
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String str, String str2, Boolean bool, String str3) {
        return initialHoodieClusteringJob(str, str2, bool, str3, null, HoodieRecord.HoodieRecordType.AVRO);
    }

    /**
     * Builds a clustering job.
     *
     * <p>The config comes from {@code buildHoodieClusteringUtilConfig} and is extended with the
     * record merger options followed by the multi-writer configs.
     *
     * @param str first argument forwarded to {@code buildHoodieClusteringUtilConfig}
     * @param str2 second argument forwarded to {@code buildHoodieClusteringUtilConfig}
     * @param bool third argument forwarded to {@code buildHoodieClusteringUtilConfig}
     * @param str3 fourth argument forwarded to {@code buildHoodieClusteringUtilConfig}
     * @param bool2 fifth argument forwarded to {@code buildHoodieClusteringUtilConfig}; may be {@code null}
     * @param hoodieRecordType record type used to decide whether merger options are added
     * @return a new clustering job bound to the shared {@code jsc}
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String str, String str2, Boolean bool, String str3, Boolean bool2, HoodieRecord.HoodieRecordType hoodieRecordType) {
        HoodieClusteringJob.Config clusteringConfig = buildHoodieClusteringUtilConfig(str, str2, bool, str3, bool2);
        addRecordMerger(hoodieRecordType, clusteringConfig.configs);
        clusteringConfig.configs.addAll(getAllMultiWriterConfigs());
        HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, clusteringConfig);
        return clusteringJob;
    }

    /**
     * Loads {@code test-source.properties} from the test base path and checks three of its
     * entries: shuffle parallelism, record key field and key generator class.
     */
    @Test
    public void testProps() {
        Path propsPath = new Path(basePath + "/test-source.properties");
        DFSPropertiesConfiguration propsConfiguration = new DFSPropertiesConfiguration(fs.getConf(), propsPath);
        TypedProperties loaded = propsConfiguration.getProps();
        Assertions.assertEquals(2, loaded.getInteger("hoodie.upsert.shuffle.parallelism"));
        Assertions.assertEquals("_row_key", loaded.getString("hoodie.datasource.write.recordkey.field"));
        Assertions.assertEquals(
            "org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator",
            loaded.getString("hoodie.datasource.write.keygenerator.class"));
    }

    /**
     * Creates the minimal streamer config that the CLI-argument test cases build on.
     *
     * @return a fresh config with target base path, table type and target table name set
     */
    private static HoodieStreamer.Config getBaseConfig() {
        HoodieStreamer.Config base = new HoodieStreamer.Config();
        base.targetTableName = "test";
        base.tableType = "COPY_ON_WRITE";
        base.targetBasePath = "s3://mybucket/blah";
        return base;
    }

    /**
     * Argument source for {@code testSchemaEvolution}.
     *
     * <p>Produces every combination of record type (AVRO, then SPARK), table type (COW, then
     * MOR) and two boolean flags (each {@code true}, then {@code false}), 16 tuples in total.
     * Each tuple is laid out as {@code (tableType, firstFlag, secondFlag, recordType)}.
     *
     * @return the argument tuples, with the record type varying slowest and the second flag fastest
     */
    private static Stream<Arguments> schemaEvolArgs() {
        HoodieRecord.HoodieRecordType[] recordTypes = {
            HoodieRecord.HoodieRecordType.AVRO,
            HoodieRecord.HoodieRecordType.SPARK
        };
        String[] tableTypes = {
            DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(),
            DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()
        };
        boolean[] flagValues = {true, false};

        List<Arguments> combinations = new ArrayList<>();
        for (HoodieRecord.HoodieRecordType recordType : recordTypes) {
            for (String tableType : tableTypes) {
                for (boolean firstFlag : flagValues) {
                    for (boolean secondFlag : flagValues) {
                        combinations.add(Arguments.of(tableType, firstFlag, secondFlag, recordType));
                    }
                }
            }
        }
        return combinations.stream();
    }

    /**
     * Argument source for {@code testValidCommandLineArgs}.
     *
     * <p>Each tuple pairs a command line with the config it is expected to parse into. Every
     * expected config starts from {@link #getBaseConfig()}.
     *
     * @return eight {@code (String[] args, HoodieStreamer.Config expected)} tuples
     */
    private static Stream<Arguments> provideValidCliArgs() {
        final String hiveTableConf = "hoodie.datasource.hive_sync.table=test_table";
        final String recordKeyConf = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";

        HoodieStreamer.Config minimal = getBaseConfig();

        HoodieStreamer.Config withFileFormat = getBaseConfig();
        withFileFormat.baseFileFormat = "PARQUET";

        HoodieStreamer.Config withSourceLimit = getBaseConfig();
        withSourceLimit.sourceLimit = 500L;

        HoodieStreamer.Config withHiveSync = getBaseConfig();
        withHiveSync.enableHiveSync = true;

        HoodieStreamer.Config withHiveTableConf = getBaseConfig();
        withHiveTableConf.configs = Arrays.asList(hiveTableConf);

        HoodieStreamer.Config withRecordKeyConf = getBaseConfig();
        withRecordKeyConf.configs = Arrays.asList(recordKeyConf);

        HoodieStreamer.Config withBothConfs = getBaseConfig();
        withBothConfs.configs = Arrays.asList(hiveTableConf, recordKeyConf);

        HoodieStreamer.Config withEverything = getBaseConfig();
        withEverything.baseFileFormat = "PARQUET";
        withEverything.sourceLimit = 500L;
        withEverything.enableHiveSync = true;
        withEverything.configs = Arrays.asList(hiveTableConf, recordKeyConf);

        return Stream.of(
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test"},
                minimal),
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
                    "--base-file-format", "PARQUET"},
                withFileFormat),
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
                    "--source-limit", "500"},
                withSourceLimit),
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
                    "--enable-hive-sync"},
                withHiveSync),
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
                    "--hoodie-conf", hiveTableConf},
                withHiveTableConf),
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
                    "--hoodie-conf", recordKeyConf},
                withRecordKeyConf),
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
                    "--hoodie-conf", hiveTableConf, "--hoodie-conf", recordKeyConf},
                withBothConfs),
            Arguments.of(
                new String[] {"--target-base-path", "s3://mybucket/blah", "--source-limit", "500", "--table-type", "COPY_ON_WRITE",
                    "--target-table", "test", "--base-file-format", "PARQUET", "--enable-hive-sync",
                    "--hoodie-conf", hiveTableConf, "--hoodie-conf", recordKeyConf},
                withEverything));
    }

    /**
     * Parses a command line with {@code HoodieDeltaStreamer.getConfig} and checks that the result
     * equals the expected config.
     *
     * @param strArr command-line arguments to parse
     * @param config config the arguments are expected to produce
     */
    @ParameterizedTest
    @MethodSource("provideValidCliArgs")
    public void testValidCommandLineArgs(String[] strArr, HoodieStreamer.Config config) {
        HoodieStreamer.Config parsed = HoodieDeltaStreamer.getConfig(strArr);
        Assertions.assertEquals(config, parsed);
    }

    /**
     * Writes one parquet file named {@code kafka_topic1+0+100+200.parquet} under a dated
     * directory, configures {@code KafkaConnectHdfsProvider} as the initial checkpoint provider,
     * and checks that the constructed streamer's config carries the checkpoint
     * {@code kafka_topic1,0:200}.
     */
    @Test
    public void testKafkaConnectCheckpointProvider() throws IOException {
        String tableBasePath = basePath + "/test_table";
        String topicDir = basePath + "/kafka_topic1";
        String datedDir = topicDir + "/year=2016/month=05/day=01";
        Path parquetFile = new Path(datedDir + "/kafka_topic1+0+100+200.parquet");

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        Path propsPath = new Path(basePath + "/test-source.properties");
        TypedProperties sourceProps = new DFSPropertiesConfiguration(fs.getConf(), propsPath).getProps();
        sourceProps.put("hoodie.deltastreamer.checkpoint.provider.path", topicDir);
        cfg.initialCheckpointProvider = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";

        fs.mkdirs(new Path(topicDir));
        fs.mkdirs(new Path(datedDir));
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(new HoodieTestDataGenerator().generateInserts("000", 100)),
            parquetFile);

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc, fs, jsc.hadoopConfiguration(), Option.ofNullable(sourceProps));
        Assertions.assertEquals("kafka_topic1,0:200", streamer.getConfig().checkpoint);
    }

    /**
     * Syncs with {@code test-invalid.properties} and expects a {@link SparkException} whose
     * message contains "Could not load key generator class invalid".
     */
    @Test
    public void testPropsWithInvalidKeyGenerator() throws Exception {
        Exception thrown = Assertions.assertThrows(SparkException.class, () -> {
            String tableBasePath = basePath + "/test_table_invalid_key_gen";
            List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
            HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(
                tableBasePath, WriteOperationType.BULK_INSERT, transformers, "test-invalid.properties", false);
            new HoodieDeltaStreamer(cfg, jsc).sync();
        }, "Should error out when setting the key generator class property to an invalid value");
        LOG.debug("Expected error during getting the key generator", thrown);
        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("Could not load key generator class invalid"));
    }

    /**
     * Argument source for {@code testInferKeyGenerator}.
     *
     * @return {@code (propertiesFileName, expectedKeyGeneratorClassName)} tuples
     */
    private static Stream<Arguments> provideInferKeyGenArgs() {
        Arguments complexCase = Arguments.of(
            "test-infer-complex-keygen.properties", ComplexKeyGenerator.class.getName());
        Arguments nonPartitionedCase = Arguments.of(
            "test-infer-nonpartitioned-keygen.properties", NonpartitionedKeyGenerator.class.getName());
        return Stream.of(complexCase, nonPartitionedCase);
    }

    /**
     * Runs one upsert sync with the given properties file, then checks the key generator class
     * recorded in the table config and that the table holds 1000 records.
     *
     * @param str properties file name; the part before the first dot also names the table directory
     * @param str2 key generator class name expected in the table config
     */
    @ParameterizedTest
    @MethodSource("provideInferKeyGenArgs")
    public void testInferKeyGenerator(String str, String str2) throws Exception {
        String tableBasePath = basePath + "/" + str.split("\\.")[0];
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(
            tableBasePath, WriteOperationType.UPSERT, transformers, str, false);
        new HoodieDeltaStreamer(cfg, jsc).sync();

        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(tableBasePath)
            .build();
        Assertions.assertEquals(str2, metaClient.getTableConfig().getKeyGeneratorClassName());
        Assertions.assertEquals(1000L, sqlContext.read().format("hudi").load(tableBasePath).count());
    }

    /**
     * Creates a plain directory, points a streamer at it, and expects the sync to fail with
     * {@link TableNotFoundException}.
     */
    @Test
    public void testTableCreation() throws Exception {
        Exception thrown = Assertions.assertThrows(TableNotFoundException.class, () -> {
            String notATablePath = basePath + "/not_a_table";
            fs.mkdirs(new Path(notATablePath));
            HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(notATablePath, WriteOperationType.BULK_INSERT);
            new HoodieDeltaStreamer(cfg, jsc).sync();
        }, "Should error out when pointed out at a dir thats not a table");
        LOG.debug("Expected error during table creation", thrown);
    }

    /**
     * Ingests once with the hive-style-partitioning and URL-encode-partitioning table configs
     * both set to {@code z}, then checks that the table config reports the same value for both.
     *
     * @param z value written to, and expected back from, both table configs
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testTableCreationContainsHiveStylePartitioningEnable(boolean z) throws Exception {
        String tableBasePath = basePath + "/url_encode_and_hive_style_partitioning_enable_" + z;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.configs.add(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key() + "=" + z);
        cfg.configs.add(HoodieTableConfig.URL_ENCODE_PARTITIONING.key() + "=" + z);
        new HoodieDeltaStreamer(cfg, jsc).getIngestionService().ingestOnce();

        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(context.getHadoopConf().get())
            .setBasePath(tableBasePath)
            .build();
        boolean hiveStyleEnabled = Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable());
        Assertions.assertEquals(z, hiveStyleEnabled);
        boolean urlEncodeEnabled = Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning());
        Assertions.assertEquals(z, urlEncodeEnabled);
    }

    /**
     * Exercises bulk insert, a zero-limit sync and an upsert on one table, then bootstraps a
     * second table from a parquet export of the first.
     *
     * <p>On the bootstrapped table it checks the record count, the distinct record keys and the
     * schema, and finally deletes all three paths.
     *
     * @param hoodieRecordType record type under test; merger options are added through
     *                         {@code addRecordMerger}
     */
    @ParameterizedTest
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        String tableBasePath = basePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);

        // Initial bulk insert.
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

        // With a source limit of 0 the same assertions are expected to hold after another sync.
        cfg.sourceLimit = 0L;
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

        // Upsert with a source limit of 2000.
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
        long totalAcrossCommits = countsPerCommit(tableBasePath, sqlContext).stream()
            .mapToLong(row -> row.getLong(1))
            .sum();
        Assertions.assertEquals(1950L, totalAcrossCommits);

        // Export the table as plain parquet partitioned by rider; this is the bootstrap source.
        String bootstrapSourcePath = basePath + "/src_bootstrapped";
        Dataset<Row> sourceDataset = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
        sourceDataset.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);

        // Re-use the config for a bootstrap run into a new target path.
        String bootstrappedTablePath = basePath + "/test_dataset_bootstrapped";
        cfg.runBootstrap = true;
        cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
        cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
        cfg.configs.add(String.format("hoodie.datasource.write.keygenerator.class=%s", SimpleKeyGenerator.class.getName()));
        cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
        cfg.configs.add("hoodie.bootstrap.parallelism=5");
        cfg.targetBasePath = bootstrappedTablePath;
        new HoodieDeltaStreamer(cfg, jsc).sync();

        Dataset<Row> bootstrapped = sqlContext.read().format("org.apache.hudi").load(bootstrappedTablePath);
        LOG.info("Schema :");
        bootstrapped.printSchema();
        assertRecordCount(1950L, bootstrappedTablePath, sqlContext);
        // createOrReplaceTempView replaces the deprecated registerTempTable.
        bootstrapped.createOrReplaceTempView("bootstrapped");
        Assertions.assertEquals(1950L, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
        sqlContext.sql("select * from bootstrapped").show();

        // The bootstrapped schema must contain the meta columns and every source column.
        StructField[] fields = bootstrapped.schema().fields();
        List<String> fieldNames = Arrays.asList(bootstrapped.schema().fieldNames());
        List<String> expectedFieldNames = Arrays.asList(sourceDataset.schema().fieldNames());
        Assertions.assertEquals(expectedFieldNames.size(), fields.length);
        Assertions.assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
        Assertions.assertTrue(fieldNames.containsAll(expectedFieldNames));

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrapSourcePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrappedTablePath);
    }

    /**
     * Checks that a sync fails with {@link HoodieException} when the record key field config is
     * changed on an existing table, and that the failed attempt leaves the per-commit total at
     * 1000. A fresh config without the changed key must then upsert successfully.
     */
    @Test
    public void testModifiedTableConfigs() throws Exception {
        String tableBasePath = basePath + "/test_table_modified_configs";
        HoodieDeltaStreamer.Config modifiedCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);

        // Initial bulk insert, then a zero-limit sync with the same expectations.
        syncAndAssertRecordCount(modifiedCfg, 1000, tableBasePath, "00000", 1);
        modifiedCfg.sourceLimit = 0L;
        syncAndAssertRecordCount(modifiedCfg, 1000, tableBasePath, "00000", 1);

        // Switch to upsert and override the record key field; the sync is expected to throw.
        modifiedCfg.sourceLimit = 2000L;
        modifiedCfg.operation = WriteOperationType.UPSERT;
        modifiedCfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
        Assertions.assertThrows(HoodieException.class,
            () -> syncAndAssertRecordCount(modifiedCfg, 1000, tableBasePath, "00000", 1));
        long totalAfterFailure = countsPerCommit(tableBasePath, sqlContext).stream()
            .mapToLong(row -> row.getLong(1))
            .sum();
        Assertions.assertEquals(1000L, totalAfterFailure);

        // A fresh config without the override upserts normally.
        HoodieDeltaStreamer.Config cleanCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        cleanCfg.sourceLimit = 2000L;
        cleanCfg.operation = WriteOperationType.UPSERT;
        syncAndAssertRecordCount(cleanCfg, 1950, tableBasePath, "00001", 2);
        long totalAfterUpsert = countsPerCommit(tableBasePath, sqlContext).stream()
            .mapToLong(row -> row.getLong(1))
            .sum();
        Assertions.assertEquals(1950L, totalAfterUpsert);
    }

    /**
     * Runs one sync with the given config and then asserts the record count, the distance count
     * and the commit metadata of the table.
     *
     * @param config streamer config to sync with
     * @param num expected count passed to both {@code assertRecordCount} and {@code assertDistanceCount}
     * @param str table base path the assertions read from
     * @param str2 first argument passed to {@code TestHelpers.assertCommitMetadata}
     * @param num2 last argument passed to {@code TestHelpers.assertCommitMetadata}
     * @throws Exception if the sync or any assertion helper throws
     */
    private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config config, Integer num, String str, String str2, Integer num2) throws Exception {
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        streamer.sync();
        int expectedCount = num.intValue();
        assertRecordCount(expectedCount, str, sqlContext);
        assertDistanceCount(expectedCount, str, sqlContext);
        int lastMetadataArg = num2.intValue();
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(str2, str, fs, lastMetadataArg);
    }

    /**
     * Runs three consecutive delta streamer syncs against one table while the transformer and the
     * target schema file change between rounds, then compares the resolved table schema with an
     * expected schema file.
     *
     * <ol>
     *   <li>INSERT with {@code TestIdentityTransformer}; source and target schema are both
     *       {@code source.avsc}. Expects 1000 records.</li>
     *   <li>UPSERT with {@code TripsWithEvolvedOptionalFieldTransformer}; target schema is
     *       {@code source_evolved.avsc}. Expects 1450 records, 950 of them with a non-null
     *       {@code evoluted_optional_union_field}.</li>
     *   <li>UPSERT with {@code TestIdentityTransformer} again. Expects 1900 records.</li>
     * </ol>
     *
     * @param str              forwarded as the last argument of {@code TestHelpers.makeConfig} and
     *                         embedded in the table path
     * @param z                when {@code true} the third round sets the target schema file to
     *                         {@code source_evolved.avsc}; when {@code false} the schema provider is
     *                         switched to {@code TestFileBasedSchemaProviderNullTargetSchema} instead
     * @param z2               when {@code false} every round adds
     *                         {@code SPARK_AVRO_POST_PROCESSOR_ENABLE=false}; also selects which
     *                         expected schema file the final comparison uses
     * @param hoodieRecordType record type passed to {@code addRecordMerger} for every round
     * @throws Exception if any sync, assertion or file operation fails
     */
    @MethodSource({"schemaEvolArgs"})
    @ParameterizedTest
    public void testSchemaEvolution(String str, boolean z, boolean z2, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        String str2 = basePath + "/test_table_schema_evolution" + str + "_" + z + "_" + z2;
        defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        try {
            // Round 1: plain insert with identical source and target schema.
            HoodieDeltaStreamer.Config makeConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str2, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, str);
            addRecordMerger(hoodieRecordType, makeConfig.configs);
            makeConfig.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            makeConfig.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
            makeConfig.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            if (!z2) {
                makeConfig.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
            }
            new HoodieDeltaStreamer(makeConfig, jsc).sync();
            assertRecordCount(1000L, str2, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", str2, fs, 1);

            // Round 2: upsert through the evolving transformer with the evolved target schema.
            HoodieDeltaStreamer.Config makeConfig2 = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str2, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), "test-source.properties", false, true, false, null, str);
            addRecordMerger(hoodieRecordType, makeConfig2.configs);
            makeConfig2.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            makeConfig2.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
            makeConfig2.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            if (!z2) {
                makeConfig2.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
            }
            new HoodieDeltaStreamer(makeConfig2, jsc).sync();
            assertRecordCount(1450L, str2, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", str2, fs, 2);
            Assertions.assertEquals(1450L, countsPerCommit(str2, sqlContext).stream().mapToLong(row -> {
                return row.getLong(1);
            }).sum());
            sqlContext.read().format("org.apache.hudi").load(str2).createOrReplaceTempView("tmp_trips");
            Assertions.assertEquals(950L, sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count());

            // Round 3: back to the identity transformer; the target schema is only configured when z is set.
            if (!z) {
                defaultSchemaProviderClassName = TestFileBasedSchemaProviderNullTargetSchema.class.getName();
            }
            HoodieDeltaStreamer.Config makeConfig3 = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str2, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, str);
            addRecordMerger(hoodieRecordType, makeConfig3.configs);
            makeConfig3.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            if (z) {
                makeConfig3.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
            }
            if (!z2) {
                makeConfig3.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
            }
            makeConfig3.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            new HoodieDeltaStreamer(makeConfig3, jsc).sync();
            assertRecordCount(1900L, str2, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00002", str2, fs, 3);
            Assertions.assertEquals(1900L, countsPerCommit(str2, sqlContext).stream().mapToLong(row2 -> {
                return row2.getLong(1);
            }).sum());

            Schema tableAvroSchema = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(str2).setConf(fs.getConf()).build()).getTableAvroSchema(false);
            Assertions.assertNotNull(tableAvroSchema);
            String expectedSchemaFile = z2 ? "/source_evolved_post_processed.avsc" : "/source_evolved.avsc";
            Schema expectedSchema;
            // Close the schema file stream once parsed instead of leaving it open.
            try (InputStream schemaStream = fs.open(new Path(basePath + expectedSchemaFile))) {
                expectedSchema = new Schema.Parser().parse(schemaStream);
            }
            Assertions.assertEquals(expectedSchema, tableAvroSchema);

            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, str2);
            UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(makeConfig3.targetBasePath, jsc.hadoopConfiguration()), basePath + "/test-source.properties");
            writeCommonPropsToFile(fs, basePath);
        } finally {
            // Restore the provider even when an assertion above fails, so the override made for
            // round 3 cannot carry over into other tests.
            defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        }
    }

    /**
     * Runs the shared continuous-mode upsert scenario against a COPY_ON_WRITE table stored under
     * {@code continuous_cow}, once per listed record type.
     *
     * @param hoodieRecordType record type supplied by the {@code @EnumSource} (AVRO or SPARK)
     * @throws Exception if the delegated scenario fails
     */
    @Timeout(600)
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testUpsertsCOWContinuousMode(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", hoodieRecordType);
    }

    /**
     * Performs one non-continuous UPSERT sync into a COPY_ON_WRITE table with metrics turned on
     * (CONSOLE reporter), then checks that 1000 records were written and that
     * {@code Metrics.isInitialized} reports {@code false} for the table path afterwards.
     *
     * @param hoodieRecordType record type supplied by the {@code @EnumSource} (AVRO or SPARK)
     * @throws Exception if the sync, an assertion or the cleanup fails
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/non_continuous_cow";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.add(HoodieMetricsConfig.TURN_METRICS_ON.key() + "=" + "true");
        cfg.configs.add(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() + "=" + "CONSOLE");
        cfg.continuousMode = false;

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();

        assertRecordCount(1000L, tablePath, sqlContext);
        Assertions.assertFalse(Metrics.isInitialized(tablePath), "Metrics should be shutdown");

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Runs the shared continuous-mode upsert scenario against a MERGE_ON_READ table with the
     * graceful-shutdown flag set to {@code true}.
     *
     * <p>The table lives in its own directory rather than {@code continuous_cow}, so it cannot
     * collide with the table written by the COPY_ON_WRITE continuous-mode test.
     *
     * @param hoodieRecordType record type supplied by the {@code @EnumSource} (AVRO only)
     * @throws Exception if the delegated scenario fails
     */
    @Timeout(600)
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO"})
    @ParameterizedTest
    public void testUpsertsMORContinuousModeShutdownGracefully(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor_shutdown_gracefully", true, hoodieRecordType);
    }

    /**
     * Runs the shared continuous-mode upsert scenario against a MERGE_ON_READ table stored under
     * {@code continuous_mor}, once per listed record type.
     *
     * @param hoodieRecordType record type supplied by the {@code @EnumSource} (AVRO or SPARK)
     * @throws Exception if the delegated scenario fails
     */
    @Timeout(600)
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testUpsertsMORContinuousMode(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", hoodieRecordType);
    }

    /**
     * Performs one non-continuous UPSERT sync into a MERGE_ON_READ table with metrics turned on
     * (CONSOLE reporter), then checks that 1000 records were written and that
     * {@code Metrics.isInitialized} reports {@code false} for the table path afterwards.
     *
     * @param hoodieRecordType record type supplied by the {@code @EnumSource} (AVRO or SPARK)
     * @throws Exception if the sync, an assertion or the cleanup fails
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/non_continuous_mor";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.configs.add(HoodieMetricsConfig.TURN_METRICS_ON.key() + "=" + "true");
        cfg.configs.add(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() + "=" + "CONSOLE");
        cfg.continuousMode = false;

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();

        assertRecordCount(1000L, tablePath, sqlContext);
        Assertions.assertFalse(Metrics.isInitialized(tablePath), "Metrics should be shutdown");

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Convenience overload that runs the continuous-mode upsert scenario with the boolean flag of
     * the four-argument overload fixed to {@code false}.
     *
     * @param hoodieTableType  table type to create
     * @param str              directory name appended to {@code basePath} by the delegate
     * @param hoodieRecordType record type forwarded to the delegate
     * @throws Exception if the delegated scenario fails
     */
    private void testUpsertsContinuousMode(HoodieTableType hoodieTableType, String str, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        testUpsertsContinuousMode(hoodieTableType, str, false, hoodieRecordType);
    }

    /**
     * Shared scenario: starts a delta streamer in continuous mode with auto-clean disabled and a
     * cap of 3000 unique source records, waits until the expected number of commits exists, and
     * verifies the record counts.
     *
     * <p>For MERGE_ON_READ the wait condition requires at least 5 delta commits and 2 compaction
     * commits; for any other table type it requires {@code assertAtleastNCompactionCommits(5, ...)}
     * to pass.
     *
     * @param hoodieTableType  table type to create
     * @param str              directory name appended to {@code basePath}
     * @param z                when {@code true}, {@code NoNewDataTerminationStrategy} is configured
     *                         as the post-write termination strategy and, once the condition holds,
     *                         {@code TestDataSource.returnEmptyBatch} is set to {@code true}
     * @param hoodieRecordType record type passed to {@code addRecordMerger}
     * @throws Exception if the streamer, an assertion or the cleanup fails
     */
    private void testUpsertsContinuousMode(HoodieTableType hoodieTableType, String str, boolean z, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/" + str;
        final int totalRecords = 3000;

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = true;
        if (z) {
            cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
        }
        cfg.tableType = hoodieTableType.name();
        cfg.configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
        cfg.configs.add(HoodieCleanConfig.AUTO_CLEAN.key() + "=false");

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        Function<Boolean, Boolean> commitsAndCountsReached = ignored -> {
            if (hoodieTableType.equals(HoodieTableType.MERGE_ON_READ)) {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(5, tablePath, fs);
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(2, tablePath, fs);
            } else {
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(5, tablePath, fs);
            }
            assertRecordCount(totalRecords, tablePath, sqlContext);
            assertDistanceCount(totalRecords, tablePath, sqlContext);
            if (z) {
                // Flip the test source to empty batches once the condition is satisfied.
                TestDataSource.returnEmptyBatch = true;
            }
            return true;
        };
        deltaStreamerTestRunner(streamer, cfg, commitsAndCountsReached);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Convenience overload that runs the delta streamer test harness with the job identifier
     * fixed to {@code "single_ds_job"}.
     *
     * @param hoodieDeltaStreamer streamer to run
     * @param config              configuration forwarded to the four-argument overload
     * @param function            condition forwarded to the four-argument overload
     * @throws Exception if the delegated runner fails
     */
    public static void deltaStreamerTestRunner(HoodieDeltaStreamer hoodieDeltaStreamer, HoodieDeltaStreamer.Config config, Function<Boolean, Boolean> function) throws Exception {
        deltaStreamerTestRunner(hoodieDeltaStreamer, config, function, "single_ds_job");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs {@code hoodieDeltaStreamer.sync()} on a dedicated background thread, waits for
     * {@code function} through {@code TestHelpers.waitTillCondition}, and then stops the streamer.
     *
     * <p>When {@code config} is non-null and names a post-write termination strategy, the method
     * waits for the streamer to shut down by itself. Otherwise it calls
     * {@code shutdownGracefully()} and joins the sync task, so a failure of the sync surfaces
     * through {@code Future.get()}.
     *
     * <p>The single-thread executor is always shut down before returning, so its worker thread
     * is not left behind after each call.
     *
     * @param hoodieDeltaStreamer streamer whose {@code sync()} is executed in the background
     * @param config              streamer configuration, or {@code null} to always shut down explicitly
     * @param function            condition handed to {@code TestHelpers.waitTillCondition}
     * @param str                 job identifier, used only in the failure log message
     * @throws Exception if waiting for the condition, the shutdown or the sync task fails
     */
    public static void deltaStreamerTestRunner(HoodieDeltaStreamer hoodieDeltaStreamer, HoodieDeltaStreamer.Config config, Function<Boolean, Boolean> function, String str) throws Exception {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> submit = executor.submit(() -> {
                try {
                    hoodieDeltaStreamer.sync();
                } catch (Exception e) {
                    LOG.warn("DS continuous job failed, hence not proceeding with condition check for " + str);
                    throw new RuntimeException(e.getMessage(), e);
                }
            });
            // 360L is the wait limit given to the helper (unit is defined by waitTillCondition).
            HoodieDeltaStreamerTestBase.TestHelpers.waitTillCondition(function, submit, 360L);
            if (config != null && !config.postWriteTerminationStrategyClass.isEmpty()) {
                awaitDeltaStreamerShutdown(hoodieDeltaStreamer);
            } else {
                hoodieDeltaStreamer.shutdownGracefully();
                submit.get();
            }
        } finally {
            // shutdown() does not interrupt the running sync; it only lets the worker thread
            // terminate once the submitted task has finished.
            executor.shutdown();
        }
    }

    /**
     * Blocks until the streamer's ingestion service first reports that a shutdown was requested
     * and then reports that it is shut down, polling every 500 ms.
     *
     * <p>Both phases draw on the same time budget: the elapsed-time counter is not reset between
     * them, and the test fails once more than 120000 ms have been spent waiting in total.
     *
     * @param hoodieDeltaStreamer streamer whose ingestion service is polled
     * @throws InterruptedException if the polling sleep is interrupted
     */
    static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer hoodieDeltaStreamer) throws InterruptedException {
        final int pollIntervalMs = 500;
        final int maxWaitMs = 120000;
        int waitedMs = 0;

        // Phase 1: wait for the shutdown request to be registered.
        boolean shutdownRequested;
        do {
            shutdownRequested = hoodieDeltaStreamer.getIngestionService().isShutdownRequested();
            Thread.sleep(pollIntervalMs);
            waitedMs += pollIntervalMs;
            if (waitedMs > maxWaitMs) {
                Assertions.fail("Deltastreamer should have shutdown by now");
            }
        } while (!shutdownRequested);

        // Phase 2: wait for the shutdown to complete, continuing with the same counter.
        boolean shutdownComplete;
        do {
            shutdownComplete = hoodieDeltaStreamer.getIngestionService().isShutdown();
            Thread.sleep(pollIntervalMs);
            waitedMs += pollIntervalMs;
            if (waitedMs > maxWaitMs) {
                Assertions.fail("Deltastreamer should have shutdown by now");
            }
        } while (!shutdownComplete);
    }

    /**
     * Convenience overload that runs the delta streamer test harness without a configuration
     * ({@code null} is passed as the config argument).
     *
     * @param hoodieDeltaStreamer streamer to run
     * @param function            condition forwarded to the three-argument overload
     * @throws Exception if the delegated runner fails
     */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer hoodieDeltaStreamer, Function<Boolean, Boolean> function) throws Exception {
        deltaStreamerTestRunner(hoodieDeltaStreamer, null, function);
    }

    /**
     * Runs a continuous-mode UPSERT streamer on a MERGE_ON_READ table configured through
     * {@code getAsyncServicesConfigs(3000, "false", "true", "2", "", "")} and waits until at least
     * 2 commits and 1 replace commit exist.
     *
     * @param hoodieRecordType record type supplied by the {@code @CsvSource} (AVRO or SPARK)
     * @throws Exception if the streamer, an assertion or the cleanup fails
     */
    @ParameterizedTest
    @CsvSource({"AVRO", "SPARK"})
    public void testInlineClustering(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/inlineClustering";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.configs.addAll(getAsyncServicesConfigs(3000, "false", "true", "2", "", ""));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        Function<Boolean, Boolean> commitsAndReplaceCommitPresent = ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tablePath, fs);
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
            return true;
        };
        deltaStreamerTestRunner(streamer, cfg, commitsAndReplaceCommitPresent);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Checks that a delta sync with {@code retryLastPendingInlineClusteringJob} enabled completes
     * a clustering instant that was left inflight.
     *
     * <p>Steps: one INSERT sync on a COPY_ON_WRITE table, schedule clustering, move the first
     * pending replace instant from requested to inflight, sync again with the retry flag set, and
     * assert that the last completed replace instant carries the same timestamp.
     *
     * @throws Exception if a sync, the clustering job or an assertion fails
     */
    @Test
    public void testDeltaSyncWithPendingClustering() throws Exception {
        final String tablePath = basePath + "/inlineClusteringPending";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        cfg.continuousMode = false;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        HoodieDeltaStreamer firstRun = new HoodieDeltaStreamer(cfg, jsc);
        firstRun.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tablePath, fs);

        // Schedule clustering, then leave its instant in the inflight state.
        initialHoodieClusteringJob(tablePath, null, false, "schedule").cluster(0);
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
        HoodieInstant pendingClustering = (HoodieInstant) metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().get(0);
        metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(pendingClustering, Option.empty());

        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "true", "2", "", ""));
        cfg.retryLastPendingInlineClusteringJob = true;
        HoodieDeltaStreamer retryRun = new HoodieDeltaStreamer(cfg, jsc);
        retryRun.sync();

        HoodieInstant lastCompletedReplace = (HoodieInstant) metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get();
        Assertions.assertEquals(pendingClustering.getTimestamp(), lastCompletedReplace.getTimestamp());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tablePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
    }

    /**
     * Checks that, with {@code retryLastPendingInlineCompactionJob} disabled, a sync that follows
     * the removal of the last completed commit file produces exactly one rollback instant.
     *
     * <p>Steps: two syncs from a parquet DFS source into a MERGE_ON_READ table with inline
     * compaction after 2 delta commits, delete the file of the last completed instant on the
     * commit timeline, add a third batch, sync with a fresh streamer, and assert on the delta
     * commit count and the rollback timeline size.
     *
     * @throws Exception if preparing data, a sync or an assertion fails
     */
    @Test
    public void testDeltaSyncWithPendingCompaction() throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum;
        HoodieTestDataGenerator dataGenerator = prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);

        TypedProperties extraProps = new TypedProperties();
        extraProps.setProperty("hoodie.compact.inline", "true");
        extraProps.setProperty("hoodie.compact.inline.max.delta.commits", "2");
        extraProps.setProperty("hoodie.datasource.write.table.type", "MERGE_ON_READ");
        extraProps.setProperty("hoodie.datasource.compaction.async.enable", "false");
        prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps);

        final String tablePath = basePath + "test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, "MERGE_ON_READ", "timestamp", null);
        cfg.retryLastPendingInlineCompactionJob = false;

        // First two batches go through the same streamer instance.
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();
        assertRecordCount(100, tablePath, sqlContext);
        prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null, dataGenerator, "001");
        streamer.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(2, tablePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(1, tablePath, fs);

        // Remove the file of the last completed instant on the commit timeline.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
        HoodieInstant lastCompleted = (HoodieInstant) metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant().get();
        fs.delete(new Path(tablePath + "/.hoodie/" + lastCompleted.getFileName()), false);

        prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null, dataGenerator, "002");
        HoodieDeltaStreamer freshStreamer = new HoodieDeltaStreamer(cfg, jsc);
        freshStreamer.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(3, tablePath, fs);

        HoodieTableMetaClient reloaded = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
        Assertions.assertEquals(1, reloaded.getActiveTimeline().getRollbackTimeline().getInstants().size());
    }

    /**
     * Checks that files replaced by a clustering commit are removed after that replace commit
     * has left the completed replace timeline.
     *
     * <p>Phase 1 runs a continuous INSERT streamer on a COPY_ON_WRITE table until at least 2
     * replace commits exist, then records the files on disk whose URI contains a file id replaced
     * by the second-to-last replace commit. Phase 2 performs one non-continuous sync with
     * KEEP_LATEST_COMMITS cleaning (1 commit retained) and archival bounds of 4 and 5 commits,
     * and asserts that the recorded replace instant is gone from the completed replace timeline
     * and that none of the recorded files exist any more.
     *
     * @param bool             value written to {@code HoodieCleanConfig.ASYNC_CLEAN}; when
     *                         {@code true} the optimistic-concurrency, lazy failed-writes cleaning
     *                         and in-process lock provider settings are added as well
     * @param hoodieRecordType record type passed to {@code addRecordMerger} in both phases
     * @throws Exception if a sync, a file system call or an assertion fails
     */
    @ParameterizedTest
    @CsvSource({"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
    public void testCleanerDeleteReplacedDataWithArchive(Boolean bool, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/cleanerDeleteReplacedDataWithArchive" + bool;

        // Phase 1: continuous ingestion until two replace commits are present.
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(3000, "false", "true", "2", "", ""));
        cfg.configs.add(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() + "=" + "0");
        cfg.configs.add(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() + "=" + "1");
        cfg.configs.add(HoodieWriteConfig.MARKERS_TYPE.key() + "=" + "DIRECT");
        HoodieDeltaStreamer continuousStreamer = new HoodieDeltaStreamer(cfg, jsc);
        Function<Boolean, Boolean> twoReplaceCommitsPresent = ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(2, tablePath, fs);
            return true;
        };
        deltaStreamerTestRunner(continuousStreamer, cfg, twoReplaceCommitsPresent);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(6, tablePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(2, tablePath, fs);

        // Look up the second-to-last replace commit and the file ids it replaced.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
        HoodieTimeline replaceTimeline = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline();
        Option targetReplaceInstant = replaceTimeline.nthFromLastInstant(1);
        Assertions.assertTrue(targetReplaceInstant.isPresent());

        byte[] instantDetails = (byte[]) replaceTimeline.getInstantDetails((HoodieInstant) targetReplaceInstant.get()).get();
        HoodieReplaceCommitMetadata replaceMetadata = (HoodieReplaceCommitMetadata) HoodieReplaceCommitMetadata.fromBytes(instantDetails, HoodieReplaceCommitMetadata.class);
        String replacedPartition = null;
        List replacedFileIds = null;
        // Only the entry visited last is kept.
        for (Map.Entry entry : replaceMetadata.getPartitionToReplaceFileIds().entrySet()) {
            replacedPartition = String.valueOf(entry.getKey());
            replacedFileIds = (List) entry.getValue();
        }
        Assertions.assertNotNull(replacedPartition);
        Assertions.assertNotNull(replacedFileIds);

        // Collect every file under that partition whose URI contains one of the replaced ids.
        List<String> replacedFilePaths = new ArrayList<>();
        RemoteIterator partitionFiles = metaClient.getFs().listFiles(new Path(metaClient.getBasePath(), replacedPartition), true);
        while (partitionFiles.hasNext()) {
            String fileUri = ((LocatedFileStatus) partitionFiles.next()).getPath().toUri().toString();
            for (Object fileId : replacedFileIds) {
                if (fileUri.contains(String.valueOf(fileId))) {
                    replacedFilePaths.add(fileUri);
                }
            }
        }
        Assertions.assertFalse(replacedFilePaths.isEmpty());

        // Phase 2: single sync with cleaning and archival settings.
        List<String> cleanAndArchiveConfigs = getAsyncServicesConfigs(1, "true", "true", "6", "", "");
        cleanAndArchiveConfigs.add(HoodieCleanConfig.CLEANER_POLICY.key() + "=" + "KEEP_LATEST_COMMITS");
        cleanAndArchiveConfigs.add(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() + "=" + "1");
        cleanAndArchiveConfigs.add(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key() + "=" + "4");
        cleanAndArchiveConfigs.add(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key() + "=" + "5");
        cleanAndArchiveConfigs.add(HoodieCleanConfig.ASYNC_CLEAN.key() + "=" + bool);
        cleanAndArchiveConfigs.add(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() + "=" + "1");
        cleanAndArchiveConfigs.add(HoodieWriteConfig.MARKERS_TYPE.key() + "=" + "DIRECT");
        if (bool.booleanValue()) {
            cleanAndArchiveConfigs.add(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() + "=" + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
            cleanAndArchiveConfigs.add(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() + "=" + HoodieFailedWritesCleaningPolicy.LAZY.name());
            cleanAndArchiveConfigs.add(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() + "=" + InProcessLockProvider.class.getName());
        }
        addRecordMerger(hoodieRecordType, cleanAndArchiveConfigs);
        cfg.configs = cleanAndArchiveConfigs;
        cfg.continuousMode = false;
        HoodieDeltaStreamer cleaningStreamer = new HoodieDeltaStreamer(cfg, jsc);
        cleaningStreamer.sync();

        long remainingMatches = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(hoodieInstant -> {
            return ((HoodieInstant) targetReplaceInstant.get()).equals(hoodieInstant);
        }).count();
        Assertions.assertEquals(0L, remainingMatches);
        for (String replacedFilePath : replacedFilePaths) {
            Assertions.assertFalse(metaClient.getFs().exists(new Path(replacedFilePath)));
        }

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Builds the {@code key=value} settings used for multi-writer runs: the in-process lock
     * provider, a lock wait time of 3000 ms, optimistic concurrency control and the LAZY
     * failed-writes cleaning policy.
     *
     * @return a new, modifiable list holding the four settings in that order
     */
    private List<String> getAllMultiWriterConfigs() {
        List<String> multiWriterConfigs = new ArrayList<>();
        multiWriterConfigs.add(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() + "=" + InProcessLockProvider.class.getCanonicalName());
        multiWriterConfigs.add("hoodie.write.lock.wait_time_ms" + "=" + "3000");
        multiWriterConfigs.add(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() + "=" + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
        multiWriterConfigs.add(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() + "=" + HoodieFailedWritesCleaningPolicy.LAZY.name());
        return multiWriterConfigs;
    }

    /**
     * Convenience overload that builds a clustering job configuration with the running mode and
     * the retry flag both passed as {@code null} to the five-argument overload.
     *
     * @param str  table base path
     * @param str2 clustering instant time
     * @param bool value for the {@code runSchedule} setting
     * @return the configuration produced by the five-argument overload
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String str, String str2, Boolean bool) {
        return buildHoodieClusteringUtilConfig(str, str2, bool, null, null);
    }

    /**
     * Creates a {@code HoodieClusteringJob.Config} populated from the arguments, with the props
     * file fixed to {@code clusteringjob.properties} under the test base path.
     *
     * @param str   value for {@code basePath}
     * @param str2  value for {@code clusteringInstantTime}
     * @param bool  value for {@code runSchedule}
     * @param str3  value for {@code runningMode}
     * @param bool2 value for {@code retryLastFailedClusteringJob}; the field is left untouched
     *              when this is {@code null}
     * @return the populated configuration
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String str, String str2, Boolean bool, String str3, Boolean bool2) {
        final HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config();

        clusteringConfig.basePath = str;
        clusteringConfig.clusteringInstantTime = str2;
        clusteringConfig.runSchedule = bool;
        clusteringConfig.propsFilePath = UtilitiesTestBase.basePath + "/clusteringjob.properties";
        clusteringConfig.runningMode = str3;

        // Only override the retry flag when the caller supplied one.
        if (bool2 != null) {
            clusteringConfig.retryLastFailedClusteringJob = bool2;
        }

        return clusteringConfig;
    }

    /**
     * Creates a {@code HoodieIndexer.Config} populated from the arguments, with the props file
     * fixed to {@code indexer.properties} under the test base path.
     *
     * @param str  value for {@code basePath}
     * @param str2 value for {@code tableName}
     * @param str3 value for {@code indexInstantTime}
     * @param str4 value for {@code runningMode}
     * @param str5 value for {@code indexTypes}
     * @return the populated configuration
     */
    private HoodieIndexer.Config buildIndexerConfig(String str, String str2, String str3, String str4, String str5) {
        final HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();

        indexerConfig.basePath = str;
        indexerConfig.tableName = str2;
        indexerConfig.indexInstantTime = str3;
        indexerConfig.propsFilePath = UtilitiesTestBase.basePath + "/indexer.properties";
        indexerConfig.runningMode = str4;
        indexerConfig.indexTypes = str5;

        return indexerConfig;
    }

    /**
     * Runs an INSERT delta streamer with column-stats metadata indexing enabled and, once at
     * least 2 commits exist, schedules and executes a COLUMN_STATS index build through
     * {@code HoodieIndexer}.
     *
     * <p>The wait condition returns {@code false} when scheduling or executing the index throws
     * an {@code Exception}. When scheduling yields no instant, a warning is logged and the
     * condition still returns {@code true}.
     *
     * @param hoodieRecordType record type supplied by the {@code @EnumSource} (AVRO or SPARK)
     * @throws Exception if the streamer or the cleanup fails
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testHoodieIndexer(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        String str = basePath + "/asyncindexer";
        HoodieDeltaStreamer initialHoodieDeltaStreamer = initialHoodieDeltaStreamer(str, 1000, "false", hoodieRecordType, WriteOperationType.INSERT, Collections.singleton(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() + "=true"));
        deltaStreamerTestRunner(initialHoodieDeltaStreamer, bool -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, str, fs);
            try {
                // Step 1: schedule the index build.
                Option doSchedule = new HoodieIndexer(jsc, buildIndexerConfig(str, initialHoodieDeltaStreamer.getConfig().targetTableName, null, "schedule", "COLUMN_STATS")).doSchedule();
                if (doSchedule.isPresent()) {
                    HoodieDeltaStreamerTestBase.TestHelpers.assertPendingIndexCommit(str, fs);
                    LOG.info("Schedule indexing success, now build index with instant time " + ((String) doSchedule.get()));
                    // Step 2: execute the scheduled index instant.
                    new HoodieIndexer(jsc, buildIndexerConfig(str, initialHoodieDeltaStreamer.getConfig().targetTableName, (String) doSchedule.get(), "execute", "COLUMN_STATS")).start(0);
                    LOG.info("Metadata indexing success");
                    HoodieDeltaStreamerTestBase.TestHelpers.assertCompletedIndexCommit(str, fs);
                } else {
                    LOG.warn("Metadata indexing failed");
                }
                return true;
            } catch (Exception e) {
                LOG.info("Schedule indexing failed", e);
                return false;
            }
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, str);
    }

    /**
     * Runs a delta streamer until at least 2 commits exist, then schedules clustering and
     * executes it through {@code HoodieClusteringJob}, finally asserting that at least 1 replace
     * commit is present.
     *
     * @param z when {@code true} the scheduled instant time is handed to the executing job;
     *          when {@code false} {@code null} is passed as the instant time instead
     * @throws Exception if the streamer, the clustering job or an assertion fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHoodieAsyncClusteringJob(boolean z) throws Exception {
        final String tablePath = basePath + "/asyncClusteringJob";
        HoodieDeltaStreamer streamer = initialHoodieDeltaStreamer(tablePath, 3000, "false", HoodieRecord.HoodieRecordType.AVRO);

        // The latch is released from inside the wait condition once two commits are present.
        final CountDownLatch twoCommitsDone = new CountDownLatch(1);
        deltaStreamerTestRunner(streamer, ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tablePath, fs);
            twoCommitsDone.countDown();
            return true;
        });
        boolean latchReleased = twoCommitsDone.await(2L, TimeUnit.MINUTES);
        if (!latchReleased) {
            Assertions.fail("Deltastreamer should have completed 2 commits.");
            return;
        }

        Option scheduledInstant = Option.empty();
        try {
            scheduledInstant = initialHoodieClusteringJob(tablePath, null, true, null).doSchedule();
        } catch (Exception e) {
            LOG.warn("Schedule clustering failed", e);
            Assertions.fail("Schedule clustering failed", e);
        }
        if (!scheduledInstant.isPresent()) {
            LOG.warn("Clustering execution failed");
            Assertions.fail("Clustering execution failed");
            return;
        }

        String scheduledInstantTime = (String) scheduledInstant.get();
        LOG.info("Schedule clustering success, now cluster with instant time " + scheduledInstantTime);
        HoodieClusteringJob.Config clusteringConfig = buildHoodieClusteringUtilConfig(tablePath, z ? scheduledInstantTime : null, false);
        HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, clusteringConfig);
        clusteringJob.cluster(clusteringConfig.retry);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        LOG.info("Cluster success");
    }

    /**
     * Runs the async clustering service scenario with the SPARK record type.
     * Marked {@code @Disabled} with the reason {@code HUDI-6753}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Disabled("HUDI-6753")
    public void testAsyncClusteringServiceSparkRecordType() throws Exception {
        testAsyncClusteringService(HoodieRecord.HoodieRecordType.SPARK);
    }

    /**
     * Runs the async clustering service scenario with the AVRO record type.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test
    public void testAsyncClusteringServiceAvroRecordType() throws Exception {
        testAsyncClusteringService(HoodieRecord.HoodieRecordType.AVRO);
    }

    /**
     * Shared scenario: runs a continuous INSERT streamer on a COPY_ON_WRITE table configured
     * through {@code getAsyncServicesConfigs(2000, "false", "", "", "true", "3")}, waits for at
     * least 1 replace commit, and then asserts at least 4 commits, at least 1 replace commit and
     * a distinct record count of 2000.
     *
     * @param hoodieRecordType record type passed to {@code addRecordMerger}
     * @throws Exception if the streamer, an assertion or the cleanup fails
     */
    private void testAsyncClusteringService(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/asyncClustering";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "", "", "true", "3"));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        Function<Boolean, Boolean> replaceCommitPresent = ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
            return true;
        };
        deltaStreamerTestRunner(streamer, cfg, replaceCommitPresent);

        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(4, tablePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        assertDistinctRecordCount(2000, tablePath, sqlContext);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Runs the async clustering with conflicts scenario with the AVRO record type.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Timeout(600)
    @Test
    public void testAsyncClusteringServiceWithConflictsAvro() throws Exception {
        testAsyncClusteringServiceWithConflicts(HoodieRecord.HoodieRecordType.AVRO);
    }

    /**
     * Shared scenario: runs a continuous UPSERT streamer on a COPY_ON_WRITE table configured
     * through {@code getAsyncServicesConfigs(2000, "false", "", "", "true", "2")}, waits until
     * {@code assertAtLeastNCommitsAfterRollback(1, 1, ...)} passes, and then asserts at least 1
     * replace commit and at least 3 commits.
     *
     * @param hoodieRecordType record type passed to {@code addRecordMerger}; its name is also
     *                         appended to the table path
     * @throws Exception if the streamer, an assertion or the cleanup fails
     */
    private void testAsyncClusteringServiceWithConflicts(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/asyncClusteringWithConflicts_" + hoodieRecordType.name();

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "", "", "true", "2"));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        Function<Boolean, Boolean> commitAfterRollbackPresent = ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommitsAfterRollback(1, 1, tablePath, fs);
            return true;
        };
        deltaStreamerTestRunner(streamer, cfg, commitAfterRollbackPresent);

        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(3, tablePath, fs);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Runs a continuous INSERT streamer on a MERGE_ON_READ table configured through
     * {@code getAsyncServicesConfigs(2000, "false", "", "", "true", "3")}, waits for at least 2
     * compaction commits and 1 replace commit, and then asserts at least 4 commits, at least 1
     * replace commit and a distinct record count of 2000.
     *
     * @param hoodieRecordType record type supplied by the {@code @EnumSource} (AVRO or SPARK)
     * @throws Exception if the streamer, an assertion or the cleanup fails
     */
    @Timeout(600)
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testAsyncClusteringServiceWithCompaction(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/asyncClusteringCompaction";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "", "", "true", "3"));

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        Function<Boolean, Boolean> compactionAndReplacePresent = ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits(2, tablePath, fs);
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
            return true;
        };
        deltaStreamerTestRunner(streamer, cfg, compactionAndReplacePresent);

        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(4, tablePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        assertDistinctRecordCount(2000, tablePath, sqlContext);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Checks how a {@code scheduleAndExecute} clustering job treats a clustering instant that
     * was left inflight.
     *
     * <p>Steps: sync once, schedule clustering, sync again, move the first pending replace
     * instant from requested to inflight, then run a {@code scheduleAndExecute} clustering job
     * and compare the timestamp of the last completed replace instant with the inflight one.
     *
     * @param z                flag handed to {@code initialHoodieClusteringJob}; when
     *                         {@code true} the completed instant must have the inflight
     *                         instant's timestamp, otherwise the timestamps must differ
     * @param hoodieRecordType record type passed to {@code addRecordMerger} and to the clustering job
     * @throws Exception if a sync, the clustering job, an assertion or the cleanup fails
     */
    @ParameterizedTest
    @CsvSource({"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
    public void testAsyncClusteringJobWithRetry(boolean z, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tablePath = basePath + "/asyncClustering3";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = false;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(3000, "false", "false", "0", "false", "0"));
        cfg.configs.addAll(getAllMultiWriterConfigs());

        HoodieDeltaStreamer firstRun = new HoodieDeltaStreamer(cfg, jsc);
        firstRun.sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tablePath, fs);

        initialHoodieClusteringJob(tablePath, null, false, "schedule").cluster(0);
        HoodieDeltaStreamer secondRun = new HoodieDeltaStreamer(cfg, jsc);
        secondRun.sync();

        // Leave the scheduled clustering instant in the inflight state.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
        HoodieInstant inflightClustering = (HoodieInstant) metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().get(0);
        metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(inflightClustering, Option.empty());

        initialHoodieClusteringJob(tablePath, null, false, "scheduleAndExecute", Boolean.valueOf(z), hoodieRecordType).cluster(0);

        HoodieInstant lastCompletedReplace = (HoodieInstant) metaClient.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get();
        String completedTimestamp = lastCompletedReplace.getTimestamp();
        if (z) {
            Assertions.assertEquals(inflightClustering.getTimestamp(), completedTimestamp);
        } else {
            Assertions.assertFalse(inflightClustering.getTimestamp().equalsIgnoreCase(completedTimestamp));
        }

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Runs a delta streamer together with a clustering job in the given running mode and asserts
     * the replace-commit state expected for that mode.
     *
     * <p>The mode is lower-cased once with {@link Locale#ROOT} and dispatched through a plain
     * {@code switch} on the string. This replaces an invalid hashCode-based dispatch that assigned
     * integers to a {@code boolean} and had duplicate {@code case true} labels.
     *
     * @param str clustering running mode: "execute", "schedule" or "scheduleAndExecute"
     * @param hoodieRecordType record type used to configure the streamer and the clustering job
     */
    @ParameterizedTest
    @CsvSource({"execute, AVRO", "schedule, AVRO", "scheduleAndExecute, AVRO", "execute, SPARK", "schedule, SPARK", "scheduleAndExecute, SPARK"})
    public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String str, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        String str2 = basePath + "/asyncClustering2";
        HoodieDeltaStreamer initialHoodieDeltaStreamer = initialHoodieDeltaStreamer(str2, 3000, "false", hoodieRecordType, WriteOperationType.BULK_INSERT);
        HoodieClusteringJob initialHoodieClusteringJob = initialHoodieClusteringJob(str2, null, true, str, hoodieRecordType);
        // Normalise the mode once, locale-independently, so every comparison below agrees.
        final String runningMode = str.toLowerCase(Locale.ROOT);
        deltaStreamerTestRunner(initialHoodieDeltaStreamer, bool -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, str2, fs);
            try {
                if (initialHoodieClusteringJob.cluster(0) == 0) {
                    LOG.info("Cluster success");
                } else {
                    LOG.warn("Cluster failed");
                    // A failed run is tolerated only in "execute" mode; otherwise report "not done yet".
                    if (!"execute".equals(runningMode)) {
                        return false;
                    }
                }
            } catch (Exception e) {
                LOG.warn("ScheduleAndExecute clustering failed", e);
                if (!"execute".equals(runningMode)) {
                    return false;
                }
            }
            switch (runningMode) {
                case "scheduleandexecute":
                    HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceCommits(2, str2, fs);
                    return true;
                case "schedule":
                    HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNReplaceRequests(2, str2, fs);
                    HoodieDeltaStreamerTestBase.TestHelpers.assertNoReplaceCommits(str2, fs);
                    return true;
                case "execute":
                    HoodieDeltaStreamerTestBase.TestHelpers.assertNoReplaceCommits(str2, fs);
                    return true;
                default:
                    throw new IllegalStateException("Unexpected value: " + str);
            }
        });
        if ("scheduleandexecute".equals(runningMode)) {
            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, str2);
        }
    }

    /**
     * Exercises a two-table pipeline: an upstream table written with a SQL-query-based transformer
     * and a downstream table fed incrementally from the upstream one. Record counts and commit
     * metadata are checked after every sync, and the Hive-synced state of the upstream table is
     * checked at the end.
     *
     * @param hoodieRecordType record type used to configure the record merger
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        String upstreamPath = basePath + "/test_table2";
        String downstreamPath = basePath + "/test_downstream_table2";

        // Step 1: bulk insert into the upstream table.
        HoodieDeltaStreamer.Config upstreamCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(upstreamPath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", true);
        addRecordMerger(hoodieRecordType, upstreamCfg.configs);
        upstreamCfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
        new HoodieDeltaStreamer(upstreamCfg, jsc, fs, hiveServer.getHiveConf()).sync();
        assertRecordCount(1000L, upstreamPath, sqlContext);
        assertDistanceCount(1000L, upstreamPath, sqlContext);
        assertDistanceCountWithExactValue(1000L, upstreamPath, sqlContext);
        String firstCommit = HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", upstreamPath, fs, 1);

        // Step 2: bulk insert into the downstream table using the upstream table as source.
        HoodieDeltaStreamer.Config downstreamCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, WriteOperationType.BULK_INSERT, true, null);
        addRecordMerger(hoodieRecordType, downstreamCfg.configs);
        new HoodieDeltaStreamer(downstreamCfg, jsc, fs, hiveServer.getHiveConf()).sync();
        assertRecordCount(1000L, downstreamPath, sqlContext);
        assertDistanceCount(1000L, downstreamPath, sqlContext);
        assertDistanceCountWithExactValue(1000L, downstreamPath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(firstCommit, downstreamPath, fs, 1);

        // Step 3: sync upstream again with a source limit of 0; the same counts and metadata are asserted.
        upstreamCfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(upstreamCfg, jsc, fs, hiveServer.getHiveConf()).sync();
        assertRecordCount(1000L, upstreamPath, sqlContext);
        assertDistanceCount(1000L, upstreamPath, sqlContext);
        assertDistanceCountWithExactValue(1000L, upstreamPath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", upstreamPath, fs, 1);

        // Step 4: sync downstream again, this time with the dummy schema provider.
        HoodieDeltaStreamer.Config downstreamDummySchemaCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName());
        new HoodieDeltaStreamer(downstreamDummySchemaCfg, jsc).sync();
        assertRecordCount(1000L, downstreamPath, sqlContext);
        assertDistanceCount(1000L, downstreamPath, sqlContext);
        assertDistanceCountWithExactValue(1000L, downstreamPath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(firstCommit, downstreamPath, fs, 1);

        // Step 5: upsert into the upstream table.
        upstreamCfg.sourceLimit = 2000L;
        upstreamCfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(upstreamCfg, jsc, fs, hiveServer.getHiveConf()).sync();
        assertRecordCount(1950L, upstreamPath, sqlContext);
        assertDistanceCount(1950L, upstreamPath, sqlContext);
        assertDistanceCountWithExactValue(1950L, upstreamPath, sqlContext);
        String secondCommit = HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", upstreamPath, fs, 2);
        long upstreamTotal = countsPerCommit(upstreamPath, sqlContext).stream().mapToLong(row -> row.getLong(1)).sum();
        Assertions.assertEquals(1950L, upstreamTotal);

        // Step 6: upsert into the downstream table.
        HoodieDeltaStreamer.Config downstreamUpsertCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, WriteOperationType.UPSERT, false, null);
        addRecordMerger(hoodieRecordType, downstreamUpsertCfg.configs);
        downstreamUpsertCfg.sourceLimit = 2000L;
        new HoodieDeltaStreamer(downstreamUpsertCfg, jsc).sync();
        assertRecordCount(2000L, downstreamPath, sqlContext);
        assertDistanceCount(2000L, downstreamPath, sqlContext);
        assertDistanceCountWithExactValue(2000L, downstreamPath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata(secondCommit, downstreamPath, fs, 2);
        long downstreamTotal = countsPerCommit(downstreamPath, sqlContext).stream().mapToLong(row -> row.getLong(1)).sum();
        Assertions.assertEquals(2000L, downstreamTotal);

        // Step 7: verify what was synced to Hive for the upstream table.
        HiveSyncConfig syncConfig = getHiveSyncConfig(upstreamPath, "hive_trips");
        syncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS, "year,month,day");
        syncConfig.setHadoopConf(hiveTestService.getHiveConf());
        HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(syncConfig);
        String hiveTable = syncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
        Assertions.assertTrue(hiveClient.tableExists(hiveTable), "Table " + hiveTable + " should exist");
        Assertions.assertEquals(3, hiveClient.getAllPartitions(hiveTable).size(), "Table partitions should match the number of partitions we wrote");
        Assertions.assertEquals(secondCommit, hiveClient.getLastCommitTimeSynced(hiveTable).get(), "The last commit that was synced should be updated in the TBLPROPERTIES");

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, upstreamPath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, downstreamPath);
    }

    /**
     * Expects a {@code HoodieException} asking for a valid schema provider class when the streamer
     * is synced with the configuration built below.
     */
    @Test
    public void testNullSchemaProvider() throws Exception {
        String tablePath = basePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", true, false, false, null, null);
        Exception thrown = Assertions.assertThrows(
            HoodieException.class,
            () -> new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync(),
            "Should error out when schema provider is not provided");
        LOG.debug("Expected error during reading data from source ", thrown);
        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("Please provide a valid schema provider class!"));
    }

    /**
     * Writes a MERGE_ON_READ table, constructs a second streamer configured with
     * {@code DummyAvroPayload}, and asserts that {@code hoodie.properties} now records that
     * payload class.
     *
     * <p>The properties stream is read with try-with-resources, so it is closed exactly once on
     * both the success and failure paths.
     */
    @Test
    public void testPayloadClassUpdate() throws Exception {
        String str = basePath + "/test_dataset_mor_payload_class_update";
        new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, "MERGE_ON_READ"), jsc, fs, hiveServer.getHiveConf()).sync();
        assertRecordCount(1000L, str, sqlContext);
        HoodieDeltaStreamer.Config makeConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
        // Only constructed, never synced: the assertion below checks the table config after construction.
        new HoodieDeltaStreamer(makeConfig, jsc, fs, hiveServer.getHiveConf());
        Properties properties = new Properties();
        try (FSDataInputStream open = FSUtils.getFs(makeConfig.targetBasePath, jsc.hadoopConfiguration()).open(new Path(str + "/.hoodie/hoodie.properties"))) {
            properties.load(open);
        }
        // Expected value first so a failure message reads correctly.
        Assertions.assertEquals(DummyAvroPayload.class.getName(), new HoodieConfig(properties).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
    }

    /**
     * Writes a MERGE_ON_READ table configured with {@code PartialUpdateAvroPayload} and asserts
     * that {@code hoodie.properties} records that payload class.
     *
     * <p>The properties stream is read with try-with-resources, so it is closed exactly once on
     * both the success and failure paths.
     */
    @Test
    public void testPartialPayloadClass() throws Exception {
        String str = basePath + "/test_dataset_mor";
        HoodieDeltaStreamer.Config makeConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
        new HoodieDeltaStreamer(makeConfig, jsc, fs, hiveServer.getHiveConf()).sync();
        assertRecordCount(1000L, str, sqlContext);
        Properties properties = new Properties();
        try (FSDataInputStream open = FSUtils.getFs(makeConfig.targetBasePath, jsc.hadoopConfiguration()).open(new Path(str + "/.hoodie/hoodie.properties"))) {
            properties.load(open);
        }
        // Expected value first so a failure message reads correctly.
        Assertions.assertEquals(PartialUpdateAvroPayload.class.getName(), new HoodieConfig(properties).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
    }

    /**
     * Writes a table with no explicit table type, constructs a second streamer configured with
     * {@code DummyAvroPayload}, and asserts that {@code hoodie.properties} does not contain the
     * payload class key.
     *
     * <p>The properties stream is read with try-with-resources, so it is closed exactly once on
     * both the success and failure paths.
     */
    @Test
    public void testPayloadClassUpdateWithCOWTable() throws Exception {
        String str = basePath + "/test_dataset_cow";
        new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, null), jsc, fs, hiveServer.getHiveConf()).sync();
        assertRecordCount(1000L, str, sqlContext);
        HoodieDeltaStreamer.Config makeConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), null);
        // Only constructed, never synced: the assertion below checks the table config after construction.
        new HoodieDeltaStreamer(makeConfig, jsc, fs, hiveServer.getHiveConf());
        Properties properties = new Properties();
        try (FSDataInputStream open = FSUtils.getFs(makeConfig.targetBasePath, jsc.hadoopConfiguration()).open(new Path(str + "/.hoodie/hoodie.properties"))) {
            properties.load(open);
        }
        Assertions.assertFalse(properties.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
    }

    /**
     * Covers the filter-dupes option: a bulk insert, then an INSERT with filter-dupes enabled, then
     * an UPSERT built from the drop-all config whose commit is expected to have no write stats, and
     * finally an UPSERT with filter-dupes enabled, which must be rejected.
     *
     * <p>The last step uses {@code assertThrows}, so the test fails if the invalid combination is
     * silently accepted; a plain try/catch would pass when nothing is thrown.
     *
     * @param hoodieRecordType record type used to configure the record merger
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testFilterDupes(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        String str = basePath + "/test_dupes_table";
        HoodieDeltaStreamer.Config makeConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT);
        addRecordMerger(hoodieRecordType, makeConfig.configs);
        new HoodieDeltaStreamer(makeConfig, jsc).sync();
        assertRecordCount(1000L, str, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", str, fs, 1);

        // INSERT with filter-dupes enabled: each of the two commits is expected to hold 1000 records.
        makeConfig.filterDupes = true;
        makeConfig.sourceLimit = 2000L;
        makeConfig.operation = WriteOperationType.INSERT;
        new HoodieDeltaStreamer(makeConfig, jsc).sync();
        assertRecordCount(2000L, str, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", str, fs, 2);
        List<Row> countsPerCommit = countsPerCommit(str, sqlContext);
        Assertions.assertEquals(1000L, countsPerCommit.get(0).getLong(1));
        Assertions.assertEquals(1000L, countsPerCommit.get(1).getLong(1));

        // UPSERT using the drop-all config with auto-clean disabled: a newer commit with no write stats.
        HoodieInstant hoodieInstant = (HoodieInstant) HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(str).setLoadActiveTimelineOnLoad(true).build().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        HoodieDeltaStreamer.Config makeDropAllConfig = HoodieDeltaStreamerTestBase.TestHelpers.makeDropAllConfig(str, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, makeDropAllConfig.configs);
        makeDropAllConfig.filterDupes = false;
        makeDropAllConfig.sourceLimit = 2000L;
        makeDropAllConfig.operation = WriteOperationType.UPSERT;
        makeDropAllConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        new HoodieDeltaStreamer(makeDropAllConfig, jsc).sync();
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(str).setLoadActiveTimelineOnLoad(true).build();
        HoodieInstant hoodieInstant2 = (HoodieInstant) build.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        Assertions.assertTrue(HoodieTimeline.compareTimestamps(hoodieInstant2.getTimestamp(), HoodieTimeline.GREATER_THAN, hoodieInstant.getTimestamp()));
        HoodieCommitMetadata hoodieCommitMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) build.getActiveTimeline().getInstantDetails(hoodieInstant2).get(), HoodieCommitMetadata.class);
        LOG.info("New Commit Metadata=" + hoodieCommitMetadata);
        Assertions.assertTrue(hoodieCommitMetadata.getPartitionToWriteStats().isEmpty());

        // UPSERT combined with filter-dupes must be rejected.
        makeDropAllConfig.filterDupes = true;
        makeDropAllConfig.operation = WriteOperationType.UPSERT;
        IllegalArgumentException rejected = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> new HoodieDeltaStreamer(makeDropAllConfig, jsc).sync(),
            "UPSERT with filter-dupes enabled should be rejected");
        Assertions.assertTrue(rejected.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, str);
    }

    /**
     * Fetches one batch from a {@code DistributedTestDataSource} configured for 1000 unique
     * records on a single source partition and asserts that the batch holds 1000 records.
     */
    @Test
    public void testDistributedTestDataSource() {
        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), "1000");
        sourceProps.setProperty(SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.key(), "1");
        sourceProps.setProperty(SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.key(), "true");

        DistributedTestDataSource source = new DistributedTestDataSource(sourceProps, jsc, sparkSession, null);
        InputBatch batch = source.fetchNext(Option.empty(), 10000000L);

        // Cache before counting.
        JavaRDD records = (JavaRDD) batch.getBatch().get();
        records.cache();
        Assertions.assertEquals(1000L, records.count());
    }

    /**
     * Convenience overload that delegates to the four-argument variant with a partition count of 2.
     */
    private static void prepareJsonKafkaDFSFiles(int i, boolean z, String str) {
        final int partitionCount = 2;
        prepareJsonKafkaDFSFiles(i, z, str, partitionCount);
    }

    /**
     * Optionally creates the Kafka topic and then sends {@code i} generated records to it as JSON.
     *
     * @param i number of records to generate
     * @param z whether to create the topic first
     * @param str topic name
     * @param i2 value passed both to topic creation and to the per-partition JSON conversion
     */
    private static void prepareJsonKafkaDFSFiles(int i, boolean z, String str, int i2) {
        if (z) {
            try {
                testUtils.createTopic(str, i2);
            } catch (TopicExistsException ignored) {
                // Deliberately ignored: the topic already exists.
            }
        }
        final String recordSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.sendMessages(
            str,
            UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions(
                generator.generateInsertsAsPerSchema("000", Integer.valueOf(i), recordSchema),
                i2));
    }

    /**
     * Convenience overload that delegates to the three-argument variant with the empty-batch
     * source disabled.
     */
    private void testParquetDFSSource(boolean z, List<String> list) throws Exception {
        final boolean useEmptyBatchSource = false;
        testParquetDFSSource(z, list, useEmptyBatchSource);
    }

    /**
     * Ingests parquet files from DFS into a new table and checks record counts after each sync,
     * then verifies that every completed commit carries a non-empty schema in its metadata.
     *
     * <p>When {@code z2} is true the {@code TestParquetDFSSourceEmptyBatch} source is used and an
     * extra sync is performed after which the record count is expected to stay at 10. The table
     * schema is then asserted to differ from the Avro NULL schema. That comparison is now
     * schema-to-schema; comparing a {@code Schema} with a {@code String} could never fail.
     *
     * @param z forwarded to the source preparation and to the streamer config
     * @param list forwarded to the streamer config; whether it is non-empty is forwarded to the
     *     source preparation
     * @param z2 whether to use the empty-batch parquet source
     */
    private void testParquetDFSSource(boolean z, List<String> list, boolean z2) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        boolean listProvided = list != null && !list.isEmpty();
        prepareParquetDFSFiles(10, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(z, listProvided, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", z2 ? "1" : "");
        String str = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer hoodieDeltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.INSERT, z2 ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), list, "test-parquet-dfs-source.properties", false, z, 100000, false, null, null, "timestamp", null), jsc);
        hoodieDeltaStreamer.sync();
        assertRecordCount(10, str, sqlContext);
        if (z2) {
            prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
            hoodieDeltaStreamer.sync();
            // The count is expected to be unchanged after this sync.
            assertRecordCount(10, str, sqlContext);
            TableSchemaResolver schemaResolver = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(str).setConf(jsc.hadoopConfiguration()).build());
            Assertions.assertNotEquals(Schema.create(Schema.Type.NULL), schemaResolver.getTableAvroSchema());
        }
        prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null);
        hoodieDeltaStreamer.sync();
        assertRecordCount(10 + 100, str, sqlContext);
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setBasePath(str).setConf(jsc.hadoopConfiguration()).build();
        build.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(hoodieInstant -> {
            assertValidSchemaInCommitMetadata(hoodieInstant, build);
        });
        testNum++;
    }

    /**
     * Asserts that the commit metadata of the given instant has a non-empty "schema" entry.
     *
     * @param hoodieInstant instant whose details are read from the active timeline
     * @param hoodieTableMetaClient meta client providing the active timeline
     * @throws HoodieException if the commit metadata cannot be parsed; the underlying
     *     {@link IOException} is kept as the cause
     */
    private void assertValidSchemaInCommitMetadata(HoodieInstant hoodieInstant, HoodieTableMetaClient hoodieTableMetaClient) {
        try {
            Assertions.assertFalse(StringUtils.isNullOrEmpty(((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes((byte[]) hoodieTableMetaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class)).getMetadata("schema")));
        } catch (IOException e) {
            // Keep the original failure as the cause so the root error is not lost.
            throw new HoodieException("Failed to parse commit metadata for " + hoodieInstant.toString(), e);
        }
    }

    /**
     * Writes an ORC DFS source properties file, ingests from {@code ORC_SOURCE_ROOT} into a new
     * table, and asserts that the table holds 5 records.
     *
     * @param z when true, a source schema file is configured (and a target schema file too if
     *     {@code list} is non-null); also forwarded to the streamer config
     * @param list forwarded to the streamer config
     */
    private void testORCDFSSource(boolean z, List<String> list) throws Exception {
        final String propsFileName = "test-orc-dfs-source.properties";

        TypedProperties orcProps = new TypedProperties();
        orcProps.setProperty("include", "base.properties");
        orcProps.setProperty("hoodie.embed.timeline.server", "false");
        orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        if (z) {
            orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
        }
        if (z && list != null) {
            orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
        }
        orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
        UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, fs, basePath + "/" + propsFileName);

        String tablePath = basePath + "/test_orc_source_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(), list, propsFileName, false, z, 100000, false, null, null, "timestamp", null);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();
        assertRecordCount(5L, tablePath, sqlContext);
        testNum++;
    }

    /**
     * Convenience overload that delegates to the five-argument variant with no extra properties
     * and the append-offsets flag set to false.
     */
    private void prepareJsonKafkaDFSSource(String str, String str2, String str3) throws IOException {
        final Map<String, String> noExtraProps = null;
        final boolean appendOffsets = false;
        prepareJsonKafkaDFSSource(str, str2, str3, noExtraProps, appendOffsets);
    }

    /**
     * Builds the properties for a JSON Kafka source and saves them to DFS under {@code basePath}.
     *
     * @param str name of the properties file to write, relative to {@code basePath}
     * @param str2 value for {@code auto.offset.reset}
     * @param str3 Kafka topic name
     * @param map optional extra properties applied after the defaults; may be null or empty
     * @param z value written for the {@code KAFKA_APPEND_OFFSETS} config key
     * @throws IOException declared by the signature; presumably raised when saving to DFS fails
     */
    private void prepareJsonKafkaDFSSource(String str, String str2, String str3, Map<String, String> map, boolean z) throws IOException {
        TypedProperties typedProperties = new TypedProperties();
        populateAllCommonProps(typedProperties, basePath, testUtils.brokerAddress());
        typedProperties.setProperty("include", "base.properties");
        typedProperties.setProperty("hoodie.embed.timeline.server", "false");
        typedProperties.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        typedProperties.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
        typedProperties.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
        typedProperties.setProperty("hoodie.deltastreamer.source.kafka.topic", str3);
        typedProperties.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", this.kafkaCheckpointType);
        typedProperties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
        typedProperties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
        typedProperties.setProperty("auto.offset.reset", str2);
        // Caller-supplied entries are applied last among these, so they override the defaults above.
        if (map != null && !map.isEmpty()) {
            map.forEach(typedProperties::setProperty);
        }
        typedProperties.setProperty(HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(z));
        UtilitiesTestBase.Helpers.savePropsToDFS(typedProperties, fs, basePath + "/" + str);
    }

    /**
     * Ingests 10 records from a parquet DFS source, then switches the same table to a JSON Kafka
     * source and checks the running record count after each Kafka sync.
     *
     * <p>The expected count is held in a local variable so that the final assertion can add the
     * 20 later records to it; the previous code referenced an undefined variable there.
     *
     * @param z when true, {@code auto.offset.reset} is "latest" and the 5 records already in the
     *     topic are not expected in the table; otherwise it is "earliest" and they are
     */
    private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean z) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfsToKafka" + testNum;
        prepareParquetDFSFiles(10, PARQUET_SOURCE_ROOT, "1.parquet", true, "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}", HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        prepareParquetDFSSource(true, true, "source_uber.avsc", "target_uber.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "driver");
        String str = basePath + "/test_dfs_to_kafka" + testNum;
        HoodieDeltaStreamer hoodieDeltaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), Collections.emptyList(), "test-parquet-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        hoodieDeltaStreamer.sync();
        assertRecordCount(10, str, sqlContext);
        hoodieDeltaStreamer.shutdownGracefully();

        // Switch the same table over to the Kafka source.
        topicName = "topic" + testNum;
        prepareJsonKafkaDFSFiles(5, true, topicName);
        prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", z ? "latest" : "earliest", topicName);
        HoodieDeltaStreamer hoodieDeltaStreamer2 = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        hoodieDeltaStreamer2.sync();
        int expectedRecords = 10 + (z ? 0 : 5);
        assertRecordCount(expectedRecords, str, sqlContext);

        // Records sent after the first Kafka sync are expected in both modes.
        prepareJsonKafkaDFSFiles(20, false, topicName);
        hoodieDeltaStreamer2.sync();
        assertRecordCount(expectedRecords + 20, str, sqlContext);
        testNum++;
    }

    /**
     * Ingests 5 records from a JSON Kafka topic, then 10 more, asserting the cumulative record
     * count after each sync.
     */
    @Test
    public void testJsonKafkaDFSSource() throws Exception {
        final String propsFileName = "test-json-kafka-dfs-source.properties";

        topicName = "topic" + testNum;
        prepareJsonKafkaDFSFiles(5, true, topicName);
        prepareJsonKafkaDFSSource(propsFileName, "earliest", topicName);

        String tablePath = basePath + "/test_json_kafka_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), propsFileName, false, true, 100000, false, null, null, "timestamp", null);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);

        // First batch.
        streamer.sync();
        assertRecordCount(5L, tablePath, sqlContext);

        // Second batch on the same topic, without re-creating it.
        prepareJsonKafkaDFSFiles(10, false, topicName);
        streamer.sync();
        assertRecordCount(5 + 10, tablePath, sqlContext);
    }

    /**
     * Ingests 30 records from a two-partition Kafka topic with the append-offsets flag enabled and
     * checks the resulting {@code _hoodie_kafka_source_*} columns: 15 rows per partition, all
     * timestamps between the start of the test and now, and exactly one row per
     * (offset, partition) pair.
     */
    @Test
    public void testJsonKafkaDFSSourceWithOffsets() throws Exception {
        topicName = "topic" + testNum;
        final int recordsPerPartition = 30 / 2;
        final long startMillis = Instant.now().toEpochMilli();

        prepareJsonKafkaDFSFiles(30, true, topicName, 2);
        prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName, null, true);

        String tablePath = basePath + "/test_json_kafka_offsets_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null);
        new HoodieDeltaStreamer(cfg, jsc).sync();

        sqlContext.clearCache();
        Dataset table = sqlContext.read().format("org.apache.hudi").load(tablePath);
        Assertions.assertEquals(30, table.count());

        // Each of the two partitions contributes the same number of rows.
        for (int partition = 0; partition < 2; partition++) {
            Assertions.assertEquals(recordsPerPartition, table.filter("_hoodie_kafka_source_partition=" + partition).count());
        }

        // Every row's source timestamp lies strictly between test start and now.
        Assertions.assertEquals(30, table.filter("_hoodie_kafka_source_timestamp>" + startMillis).filter("_hoodie_kafka_source_timestamp<" + Instant.now().toEpochMilli()).count());

        // Looks up the offset column on a fresh read; the result is not used.
        sqlContext.read().format("org.apache.hudi").load(tablePath).col("_hoodie_kafka_source_offset");

        // Exactly one row per (offset, partition) pair.
        for (int offset = 0; offset < recordsPerPartition; offset++) {
            for (int partition = 0; partition < 2; partition++) {
                Assertions.assertEquals(1L, table.filter("_hoodie_kafka_source_offset=" + offset).filter("_hoodie_kafka_source_partition=" + partition).count());
            }
        }
    }

    /**
     * Sets the Kafka checkpoint type to "timestamp" and runs two syncs, each from a freshly built
     * config whose last argument is the current time in milliseconds, asserting 5 and then 10
     * records.
     */
    @Test
    public void testKafkaTimestampType() throws Exception {
        final String propsFileName = "test-json-kafka-dfs-source.properties";

        topicName = "topic" + testNum;
        this.kafkaCheckpointType = "timestamp";
        prepareJsonKafkaDFSFiles(5, true, topicName);
        prepareJsonKafkaDFSSource(propsFileName, "earliest", topicName);
        String tablePath = basePath + "/test_json_kafka_table" + testNum;

        // First run.
        HoodieDeltaStreamer.Config firstCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), propsFileName, false, true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis()));
        new HoodieDeltaStreamer(firstCfg, jsc).sync();
        assertRecordCount(5L, tablePath, sqlContext);

        // Second run, after 5 more records were sent to the same topic.
        prepareJsonKafkaDFSFiles(5, false, topicName);
        HoodieDeltaStreamer.Config secondCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), propsFileName, false, true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis()));
        new HoodieDeltaStreamer(secondCfg, jsc).sync();
        assertRecordCount(10L, tablePath, sqlContext);
    }

    /**
     * Writes to one table from a parquet DFS source, then a JSON Kafka source, then the parquet source
     * again, and checks the JSON stored under {@code deltastreamer.checkpoint.key} in the first three
     * commits:
     * <ul>
     *   <li>commit 0 has a "parquet" entry;</li>
     *   <li>commit 1 has a "kafka" entry and carries the "parquet" entry over unchanged;</li>
     *   <li>commit 2 carries the "kafka" entry over unchanged and has a larger "parquet" value.</li>
     * </ul>
     * The test is disabled (see HUDI-6609); {@code @Test} is declared so that it is reported as a skipped
     * test, like the other disabled tests in this class, instead of being silently ignored.
     */
    @Disabled("HUDI-6609")
    @Test
    public void testDeltaStreamerMultiwriterCheckpoint() throws Exception {
        // Same source schema is used for the initial parquet files and for the later updates.
        final String sourceSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        final String checkpointKey = "deltastreamer.checkpoint.key";

        // Step 1: parquet source writes 100 records.
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesMultiCheckpoint" + testNum;
        HoodieTestDataGenerator dataGenerator = prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "1.parquet", true, sourceSchema, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        prepareParquetDFSSource(true, true, "source_uber.avsc", "target_uber.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "driver");
        final String tableBasePath = basePath + "/test_multi_checkpoint" + testNum;
        HoodieDeltaStreamer.Config parquetCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), Collections.emptyList(), "test-parquet-dfs-source.properties", false, true, Integer.MAX_VALUE, false, null, null, "timestamp", null);
        parquetCfg.configs = new ArrayList<>();
        new HoodieDeltaStreamer(parquetCfg, jsc).sync();
        assertRecordCount(100L, tableBasePath, sqlContext);

        // Step 2: Kafka source adds 20 records to the same table.
        topicName = "topic" + testNum;
        prepareJsonKafkaDFSFiles(20, true, topicName);
        prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName, new HashMap<>(), false);
        HoodieDeltaStreamer kafkaStreamer = new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, Integer.MAX_VALUE, false, null, null, "timestamp", null), jsc);
        kafkaStreamer.sync();
        assertRecordCount(100 + 20, tableBasePath, sqlContext);

        // Step 3: parquet source runs again after 100 more records were prepared.
        prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "1.parquet", true, sourceSchema, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA, dataGenerator, "001");
        HoodieDeltaStreamer parquetStreamer = new HoodieDeltaStreamer(parquetCfg, jsc);
        parquetStreamer.sync();
        assertRecordCount((100 * 2) + 20, tableBasePath, sqlContext);

        // Inspect the checkpoint JSON recorded in each of the three commits.
        HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), tableBasePath);
        List<?> instants = metaClient.getCommitsTimeline().getInstants();
        ObjectMapper objectMapper = new ObjectMapper();

        String firstJson = (String) ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(
            (byte[]) metaClient.getCommitsTimeline().getInstantDetails((HoodieInstant) instants.get(0)).get(),
            HoodieCommitMetadata.class)).getExtraMetadata().get(checkpointKey);
        Map<?, ?> firstCheckpoints = objectMapper.readValue(firstJson, Map.class);
        String firstParquetCheckpoint = (String) firstCheckpoints.get("parquet");
        Assertions.assertNotNull(firstParquetCheckpoint);

        String secondJson = (String) ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(
            (byte[]) metaClient.getCommitsTimeline().getInstantDetails((HoodieInstant) instants.get(1)).get(),
            HoodieCommitMetadata.class)).getExtraMetadata().get(checkpointKey);
        Map<?, ?> secondCheckpoints = objectMapper.readValue(secondJson, Map.class);
        String kafkaCheckpoint = (String) secondCheckpoints.get("kafka");
        Assertions.assertNotNull(kafkaCheckpoint);
        Assertions.assertEquals(firstParquetCheckpoint, secondCheckpoints.get("parquet"));

        String thirdJson = (String) ((HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(
            (byte[]) metaClient.getCommitsTimeline().getInstantDetails((HoodieInstant) instants.get(2)).get(),
            HoodieCommitMetadata.class)).getExtraMetadata().get(checkpointKey);
        Map<?, ?> thirdCheckpoints = objectMapper.readValue(thirdJson, Map.class);
        String latestParquetCheckpoint = (String) thirdCheckpoints.get("parquet");
        Assertions.assertNotNull(latestParquetCheckpoint);
        Assertions.assertEquals(kafkaCheckpoint, thirdCheckpoints.get("kafka"));
        Assertions.assertTrue(Long.parseLong(latestParquetCheckpoint) > Long.parseLong(firstParquetCheckpoint));

        parquetStreamer.shutdownGracefully();
        kafkaStreamer.shutdownGracefully();
    }

    /**
     * Runs the parquet-to-Kafka source transition scenario by delegating to
     * {@code testDeltaStreamerTransitionFromParquetToKafkaSource} with its boolean argument set to {@code false}.
     * NOTE(review): going by the test name, {@code false} presumably selects the "earliest" auto-reset
     * value; confirm against the helper.
     */
    @Test
    public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
        testDeltaStreamerTransitionFromParquetToKafkaSource(false);
    }

    /**
     * Runs the parquet-to-Kafka source transition scenario by delegating to
     * {@code testDeltaStreamerTransitionFromParquetToKafkaSource} with its boolean argument set to {@code true}.
     * NOTE(review): going by the test name, {@code true} presumably selects the "latest" auto-reset
     * value; confirm against the helper.
     */
    @Test
    public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
        testDeltaStreamerTransitionFromParquetToKafkaSource(true);
    }

    /**
     * Delegates to the two-argument {@code testParquetDFSSource} helper with {@code false} and {@code null}.
     * NOTE(review): per the test name these presumably mean "no schema provider" and "no transformer";
     * confirm against the helper's signature.
     */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        testParquetDFSSource(false, null);
    }

    /**
     * Delegates to the three-argument {@code testParquetDFSSource} helper with {@code false}, {@code null}
     * and {@code true}.
     * NOTE(review): per the test name the trailing {@code true} presumably enables the empty-batch
     * variant; confirm against the helper's signature.
     */
    @Test
    public void testParquetDFSSourceForEmptyBatch() throws Exception {
        testParquetDFSSource(false, null, true);
    }

    /**
     * Runs the restart-after-missing-{@code hoodie.properties} scenario in the variant selected by
     * {@code true}: the parameterized helper of the same name then also deletes the commit/inflight files
     * and expects the second sync to succeed.
     */
    @Test
    public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
        final boolean scenarioFlag = true;
        testDeltaStreamerRestartAfterMissingHoodieProps(scenarioFlag);
    }

    /**
     * Runs the restart-after-missing-{@code hoodie.properties} scenario in the variant selected by
     * {@code false}: the timeline files are left in place and constructing a new streamer is expected to
     * throw {@code HoodieIOException}.
     */
    @Test
    public void testDeltaStreamerRestartAfterMissingHoodiePropsAfterValidCommit() throws Exception {
        final boolean scenarioFlag = false;
        testDeltaStreamerRestartAfterMissingHoodieProps(scenarioFlag);
    }

    /**
     * Shared scenario for restarting a delta streamer after {@code .hoodie/hoodie.properties} was deleted.
     *
     * <p>When {@code z} is true the first sync uses {@code TestParquetDFSSourceEmptyBatch}, every file under
     * {@code .hoodie/} whose name contains "commit" or "inflight" is deleted too, and a second sync with
     * {@code ParquetDFSSource} is expected to leave 10 records in the table. When {@code z} is false the
     * first sync uses {@code ParquetDFSSource} and merely constructing another streamer is expected to
     * throw {@code HoodieIOException}.
     *
     * @param z selects between the two variants described above
     */
    private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean z) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        prepareParquetDFSFiles(10, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "0");
        final String tableBasePath = basePath + "/test_parquet_table" + testNum;

        // Initial sync; the source class depends on the variant.
        final String initialSourceClass = z
            ? TestParquetDFSSourceEmptyBatch.class.getName()
            : ParquetDFSSource.class.getName();
        HoodieDeltaStreamer.Config initialCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, initialSourceClass, null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null);
        new HoodieDeltaStreamer(initialCfg, jsc).sync();

        if (z) {
            // Remove commit and inflight files; a failed delete is only logged.
            List<Path> timelineFiles = Arrays.stream(fs.listStatus(new Path(tableBasePath + "/.hoodie/")))
                .map(status -> status.getPath())
                .filter(path -> path.getName().contains("commit") || path.getName().contains("inflight"))
                .collect(Collectors.toList());
            for (Path timelineFile : timelineFiles) {
                try {
                    fs.delete(timelineFile);
                } catch (IOException e) {
                    LOG.warn("Failed to delete " + timelineFile.toString(), e);
                }
            }
        }

        // In both variants the table properties file is removed before restarting.
        fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));

        if (!z) {
            Assertions.assertThrows(HoodieIOException.class, () ->
                new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null), jsc));
        } else {
            HoodieDeltaStreamer.Config restartCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null);
            new HoodieDeltaStreamer(restartCfg, jsc).sync();
            assertRecordCount(10, tableBasePath, sqlContext);
        }
        testNum++;
    }

    /**
     * Delegates to the two-argument {@code testParquetDFSSource} helper with {@code false} and a
     * single-element list holding the class name of {@code TripsWithDistanceTransformer}.
     * NOTE(review): per the test name {@code false} presumably means "no schema provider"; confirm
     * against the helper's signature.
     */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
        testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
    }

    /**
     * Delegates to the two-argument {@code testParquetDFSSource} helper with {@code true} and {@code null}.
     * NOTE(review): per the test name these presumably mean "use the source schema file" and
     * "no transformer"; confirm against the helper's signature.
     */
    @Test
    public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
        testParquetDFSSource(true, null);
    }

    /**
     * Delegates to the two-argument {@code testParquetDFSSource} helper with {@code true} and a
     * single-element list holding the class name of {@code TripsWithDistanceTransformer}.
     * NOTE(review): per the test name {@code true} presumably means "use schema files"; confirm
     * against the helper's signature.
     */
    @Test
    public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
        testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
    }

    /**
     * Calls {@code testORCDFSSource(false, null)}, but only when
     * {@code HoodieSparkUtils$.MODULE$.gteqSpark3_0()} reports true; otherwise the test body is a no-op.
     */
    @Test
    public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        if (!HoodieSparkUtils$.MODULE$.gteqSpark3_0()) {
            return;
        }
        testORCDFSSource(false, null);
    }

    /**
     * Calls {@code testORCDFSSource} with {@code true} and the {@code TripsWithDistanceTransformer} class
     * name, but only when {@code HoodieSparkUtils$.MODULE$.gteqSpark3_0()} reports true; otherwise the
     * test body is a no-op.
     */
    @Test
    public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Exception {
        if (!HoodieSparkUtils$.MODULE$.gteqSpark3_0()) {
            return;
        }
        testORCDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
    }

    /**
     * Writes {@code test-csv-dfs-source.properties} and a three-record {@code 1.csv} under
     * {@code basePath + "/csvFiles"} for the CSV source tests.
     *
     * @param z  whether the CSV has a header; when true {@code hoodie.deltastreamer.csv.header} is set
     * @param c  separator character; anything other than ',' is written to {@code hoodie.deltastreamer.csv.sep}
     *           (a tab is written as the two-character escape {@code \t})
     * @param z2 whether the source schema file property is set
     * @param z3 whether the target schema file property is also set (only honoured when {@code z2} is true)
     * @throws IOException propagated from the DFS helper calls
     */
    private void prepareCsvDFSSource(boolean z, char c, boolean z2, boolean z3) throws IOException {
        final String csvRoot = basePath + "/csvFiles";
        // Without a header and without a schema file the key/partition fields use the "_cN" names.
        final boolean namedColumns = z || z2;
        final String recordKeyField = namedColumns ? "_row_key" : "_c1";
        final String partitionPathField = namedColumns ? "partition_path" : "_c2";

        TypedProperties props = new TypedProperties();
        props.setProperty("include", "base.properties");
        props.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
        props.setProperty("hoodie.datasource.write.partitionpath.field", partitionPathField);
        if (z2) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source-flattened.avsc");
        }
        if (z2 && z3) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target-flattened.avsc");
        }
        props.setProperty("hoodie.deltastreamer.source.dfs.root", csvRoot);

        if (c == '\t') {
            props.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
        } else if (c != ',') {
            props.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(c));
        }
        if (z) {
            props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(z));
        }

        UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/test-csv-dfs-source.properties");
        UtilitiesTestBase.Helpers.saveCsvToDFS(
            z,
            c,
            UtilitiesTestBase.Helpers.jsonifyRecords(new HoodieTestDataGenerator().generateInserts("000", 3, true)),
            fs,
            csvRoot + "/1.csv");
    }

    /**
     * Prepares the CSV source via {@code prepareCsvDFSSource}, runs one INSERT sync into a fresh table and
     * expects exactly 3 records; {@code testNum} is incremented afterwards.
     *
     * @param z    forwarded to {@code prepareCsvDFSSource} as the header flag
     * @param c    forwarded to {@code prepareCsvDFSSource} as the separator character
     * @param z2   forwarded to {@code prepareCsvDFSSource} (schema file flag) and to {@code makeConfig}
     * @param list passed to {@code makeConfig}; a non-null value also makes
     *             {@code prepareCsvDFSSource} configure the target schema file
     */
    private void testCsvDFSSource(boolean z, char c, boolean z2, List<String> list) throws Exception {
        final boolean withTargetSchema = list != null;
        prepareCsvDFSSource(z, c, z2, withTargetSchema);
        final String tableBasePath = basePath + "/test_csv_table" + testNum;
        // "timestamp" when columns are named, "_c0" otherwise — presumably the ordering field; verify against makeConfig.
        final String orderingColumn = (z || z2) ? "timestamp" : "_c0";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(
            tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(), list,
            "test-csv-dfs-source.properties", false, z2, 1000, false, null, null, orderingColumn, null);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        assertRecordCount(3L, tableBasePath, sqlContext);
        testNum++;
    }

    /**
     * CSV source case: header flag on, ',' separator, schema flag off, no transformer list.
     */
    @Test
    public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = true;
        final char separator = ',';
        final boolean useSchemaFiles = false;
        final List<String> transformerClassNames = null;
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * CSV source case: header flag on, tab separator, schema flag off, no transformer list.
     */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = true;
        final char separator = '\t';
        final boolean useSchemaFiles = false;
        final List<String> transformerClassNames = null;
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * CSV source case: header flag on, tab separator, schema flag on, no transformer list.
     */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = true;
        final char separator = '\t';
        final boolean useSchemaFiles = true;
        final List<String> transformerClassNames = null;
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * CSV source case: header flag on, tab separator, schema flag off, with the
     * {@code TripsWithDistanceTransformer} class name as the only transformer.
     */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
        final boolean hasHeader = true;
        final char separator = '\t';
        final boolean useSchemaFiles = false;
        final List<String> transformerClassNames = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * CSV source case: header flag on, tab separator, schema flag on, with the
     * {@code TripsWithDistanceTransformer} class name as the only transformer.
     */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
        final boolean hasHeader = true;
        final char separator = '\t';
        final boolean useSchemaFiles = true;
        final List<String> transformerClassNames = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * CSV source case: header flag off, tab separator, schema flag off, no transformer list.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = false;
        final char separator = '\t';
        final boolean useSchemaFiles = false;
        final List<String> transformerClassNames = null;
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * CSV source case: header flag off, tab separator, schema flag on, no transformer list.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = false;
        final char separator = '\t';
        final boolean useSchemaFiles = true;
        final List<String> transformerClassNames = null;
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * CSV source case: header flag off, tab separator, schema flag off, with the
     * {@code TripsWithDistanceTransformer} class name as the only transformer. The run is expected to fail
     * with an {@code AnalysisException} whose message mentions that {@code begin_lat} cannot be resolved.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
        final List<String> transformerClassNames = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        AnalysisException thrown = Assertions.assertThrows(
            AnalysisException.class,
            () -> testCsvDFSSource(false, '\t', false, transformerClassNames),
            "Should error out when doing the transformation.");
        LOG.debug("Expected error during transformation", thrown);
        final String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("cannot resolve 'begin_lat' given input columns:"));
    }

    /**
     * CSV source case: header flag off, tab separator, schema flag on, with the
     * {@code TripsWithDistanceTransformer} class name as the only transformer.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
        final boolean hasHeader = false;
        final char separator = '\t';
        final boolean useSchemaFiles = true;
        final List<String> transformerClassNames = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(hasHeader, separator, useSchemaFiles, transformerClassNames);
    }

    /**
     * Writes {@code test-sql-source-source.properties} (query: {@code select * from test_sql_table}) and
     * generates 1000 records as the backing data of the {@code test_sql_table} temp view.
     *
     * <p>The source data lives under {@code basePath + "/sqlSourceFiles"}. The path separator is required:
     * without it the directory would be created as a sibling of {@code basePath} rather than inside it,
     * unlike every other fixture path in this class.
     *
     * @throws IOException propagated from the DFS helper calls
     */
    private void prepareSqlSource() throws IOException {
        final String sourceRoot = basePath + "/sqlSourceFiles";
        TypedProperties sqlSourceProps = new TypedProperties();
        sqlSourceProps.setProperty("include", "base.properties");
        sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
        sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query", "select * from test_sql_table");
        UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath + "/test-sql-source-source.properties");
        generateSqlSourceTestTable(sourceRoot, "1", "1000", 1000, new HoodieTestDataGenerator());
    }

    /**
     * Generates {@code i} insert records for instant {@code str3}, saves them as parquet at
     * {@code new Path(str, str2)} and registers everything readable under {@code str} as the temp view
     * {@code test_sql_table}.
     *
     * @param str  root directory that is read back as parquet
     * @param str2 child path (relative to {@code str}) the generated file is written to
     * @param str3 instant time handed to the data generator
     * @param i    number of records to generate
     * @param hoodieTestDataGenerator generator that produces the records
     * @throws IOException propagated from the parquet write
     */
    private void generateSqlSourceTestTable(String str, String str2, String str3, int i, HoodieTestDataGenerator hoodieTestDataGenerator) throws IOException {
        final Path parquetFile = new Path(str, str2);
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(
                hoodieTestDataGenerator.generateInserts(str3, Integer.valueOf(i), false)),
            parquetFile);
        sparkSession.read().parquet(str).createOrReplaceTempView("test_sql_table");
    }

    /**
     * Runs one INSERT sync from {@code SqlSource} into a fresh table and expects 1000 records.
     * {@code testNum} is consumed (post-incremented) when the table path is built.
     */
    @Test
    public void testSqlSourceSource() throws Exception {
        prepareSqlSource();
        final String tableBasePath = basePath + "/test_sql_source_table" + testNum++;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(
            tableBasePath, WriteOperationType.INSERT, SqlSource.class.getName(), Collections.emptyList(),
            "test-sql-source-source.properties", false, false, 1000, false, null, null, "timestamp", null, true);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        assertRecordCount(1000L, tableBasePath, sqlContext);
    }

    /**
     * Runs the delta streamer in continuous mode against an in-memory H2 database holding 1000 rows, with
     * incremental pull on column {@code id} and a source limit of 100, and waits until
     * ceil(1000 / 100) compaction commits exist and all 1000 records are in the table.
     *
     * <p>The JDBC connection is managed by try-with-resources so it is closed on both the normal and the
     * exceptional path. Any exception fails the test with the original exception attached as the cause.
     */
    @Disabled
    @Test
    public void testJdbcSourceIncrementalFetchInContinuousMode() {
        try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) {
            TypedProperties props = new TypedProperties();
            props.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem");
            props.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver");
            props.setProperty("hoodie.deltastreamer.jdbc.user", "test");
            props.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc");
            props.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
            props.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
            props.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
            props.setProperty("hoodie.datasource.write.recordkey.field", "ID");
            props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
            UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/test-jdbc-source.properties");

            final int numRecords = 1000;
            final int sourceLimit = 100;
            final String tableBasePath = basePath + "/triprec";
            HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(), null, "test-jdbc-source.properties", false, false, sourceLimit, false, null, null, "timestamp", null);
            cfg.continuousMode = true;
            JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props);

            deltaStreamerTestRunner(new HoodieDeltaStreamer(cfg, jsc), cfg, bool -> {
                // One commit per batch of sourceLimit rows, rounded up.
                HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNCompactionCommits((numRecords / sourceLimit) + (numRecords % sourceLimit == 0 ? 0 : 1), tableBasePath, fs);
                assertRecordCount(numRecords, tableBasePath, sqlContext);
                return true;
            });
        } catch (Exception e) {
            // Keep the cause so the failure report shows the original stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Checks the Hudi incremental source's fallback to a full table scan.
     *
     * <p>Flow: one BULK_INSERT into the upstream table is synced downstream with
     * {@code num_instants=1}; nine more upstream UPSERT rounds follow, after which a plain incremental
     * sync is expected to throw {@code HoodieIncrementalPathNotFoundException} and leave the downstream
     * table at 1000 records. The {@code num_instants=1} entry is then replaced by the fallback option plus
     * {@code num_instants=10}, two UPSERT syncs are run, and both tables must report the same count.
     *
     * <p>The null guard on {@code configs} is applied before the first {@code add}; placed after it, the
     * guard could never take effect.
     */
    @Test
    public void testHoodieIncrFallback() throws Exception {
        final String upstreamPath = basePath + "/incr_test_table";
        final String downstreamPath = basePath + "/incr_test_downstream_table";
        insertInTable(upstreamPath, 1, WriteOperationType.BULK_INSERT);

        HoodieDeltaStreamer.Config downstreamCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, WriteOperationType.BULK_INSERT, true, null);
        if (downstreamCfg.configs == null) {
            downstreamCfg.configs = new ArrayList<>();
        }
        downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=1");
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();

        insertInTable(upstreamPath, 9, WriteOperationType.UPSERT);
        // The incremental read is expected to fail, so the downstream table must stay unchanged.
        Assertions.assertThrows(HoodieIncrementalPathNotFoundException.class, () -> {
            new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        });
        assertRecordCount(1000L, downstreamPath, sqlContext);

        // Drop the trailing num_instants=1 entry, then enable the fallback and widen the window.
        downstreamCfg.configs.remove(downstreamCfg.configs.size() - 1);
        downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true");
        downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
        downstreamCfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();

        Assertions.assertEquals(sqlContext.read().format("org.apache.hudi").load(upstreamPath).count(), sqlContext.read().format("org.apache.hudi").load(downstreamPath).count());
    }

    /**
     * Runs {@code i} delta streamer syncs against {@code str} using {@code test-source.properties}, the
     * {@code SqlQueryBasedTransformer} and a fixed set of cleaner/archival/test-source options.
     *
     * @param str                table base path to write to
     * @param i                  number of sync rounds
     * @param writeOperationType write operation used for every round
     */
    private void insertInTable(String str, int i, WriteOperationType writeOperationType) throws Exception {
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(
            str, writeOperationType, Collections.singletonList(SqlQueryBasedTransformer.class.getName()),
            "test-source.properties", false);
        if (cfg.configs == null) {
            cfg.configs = new ArrayList<>();
        }
        cfg.configs.addAll(Arrays.asList(
            "hoodie.cleaner.commits.retained=2",
            "hoodie.keep.min.commits=4",
            "hoodie.keep.max.commits=5",
            "hoodie.test.source.generate.inserts=true"));
        int remainingRounds = i;
        while (remainingRounds-- > 0) {
            new HoodieDeltaStreamer(cfg, jsc).sync();
        }
    }

    /**
     * Runs the specified-operation scenario with {@code INSERT_OVERWRITE} on the table at
     * {@code basePath + "/insert_overwrite"}, once per record type (AVRO, SPARK).
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testInsertOverwrite(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tableBasePath = basePath + "/insert_overwrite";
        testDeltaStreamerWithSpecifiedOperation(tableBasePath, WriteOperationType.INSERT_OVERWRITE, hoodieRecordType);
    }

    /**
     * Runs the specified-operation scenario with {@code INSERT_OVERWRITE_TABLE} on the table at
     * {@code basePath + "/insert_overwrite_table"}, once per record type (AVRO, SPARK).
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testInsertOverwriteTable(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tableBasePath = basePath + "/insert_overwrite_table";
        testDeltaStreamerWithSpecifiedOperation(tableBasePath, WriteOperationType.INSERT_OVERWRITE_TABLE, hoodieRecordType);
    }

    /**
     * Inserts 5 parquet records, then runs a {@code DELETE_PARTITION} sync restricted by
     * {@code TestSpecificPartitionTransformer} and verifies that partition {@code 2016/03/15} no longer has
     * matching records or file IDs.
     *
     * <p>The "partition has file IDs" precondition is asserted after the initial insert has been synced;
     * checking it before any write would query a table that does not exist yet.
     */
    @Disabled("Local run passing; flaky in CI environment.")
    @Test
    public void testDeletePartitions() throws Exception {
        prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path");
        final String tableBasePath = basePath + "/test_parquet_table" + testNum;

        // Initial insert.
        new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null), jsc).sync();
        assertRecordCount(5L, tableBasePath, sqlContext);
        // Precondition: the partition about to be deleted currently holds at least one file group.
        Assertions.assertFalse(getAllFileIDsInTable(tableBasePath, Option.of("2016/03/15")).isEmpty());
        testNum++;

        // Delete the partition selected by the transformer.
        prepareParquetDFSFiles(5, PARQUET_SOURCE_ROOT);
        prepareParquetDFSSource(false, false);
        new HoodieDeltaStreamer(HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(), Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null), jsc).sync();
        assertNoPartitionMatch(tableBasePath, sqlContext, "2016/03/15");
        Assertions.assertTrue(getAllFileIDsInTable(tableBasePath, Option.of("2016/03/15")).isEmpty());
    }

    /**
     * Adds four secret-looking properties to the props loaded from {@code test-source.properties} and
     * checks that {@code HoodieDeltaStreamer.toSortedTruncatedString} prints none of their values but does
     * print {@code SENSITIVE_INFO_MASKED}.
     */
    @Test
    public void testToSortedTruncatedStringSecretsMasked() {
        final String[][] sensitiveEntries = {
            {"ssl.trustore.location", "SSL SECRET KEY"},
            {"sasl.jaas.config", "SASL SECRET KEY"},
            {"auth.credentials", "AUTH CREDENTIALS"},
            {"auth.user.info", "AUTH USER INFO"},
        };
        TypedProperties props = new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/test-source.properties")).getProps();
        for (String[] entry : sensitiveEntries) {
            props.put(entry[0], entry[1]);
        }

        final String rendered = HoodieDeltaStreamer.toSortedTruncatedString(props);

        for (String[] entry : sensitiveEntries) {
            Assertions.assertFalse(rendered.contains(entry[1]));
        }
        Assertions.assertTrue(rendered.contains("SENSITIVE_INFO_MASKED"));
    }

    /**
     * Shared scenario for the insert-overwrite tests.
     *
     * <p>Steps: (1) BULK_INSERT 1000 records; (2) switch to {@code writeOperationType} with a source limit
     * of 0 and sync, then run the operation-specific checks; (3) raise the source limit to 1000, sync
     * again and expect 950 records and commit metadata "00001" with count 2; (4) delete the table.
     *
     * @param str                table base path
     * @param writeOperationType operation applied from step 2 onwards
     * @param hoodieRecordType   record type handed to {@code addRecordMerger}
     */
    void testDeltaStreamerWithSpecifiedOperation(String str, WriteOperationType writeOperationType, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        // Step 1: seed the table.
        final HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        assertRecordCount(1000L, str, sqlContext);
        assertDistanceCount(1000L, str, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", str, fs, 1);
        // The returned file IDs are not used; the call is kept as in the original flow.
        getAllFileIDsInTable(str, Option.empty());

        // Step 2: run the requested operation with no new source data.
        cfg.operation = writeOperationType;
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, jsc).sync();

        final boolean overwritesPartitions = writeOperationType == WriteOperationType.INSERT_OVERWRITE;
        final boolean overwritesTable = writeOperationType == WriteOperationType.INSERT_OVERWRITE_TABLE;
        if (overwritesTable) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(jsc.hadoopConfiguration())
                .setBasePath(str)
                .build();
            HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
            Assertions.assertEquals(0L, fileSystemView.getLatestFileSlices("").count());
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", str, fs, 1);
            Assertions.assertTrue(getAllFileIDsInTable(str, Option.empty()).isEmpty());
        } else if (overwritesPartitions) {
            assertRecordCount(1000L, str, sqlContext);
            assertDistanceCount(1000L, str, sqlContext);
            HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", str, fs, 1);
        }

        // Step 3: let the source deliver data again.
        cfg.sourceLimit = 1000L;
        new HoodieDeltaStreamer(cfg, jsc).sync();
        assertRecordCount(950L, str, sqlContext);
        assertDistanceCount(950L, str, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", str, fs, 2);

        // Step 4: clean up.
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, str);
    }

    /**
     * Verifies that {@code getLatestCommitMetadataWithValidCheckpointInfo} returns the newest commit that
     * carries {@code deltastreamer.checkpoint.key}: "abc" after the first commit, "def" after the second,
     * and still "def" after a replace commit with empty extra metadata is added.
     *
     * <p>Assertions pass the expected value first so that failure messages read correctly.
     *
     * @throws IOException propagated from the timeline helpers
     */
    @Test
    public void testFetchingCheckpointFromPreviousCommits() throws IOException {
        final String checkpointKey = "deltastreamer.checkpoint.key";
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(basePath + "/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.datasource.write.recordkey.field", "key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", "pp");
        TestStreamSync testStreamSync = new TestStreamSync(cfg, sparkSession, null, props, jsc, fs, jsc.hadoopConfiguration(), null);
        props.put(HoodieTableConfig.NAME.key(), "sample_tbl");
        HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE, props);

        // First commit with a checkpoint.
        HashMap<String, String> extraMetadata = new HashMap<>();
        extraMetadata.put(checkpointKey, "abc");
        addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("abc", ((HoodieCommitMetadata) testStreamSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));

        // Second commit overrides the checkpoint.
        extraMetadata.put(checkpointKey, "def");
        addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("def", ((HoodieCommitMetadata) testStreamSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));

        // A replace commit without checkpoint info must be skipped.
        addReplaceCommitToTimeline(metaClient, Collections.emptyMap());
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("def", ((HoodieCommitMetadata) testStreamSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));
    }

    /**
     * Runs one UPSERT sync with {@code HoodieTableConfig.DROP_PARTITION_COLUMNS} set to "true" and checks
     * that the Avro schema resolved from the data file has no field named {@code partition_path}.
     * {@code testNum} is consumed (post-incremented) when the table path is built.
     */
    @EnumSource(value = HoodieRecord.HoodieRecordType.class, names = {"AVRO", "SPARK"})
    @ParameterizedTest
    public void testDropPartitionColumns(HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final String tableBasePath = basePath + "/test_drop_partition_columns" + testNum++;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.configs.add(HoodieTableConfig.DROP_PARTITION_COLUMNS.key() + "=" + "true");
        new HoodieDeltaStreamer(cfg, jsc).sync();
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);

        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setBasePath(tableBasePath)
            .setConf(fs.getConf())
            .build();
        Schema dataFileSchema = new TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile();
        Assertions.assertNotNull(dataFileSchema);
        boolean partitionColumnAbsent = dataFileSchema.getFields().stream()
            .map(Schema.Field::name)
            .noneMatch("partition_path"::equals);
        Assertions.assertTrue(partitionColumnAbsent);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Syncs with a source limit of 0 while {@code allowCommitOnNoCheckpointChange}, {@code enableMetaSync}
     * and {@code forceEmptyMetaSync} are all enabled, then checks that the table holds no records and that
     * the Hive table named by {@code META_SYNC_TABLE_NAME} exists.
     */
    @Test
    public void testForceEmptyMetaSync() throws Exception {
        final String tableBasePath = basePath + "/test_force_empty_meta_sync";

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        cfg.sourceLimit = 0L;
        cfg.allowCommitOnNoCheckpointChange = true;
        cfg.enableMetaSync = true;
        cfg.forceEmptyMetaSync = true;
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
        streamer.sync();
        assertRecordCount(0L, tableBasePath, sqlContext);

        // The meta sync must have created the Hive table even though nothing was written.
        HiveSyncConfig syncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
        syncConfig.setHadoopConf(hiveServer.getHiveConf());
        HoodieHiveSyncClient syncClient = new HoodieHiveSyncClient(syncConfig);
        final String hiveTableName = syncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
        Assertions.assertTrue(syncClient.tableExists(hiveTableName), "Table " + hiveTableName + " should exist");
    }

    /**
     * Verifies that syncing resumes from the stored checkpoint after a table's type is switched to
     * MERGE_ON_READ.
     *
     * <p>Flow: BULK_INSERT 1000 records with the default config; rewrite {@code hoodie.properties} with
     * {@code hoodie.table.type=MERGE_ON_READ}; run two UPSERT syncs with a MERGE_ON_READ config and expect
     * 1450 and then 1900 records, commit metadata "00001"/"00002", and at least 1 and then 2 delta commits.
     *
     * <p>The stream used to read {@code hoodie.properties} is closed via try-with-resources.
     */
    @Test
    public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
        final String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";

        // Initial bulk insert.
        HoodieDeltaStreamer.Config initialCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        new HoodieDeltaStreamer(initialCfg, jsc).sync();
        assertRecordCount(1000L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);

        // Rewrite the table properties with the new table type.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(initialCfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
        Properties tableProps = new Properties();
        try (InputStream in = fs.open(new Path(initialCfg.targetBasePath + "/.hoodie/hoodie.properties"))) {
            tableProps.load(in);
        }
        LOG.info("old props: {}", tableProps);
        tableProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
        LOG.info("new props: {}", tableProps);
        HoodieTableConfig.create(metaClient.getFs(), new Path(metaClient.getBasePathV2(), ".hoodie"), tableProps);

        // First upsert against the converted table.
        HoodieDeltaStreamer.Config morCfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        morCfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        new HoodieDeltaStreamer(morCfg, jsc).sync();
        assertRecordCount(1450L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
        Assertions.assertEquals(1450L, countsPerCommit(tableBasePath, sqlContext).stream().mapToLong(row -> row.getLong(1)).sum());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);

        // Second upsert.
        new HoodieDeltaStreamer(morCfg, jsc).sync();
        assertRecordCount(1900L, tableBasePath, sqlContext);
        HoodieDeltaStreamerTestBase.TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
        Assertions.assertEquals(1900L, countsPerCommit(tableBasePath, sqlContext).stream().mapToLong(row -> row.getLong(1)).sum());
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
        HoodieDeltaStreamerTestBase.TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Writes two parquet files (100 and then 200 records) under {@code PARQUET_SOURCE_ROOT},
     * syncs a {@link ParquetDFSSource}-backed delta streamer with the INSERT operation after each
     * file, and asserts the cumulative record count of the target table.
     *
     * <p>NOTE(review): per the test name the record keys are expected to be auto-generated;
     * presumably the prepared source properties omit a record key field, which is not visible
     * here, so confirm against {@code prepareParquetDFSSource}.
     */
    @Test
    public void testAutoGenerateRecordKeys() throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        // This test passes no transformer class names (null below), so the flag is always false.
        boolean hasTransformer = false;
        int firstBatchCount = 100;
        int secondBatchCount = 200;

        prepareParquetDFSFiles(firstBatchCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(false, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "");

        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null);
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);

        // First sync ingests the first file only.
        deltaStreamer.sync();
        assertRecordCount(firstBatchCount, tableBasePath, sqlContext);

        // Second sync should add the records of the newly written file on top of the first batch.
        prepareParquetDFSFiles(secondBatchCount, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
        deltaStreamer.sync();
        assertRecordCount(firstBatchCount + secondBatchCount, tableBasePath, sqlContext);
        testNum++;
    }

    /**
     * Starts a continuous-mode delta streamer configured with
     * {@link MockConfigurationHotUpdateStrategy} and an upsert parallelism of 200, then waits until
     * at least two commits exist and the ingestion service reports an upsert parallelism greater
     * than the initial value.
     *
     * @param hoodieTableType table type under test
     * @param hoodieRecordType record type used to select the record merger
     */
    @ParameterizedTest
    @CsvSource({"COPY_ON_WRITE, AVRO", "MERGE_ON_READ, AVRO", "COPY_ON_WRITE, SPARK", "MERGE_ON_READ, SPARK"})
    public void testConfigurationHotUpdate(HoodieTableType hoodieTableType, HoodieRecord.HoodieRecordType hoodieRecordType) throws Exception {
        final long initialUpsertParallelism = 200;
        final String tableBasePath = basePath + String.format("/configurationHotUpdate_%s_%s", hoodieTableType.name(), hoodieRecordType.name());

        HoodieDeltaStreamer.Config cfg = HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        addRecordMerger(hoodieRecordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = hoodieTableType.name();
        cfg.configHotUpdateStrategyClass = MockConfigurationHotUpdateStrategy.class.getName();
        cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), initialUpsertParallelism));

        final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
        deltaStreamerTestRunner(deltaStreamer, cfg, ignored -> {
            HoodieDeltaStreamerTestBase.TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
            // The live properties must show a parallelism above the value the streamer started with.
            long currentUpsertParallelism = deltaStreamer.getIngestionService().getProps().getLong(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key());
            Assertions.assertTrue(currentUpsertParallelism > initialUpsertParallelism);
            return true;
        });

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Collects the file IDs of the latest base files of a table.
     *
     * @param str base path of the table
     * @param option when present, its value is passed to {@code getLatestBaseFiles(String)}
     *               (presumably a partition path, to be confirmed); when absent, the no-arg
     *               {@code getLatestBaseFiles()} is used
     * @return the distinct file IDs of the selected base files
     */
    private Set<String> getAllFileIDsInTable(String str, Option<String> option) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(jsc.hadoopConfiguration())
            .setBasePath(str)
            .build();
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
        // No raw casts: the element types flow through the stream, so the result is a checked Set<String>.
        return (option.isPresent() ? fsView.getLatestBaseFiles(option.get()) : fsView.getLatestBaseFiles())
            .map(baseFile -> baseFile.getFileId())
            .collect(Collectors.toSet());
    }

    /**
     * Supplies two argument sets, each a boolean followed by a list of class names (or null):
     * {@code (false, null)} and {@code (true, [TripsWithDistanceTransformer])}.
     *
     * <p>NOTE(review): presumably consumed through {@code @MethodSource} by a parameterized test
     * outside this view; confirm what the two arguments mean against that test's signature.
     *
     * @return stream of the two argument sets
     */
    private static Stream<Arguments> testORCDFSSource() {
        // Plain varargs keep the element type as Arguments; an Object[] cast would yield Stream<Object>.
        return Stream.of(
            Arguments.arguments(false, null),
            Arguments.arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName())));
    }
}
