package org.apache.hudi.utilities.functional;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.functional.TestHoodieSnapshotExporter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.SqlSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.class */
public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
    /** Logger shared by this test class and its nested helpers (TestHelpers logs timelines and polling errors through it). */
    private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$DistanceUDF.class */
    public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {
        public Double call(Double d, Double d2, Double d3, Double d4) {
            return Double.valueOf(HoodieDeltaStreamerTestBase.RANDOM.nextDouble());
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$DropAllTransformer.class */
    public static class DropAllTransformer implements Transformer {
        public Dataset apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            System.out.println("DropAllTransformer called !!");
            return sparkSession.createDataFrame(javaSparkContext.emptyRDD(), dataset.schema());
        }
    }

    /**
     * Payload that inherits all behavior from {@link OverwriteWithLatestAvroPayload}.
     *
     * <p>It exists only so tests have a distinct payload class name to configure.
     */
    public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
        public DummyAvroPayload(GenericRecord record, Comparable orderingVal) {
            super(record, orderingVal);
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$TestDeltaSync.class */
    class TestDeltaSync extends DeltaSync {
        public TestDeltaSync(HoodieDeltaStreamer.Config config, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties typedProperties, JavaSparkContext javaSparkContext, FileSystem fileSystem, Configuration configuration, Function<SparkRDDWriteClient, Boolean> function) throws IOException {
            super(config, sparkSession, schemaProvider, typedProperties, javaSparkContext, fileSystem, configuration, function);
        }

        protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline hoodieTimeline) throws IOException {
            return super.getLatestCommitMetadataWithValidCheckpointInfo(hoodieTimeline);
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$TestFileBasedSchemaProviderNullTargetSchema.class */
    public static class TestFileBasedSchemaProviderNullTargetSchema extends FilebasedSchemaProvider {
        public TestFileBasedSchemaProviderNullTargetSchema(TypedProperties typedProperties, JavaSparkContext javaSparkContext) {
            super(typedProperties, javaSparkContext);
        }

        public Schema getTargetSchema() {
            return null;
        }
    }

    /** Key generator identical to {@link SimpleKeyGenerator}, exposed under a test-specific class name. */
    public static class TestGenerator extends SimpleKeyGenerator {
        public TestGenerator(TypedProperties props) {
            super(props);
        }
    }

    /**
     * Static helpers shared by the DeltaStreamer functional tests.
     *
     * <p>They cover builders for {@link HoodieDeltaStreamer.Config} and assertions over a Hudi
     * table's rows and timeline.
     */
    public static class TestHelpers {
        TestHelpers() {
        }

        /** Config whose only transformer is {@link DropAllTransformer}, so every input row is discarded. */
        static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) {
            return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
        }

        /** Config that applies {@link TripsWithDistanceTransformer}; all other settings use the defaults below. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) {
            return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
        }

        /** Config reading {@code test-source.properties} with Hive sync disabled. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) {
            return makeConfig(basePath, op, transformerClassNames, "test-source.properties", false);
        }

        /** Config that sets the default schema provider, with no payload-class or table-type override. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames,
                                                     String propsFilename, boolean enableHiveSync) {
            return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true, false, null, null);
        }

        /**
         * Config for a {@link TestDataSource} source.
         *
         * <p>It uses source limit 1000, ordering field {@code timestamp}, and no initial checkpoint.
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames,
                                                     String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
                                                     boolean updatePayloadClass, String payloadClassName, String tableType) {
            return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
                useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp", null);
        }

        /** Same as the full overload with {@code allowCommitOnNoCheckpointChange = false}. */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
                                                     List<String> transformerClassNames, String propsFilename, boolean enableHiveSync,
                                                     boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass,
                                                     String payloadClassName, String tableType, String sourceOrderingField,
                                                     String checkpoint) {
            return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync,
                useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, tableType, sourceOrderingField,
                checkpoint, false);
        }

        /**
         * Builds a DeltaStreamer config whose target table is named {@code hoodie_trips}.
         *
         * @param basePath                        target table base path
         * @param op                              write operation
         * @param sourceClassName                 source class to read from
         * @param transformerClassNames           transformer classes to apply
         * @param propsFilename                   properties file name, resolved under {@code dfsBasePath}
         * @param enableHiveSync                  value for {@code enableHiveSync}
         * @param useSchemaProviderClass          when true, sets the test base's default schema provider class
         * @param sourceLimit                     value for {@code sourceLimit}
         * @param updatePayloadClass              when true, {@code payloadClassName} is applied
         * @param payloadClassName                payload class, only used when {@code updatePayloadClass} is true
         * @param tableType                       table type; {@code null} means {@code COPY_ON_WRITE}
         * @param sourceOrderingField             value for {@code sourceOrderingField}
         * @param checkpoint                      initial checkpoint, may be {@code null}
         * @param allowCommitOnNoCheckpointChange value for {@code allowCommitOnNoCheckpointChange}
         * @return the populated config
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
                                                     List<String> transformerClassNames, String propsFilename, boolean enableHiveSync,
                                                     boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass,
                                                     String payloadClassName, String tableType, String sourceOrderingField,
                                                     String checkpoint, boolean allowCommitOnNoCheckpointChange) {
            HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
            cfg.targetBasePath = basePath;
            cfg.targetTableName = "hoodie_trips";
            cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
            cfg.sourceClassName = sourceClassName;
            cfg.transformerClassNames = transformerClassNames;
            cfg.operation = op;
            cfg.enableHiveSync = enableHiveSync;
            cfg.sourceOrderingField = sourceOrderingField;
            cfg.propsFilePath = TestHoodieDeltaStreamer.dfsBasePath + "/" + propsFilename;
            cfg.sourceLimit = sourceLimit;
            cfg.checkpoint = checkpoint;
            if (updatePayloadClass) {
                cfg.payloadClassName = payloadClassName;
            }
            if (useSchemaProviderClass) {
                cfg.schemaProviderClassName = HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
            }
            cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
            return cfg;
        }

        /**
         * Builds a COPY_ON_WRITE config named {@code hoodie_trips_copy} that reads {@code srcBasePath}
         * through {@link HoodieIncrSource}.
         *
         * @param srcBasePath                upstream Hudi table to read incrementally
         * @param basePath                   target table base path
         * @param op                         write operation
         * @param addReadLatestOnMissingCkpt value for {@code hoodieincr.read_latest_on_missing_ckpt}
         * @param schemaProviderClassName    schema provider class; left unset when {@code null}
         * @return the populated config
         */
        static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op,
                                                                   boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
            HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
            cfg.targetBasePath = basePath;
            cfg.targetTableName = "hoodie_trips_copy";
            cfg.tableType = "COPY_ON_WRITE";
            cfg.sourceClassName = HoodieIncrSource.class.getName();
            cfg.operation = op;
            cfg.sourceOrderingField = "timestamp";
            cfg.propsFilePath = TestHoodieDeltaStreamer.dfsBasePath + "/test-downstream-source.properties";
            cfg.sourceLimit = 1000L;
            if (null != schemaProviderClassName) {
                cfg.schemaProviderClassName = schemaProviderClassName;
            }
            List<String> cfgs = new ArrayList<>();
            cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
            cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
            cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
            cfg.configs = cfgs;
            return cfg;
        }

        /** Asserts the Hudi table at {@code tablePath} contains exactly {@code expected} rows. */
        public static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
            Assertions.assertEquals(expected, recordCount);
        }

        /**
         * Returns the row count per {@code _hoodie_partition_path} for the table at {@code tablePath}.
         *
         * @return map from partition path to its row count (empty for an empty table)
         */
        static Map<String, Long> getPartitionRecordCount(String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath)
                .groupBy("_hoodie_partition_path").count().collectAsList();
            Map<String, Long> partitionRecordCount = new HashMap<>();
            // Column 0 is the grouping key; column 1 is the long produced by count().
            for (Row row : rows) {
                partitionRecordCount.put(row.getString(0), row.getLong(1));
            }
            return partitionRecordCount;
        }

        /** Asserts no row of the table at {@code tablePath} belongs to partition {@code partitionPath}. */
        static void assertNoPartitionMatch(String tablePath, SQLContext sqlContext, String partitionPath) {
            sqlContext.clearCache();
            // Quote the path: unquoted, Spark parses e.g. 2016/03/15 as division, so the filter never
            // matched and this assertion could not fail.
            long matches = sqlContext.read().format("org.apache.hudi").load(tablePath)
                .filter("_hoodie_partition_path = '" + partitionPath + "'").count();
            Assertions.assertEquals(0L, matches);
        }

        /** Asserts the table at {@code tablePath} contains exactly {@code expected} distinct record keys. */
        static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            long distinctKeys = sqlContext.read().format("org.apache.hudi").load(tablePath)
                .select("_hoodie_record_key").distinct().count();
            Assertions.assertEquals(expected, distinctKeys);
        }

        /** Returns (commit time, row count) rows for the table at {@code tablePath}, sorted by commit time. */
        static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            return sqlContext.read().format("org.apache.hudi").load(tablePath)
                .groupBy("_hoodie_commit_time").count().sort("_hoodie_commit_time").collectAsList();
        }

        /** Registers the table as {@code tmp_trips} and asserts {@code expected} rows have non-null {@code haversine_distance}. */
        public static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            Assertions.assertEquals(expected, sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count());
        }

        /** Registers the table as {@code tmp_trips} and asserts {@code expected} rows have {@code haversine_distance = 1.0}. */
        static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            Assertions.assertEquals(expected, sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count());
        }

        /** Asserts at least {@code minExpected} completed instants on the commit timeline. */
        public static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
            HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
            assertAtLeast(minExpected, meta, timeline);
        }

        /** Asserts at least {@code minExpected} completed delta commits. */
        public static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
            HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
            assertAtLeast(minExpected, meta, timeline);
        }

        /** Asserts at least {@code minExpected} completed commit-timeline instants after {@code afterInstant}. */
        public static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String afterInstant, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
            HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(afterInstant).filterCompletedInstants();
            assertAtLeast(minExpected, meta, timeline);
        }

        /** Asserts at least {@code minExpected} completed delta commits after {@code afterInstant}, on a reloaded timeline. */
        public static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String afterInstant, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
            HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(afterInstant).filterCompletedInstants();
            assertAtLeast(minExpected, meta, timeline);
        }

        /**
         * Checks the latest completed commit of the table at {@code tablePath}.
         *
         * <p>It asserts the number of completed commits and the checkpoint recorded under
         * {@code deltastreamer.checkpoint.key} on the latest commit.
         *
         * @return the timestamp of the latest completed commit
         * @throws IOException if the commit metadata cannot be parsed
         */
        static String assertCommitMetadata(String expectedCheckpoint, String tablePath, FileSystem fs, int totalCommits) throws IOException {
            HoodieTimeline timeline = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build()
                .getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            HoodieInstant lastInstant = timeline.lastInstant().get();
            HoodieCommitMetadata commitMetadata =
                HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
            Assertions.assertEquals(totalCommits, timeline.countInstants());
            Assertions.assertEquals(expectedCheckpoint, commitMetadata.getMetadata("deltastreamer.checkpoint.key"));
            return lastInstant.getTimestamp();
        }

        /**
         * Polls {@code condition} every 3 seconds until it returns true or {@code dsFuture} completes.
         *
         * @param condition     check to evaluate; errors it throws are logged and treated as false
         * @param dsFuture      polling also stops once this future is done
         * @param timeoutInSecs maximum time to wait
         * @throws java.util.concurrent.TimeoutException if polling has not finished within the timeout
         */
        static void waitTillCondition(Function<Boolean, Boolean> condition, Future<?> dsFuture, long timeoutInSecs) throws Exception {
            java.util.concurrent.ExecutorService poller = Executors.newSingleThreadExecutor();
            try {
                Future<Boolean> polling = poller.submit(() -> {
                    boolean satisfied = false;
                    while (!satisfied && !dsFuture.isDone() && !Thread.currentThread().isInterrupted()) {
                        try {
                            Thread.sleep(3000L);
                            satisfied = condition.apply(true);
                        } catch (InterruptedException ie) {
                            // shutdownNow() below interrupts us (e.g. after a timeout): stop polling.
                            Thread.currentThread().interrupt();
                            break;
                        } catch (Throwable th) {
                            TestHoodieDeltaStreamer.LOG.warn("Got error :", th);
                            satisfied = false;
                        }
                    }
                    return satisfied;
                });
                polling.get(timeoutInSecs, TimeUnit.SECONDS);
            } finally {
                // Without this the non-daemon polling thread outlived the call (forever on timeout).
                poller.shutdownNow();
            }
        }

        /** Asserts at least {@code minExpected} completed instants across the whole active timeline. */
        static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
            HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants();
            assertAtLeast(minExpected, meta, timeline);
        }

        /** Asserts at least {@code minExpected} completed replace commits. */
        static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
            HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
            assertAtLeast(minExpected, meta, timeline);
        }

        /** Asserts exactly one pending index instant. */
        static void assertPendingIndexCommit(String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
            HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
            logActiveTimeline(meta);
            int numIndexInstants = (int) timeline.getInstants().count();
            Assertions.assertEquals(1, numIndexInstants, "Got=" + numIndexInstants + ", exp=1");
        }

        /** Asserts exactly one completed index instant. */
        static void assertCompletedIndexCommit(String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
            HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
            logActiveTimeline(meta);
            int numIndexInstants = (int) timeline.getInstants().count();
            Assertions.assertEquals(1, numIndexInstants, "Got=" + numIndexInstants + ", exp=1");
        }

        /** Asserts there are no completed replace commits. */
        static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
            HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
            logActiveTimeline(meta);
            int numReplaceCommits = (int) timeline.getInstants().count();
            Assertions.assertEquals(0, numReplaceCommits, "Got=" + numReplaceCommits + ", exp =0");
        }

        /** Asserts at least {@code minExpected} pending replace instants. */
        static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
            HoodieTimeline timeline = meta.getActiveTimeline().filterPendingReplaceTimeline();
            assertAtLeast(minExpected, meta, timeline);
        }

        /** Logs every instant of {@code meta}'s active timeline, to aid debugging failed assertions. */
        private static void logActiveTimeline(HoodieTableMetaClient meta) {
            TestHoodieDeltaStreamer.LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
        }

        /** Logs {@code meta}'s active timeline, then asserts {@code timeline} holds at least {@code minExpected} instants. */
        private static void assertAtLeast(int minExpected, HoodieTableMetaClient meta, HoodieTimeline timeline) {
            logActiveTimeline(meta);
            int actual = (int) timeline.getInstants().count();
            Assertions.assertTrue(minExpected <= actual, "Got=" + actual + ", exp >=" + minExpected);
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$TestIdentityTransformer.class */
    public static class TestIdentityTransformer implements Transformer {
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            return dataset;
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$TestSpecificPartitionTransformer.class */
    public static class TestSpecificPartitionTransformer implements Transformer {
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            return dataset.filter("partition_path == '2016/03/15'");
        }
    }

    /** Another {@link SimpleKeyGenerator} alias, distinguishable from {@code TestGenerator} by class name. */
    public static class TestTableLevelGenerator extends SimpleKeyGenerator {
        public TestTableLevelGenerator(TypedProperties props) {
            super(props);
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$TripsWithDistanceTransformer.class */
    public static class TripsWithDistanceTransformer implements Transformer {
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            dataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
            return dataset.withColumn("haversine_distance", functions.callUDF("distance_udf", new Column[]{functions.col("begin_lat"), functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")}));
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer$TripsWithEvolvedOptionalFieldTransformer.class */
    public static class TripsWithEvolvedOptionalFieldTransformer implements Transformer {
        public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession, Dataset<Row> dataset, TypedProperties typedProperties) {
            return dataset.withColumn("evoluted_optional_union_field", functions.col("rider"));
        }
    }

    /**
     * Creates a continuous-mode COPY_ON_WRITE DeltaStreamer that performs INSERTs into {@code tableBasePath}.
     *
     * @param tableBasePath target table base path
     * @param totalRecords  first argument forwarded to {@code getAsyncServicesConfigs}
     * @param asyncCluster  fifth argument forwarded to {@code getAsyncServicesConfigs}
     * @return the configured (not yet started) streamer
     * @throws IOException if the streamer cannot be constructed
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster) throws IOException {
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        List<String> asyncServiceConfigs = getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, "");
        cfg.configs.addAll(asyncServiceConfigs);
        return new HoodieDeltaStreamer(cfg, this.jsc);
    }

    /** Same as the five-argument overload with its last argument set to {@code null}. */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute) {
        return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, (Boolean) null);
    }

    /** Builds a {@link HoodieClusteringJob} whose config comes from {@code buildHoodieClusteringUtilConfig} with all arguments passed through unchanged. */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob) {
        HoodieClusteringJob.Config clusteringConfig =
            buildHoodieClusteringUtilConfig(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
        return new HoodieClusteringJob(this.jsc, clusteringConfig);
    }

    /** Class-level teardown: base-class cleanup first, then {@code testUtils} when it was created. */
    @AfterAll
    public static void cleanupClass() {
        UtilitiesTestBase.cleanupClass();
        if (testUtils == null) {
            return;
        }
        testUtils.teardown();
    }

    /** Per-test setup; delegates entirely to the base class. */
    @BeforeEach
    @Override
    public void setup() throws Exception {
        super.setup();
    }

    /** Per-test teardown; delegates entirely to the base class. */
    @AfterEach
    @Override
    public void teardown() throws Exception {
        super.teardown();
    }

    /** Verifies that {@code test-source.properties} on DFS is read with the expected values. */
    @Test
    public void testProps() {
        Path propsPath = new Path(dfsBasePath + "/test-source.properties");
        TypedProperties props = new DFSPropertiesConfiguration(dfs.getConf(), propsPath).getProps();

        Assertions.assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
        Assertions.assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
        Assertions.assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
            props.getString("hoodie.datasource.write.keygenerator.class"));
    }

    /** Returns a fresh config carrying only the three mandatory CLI options (base path, table type, table name). */
    private static HoodieDeltaStreamer.Config getBaseConfig() {
        HoodieDeltaStreamer.Config base = new HoodieDeltaStreamer.Config();
        base.targetTableName = "test";
        base.tableType = "COPY_ON_WRITE";
        base.targetBasePath = "s3://mybucket/blah";
        return base;
    }

    /**
     * Supplies every (table type, boolean, boolean) combination.
     *
     * <p>COW cases come before MOR cases, and within each table type {@code true} precedes
     * {@code false} for both flags.
     */
    private static Stream<Arguments> schemaEvolArgs() {
        Object[] tableTypes = {DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()};
        boolean[] flagValues = {true, false};
        List<Arguments> combinations = new ArrayList<>();
        for (Object tableType : tableTypes) {
            for (boolean firstFlag : flagValues) {
                for (boolean secondFlag : flagValues) {
                    combinations.add(Arguments.of(tableType, firstFlag, secondFlag));
                }
            }
        }
        return combinations.stream();
    }

    /** Supplies (CLI arguments, expected parsed config) pairs for {@code testValidCommandLineArgs}. */
    private static Stream<Arguments> provideValidCliArgs() {
        HoodieDeltaStreamer.Config plain = getBaseConfig();

        HoodieDeltaStreamer.Config withFileFormat = getBaseConfig();
        withFileFormat.baseFileFormat = "PARQUET";

        HoodieDeltaStreamer.Config withSourceLimit = getBaseConfig();
        withSourceLimit.sourceLimit = Long.parseLong("500");

        HoodieDeltaStreamer.Config withHiveSync = getBaseConfig();
        withHiveSync.enableHiveSync = true;

        HoodieDeltaStreamer.Config withHiveTable = getBaseConfig();
        withHiveTable.configs = Arrays.asList("hoodie.datasource.hive_sync.table=test_table");

        HoodieDeltaStreamer.Config withRecordKeys = getBaseConfig();
        withRecordKeys.configs = Arrays.asList("hoodie.datasource.write.recordkey.field=Field1,Field2,Field3");

        HoodieDeltaStreamer.Config withBothConfs = getBaseConfig();
        withBothConfs.configs = Arrays.asList("hoodie.datasource.hive_sync.table=test_table", "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3");

        HoodieDeltaStreamer.Config withEverything = getBaseConfig();
        withEverything.baseFileFormat = "PARQUET";
        withEverything.sourceLimit = Long.parseLong("500");
        withEverything.enableHiveSync = true;
        withEverything.configs = Arrays.asList("hoodie.datasource.hive_sync.table=test_table", "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3");

        return Stream.of(
            Arguments.of(baseCliArgsWith(), plain),
            Arguments.of(baseCliArgsWith("--base-file-format", "PARQUET"), withFileFormat),
            Arguments.of(baseCliArgsWith("--source-limit", "500"), withSourceLimit),
            Arguments.of(baseCliArgsWith("--enable-hive-sync"), withHiveSync),
            Arguments.of(baseCliArgsWith("--hoodie-conf", "hoodie.datasource.hive_sync.table=test_table"), withHiveTable),
            Arguments.of(baseCliArgsWith("--hoodie-conf", "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"), withRecordKeys),
            Arguments.of(baseCliArgsWith("--hoodie-conf", "hoodie.datasource.hive_sync.table=test_table",
                "--hoodie-conf", "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"), withBothConfs),
            // Options deliberately interleaved in a different order than the other cases.
            Arguments.of(new String[]{"--target-base-path", "s3://mybucket/blah", "--source-limit", "500",
                "--table-type", "COPY_ON_WRITE", "--target-table", "test", "--base-file-format", "PARQUET",
                "--enable-hive-sync", "--hoodie-conf", "hoodie.datasource.hive_sync.table=test_table",
                "--hoodie-conf", "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"}, withEverything));
    }

    /** Returns the mandatory CLI options matching {@code getBaseConfig()}, followed by {@code extra}. */
    private static String[] baseCliArgsWith(String... extra) {
        String[] base = {"--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test"};
        String[] all = Arrays.copyOf(base, base.length + extra.length);
        System.arraycopy(extra, 0, all, base.length, extra.length);
        return all;
    }

    /**
     * Checks that parsing each set of valid command-line arguments produces exactly the expected config.
     *
     * @param args     command-line arguments supplied by {@code provideValidCliArgs}
     * @param expected the config those arguments are expected to parse into
     */
    @MethodSource({"provideValidCliArgs"})
    @ParameterizedTest
    public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) {
        HoodieDeltaStreamer.Config parsed = HoodieDeltaStreamer.getConfig(args);
        Assertions.assertEquals(expected, parsed);
    }

    /**
     * Lays out a topic directory containing one parquet file named {@code kafka_topic1+0+100+200.parquet}.
     * It then checks that the KafkaConnectHdfsProvider derives the initial checkpoint
     * {@code kafka_topic1,0:200} from it.
     */
    @Test
    public void testKafkaConnectCheckpointProvider() throws IOException {
        final String tableBasePath = dfsBasePath + "/test_table";
        final String topicDir = dfsBasePath + "/kafka_topic1";
        final String partitionDir = topicDir + "/year=2016/month=05/day=01";
        final String dataFile = partitionDir + "/kafka_topic1+0+100+200.parquet";

        // Build the on-disk layout the checkpoint provider scans.
        dfs.mkdirs(new Path(topicDir));
        dfs.mkdirs(new Path(partitionDir));
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(new HoodieTestDataGenerator().generateInserts("000", 100)),
            new Path(dataFile));

        HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.initialCheckpointProvider = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
        TypedProperties props =
            new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/test-source.properties")).getProps();
        props.put("hoodie.deltastreamer.checkpoint.provider.path", topicDir);

        HoodieDeltaStreamer deltaStreamer =
            new HoodieDeltaStreamer(cfg, this.jsc, dfs, hdfsTestService.getHadoopConf(), Option.ofNullable(props));
        Assertions.assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
    }

    /**
     * A props file pointing at a non-existent key generator class must make {@code sync()} fail with an
     * IOException whose message names the key-generator load failure.
     */
    @Test
    public void testPropsWithInvalidKeyGenerator() throws Exception {
        final String tableBasePath = dfsBasePath + "/test_table_invalid_key_gen";
        IOException thrown = Assertions.assertThrows(IOException.class,
            () -> new HoodieDeltaStreamer(
                TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
                    Collections.singletonList(TripsWithDistanceTransformer.class.getName()),
                    "test-invalid.properties", false),
                this.jsc).sync(),
            "Should error out when setting the key generator class property to an invalid value");
        LOG.debug("Expected error during getting the key generator", thrown);
        Assertions.assertTrue(thrown.getMessage().contains("Could not load key generator class"));
    }

    /**
     * Pointing a BULK_INSERT run at an existing directory that is not a Hudi table must raise
     * {@code TableNotFoundException}.
     */
    @Test
    public void testTableCreation() throws Exception {
        final String notATablePath = dfsBasePath + "/not_a_table";
        TableNotFoundException thrown = Assertions.assertThrows(TableNotFoundException.class, () -> {
            dfs.mkdirs(new Path(notATablePath));
            HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(notATablePath, WriteOperationType.BULK_INSERT);
            new HoodieDeltaStreamer(cfg, this.jsc).sync();
        }, "Should error out when pointed out at a dir thats not a table");
        LOG.debug("Expected error during table creation", thrown);
    }

    /**
     * Bulk-inserts, re-runs with a zero source limit (no new commit expected), then upserts into a table.
     * It exports the result as plain parquet and bootstraps a second table from that parquet via the
     * delta streamer. Finally it checks that the bootstrapped table has the same row count, unique record
     * keys, and a column set that includes both the Hudi meta columns and every source column.
     */
    @Test
    public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
        String tableBasePath = dfsBasePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, tableBasePath, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

        // A zero source limit must not produce a new commit.
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, tableBasePath, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1950L, tableBasePath, this.sqlContext);
        TestHelpers.assertDistanceCount(1950L, tableBasePath, this.sqlContext);
        TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
        Assertions.assertEquals(1950L, TestHelpers.countsPerCommit(tableBasePath, this.sqlContext).stream()
            .mapToLong(row -> row.getLong(1)).sum());

        // Export the table as plain parquet to serve as the bootstrap source.
        String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
        Dataset<?> sourceDf = this.sqlContext.read().format("org.apache.hudi").load(tableBasePath);
        sourceDf.write().format("parquet").save(bootstrapSourcePath);

        String bootstrappedTablePath = dfsBasePath + "/test_dataset_bootstrapped";
        cfg.runBootstrap = true;
        cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
        cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
        cfg.configs.add("hoodie.bootstrap.parallelism=5");
        cfg.targetBasePath = bootstrappedTablePath;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();

        Dataset<?> bootstrappedDf = this.sqlContext.read().format("org.apache.hudi").load(bootstrappedTablePath);
        LOG.info("Schema :");
        bootstrappedDf.printSchema();
        TestHelpers.assertRecordCount(1950L, bootstrappedTablePath, this.sqlContext);
        // registerTempTable is deprecated; createOrReplaceTempView is its replacement (also used elsewhere here).
        bootstrappedDf.createOrReplaceTempView("bootstrapped");
        Assertions.assertEquals(1950L,
            this.sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());

        List<String> bootstrappedFields = Arrays.asList(bootstrappedDf.schema().fieldNames());
        List<String> sourceFields = Arrays.asList(sourceDf.schema().fieldNames());
        Assertions.assertEquals(sourceFields.size(), bootstrappedDf.schema().fields().length);
        Assertions.assertTrue(bootstrappedFields.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
        Assertions.assertTrue(bootstrappedFields.containsAll(sourceFields));
    }

    /**
     * Writes three batches while evolving the target schema, then verifies counts, commit metadata and the
     * final table schema against {@code source_evolved.avsc}.
     *
     * <p>This test mutates shared fixtures: the static {@code defaultSchemaProviderClassName} and the
     * {@code test-source.properties} file. Both are restored in a {@code finally} block, so a failed
     * assertion cannot leak modified state into later tests.
     *
     * @param tableType              forwarded as the last argument of {@code TestHelpers.makeConfig}
     *                               (the table type, per the {@code schemaEvolArgs} source — TODO confirm)
     * @param useUserProvidedSchema  when {@code false}, the third batch runs with
     *                               {@code TestFileBasedSchemaProviderNullTargetSchema} and no target schema file
     * @param useSchemaPostProcessor when {@code false}, the spark-avro schema post processor is disabled
     */
    @MethodSource({"schemaEvolArgs"})
    @ParameterizedTest
    public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
        String tableBasePath = dfsBasePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
        defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        try {
            // Batch 1: insert with the original schema.
            HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source.avsc");
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            if (!useSchemaPostProcessor) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false");
            }
            new HoodieDeltaStreamer(cfg, this.jsc).sync();
            TestHelpers.assertRecordCount(1000L, tableBasePath, this.sqlContext);
            TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);

            // Batch 2: upsert with an evolved target schema that adds an optional field.
            cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            if (!useSchemaPostProcessor) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false");
            }
            new HoodieDeltaStreamer(cfg, this.jsc).sync();
            TestHelpers.assertRecordCount(1450L, tableBasePath, this.sqlContext);
            TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
            Assertions.assertEquals(1450L, TestHelpers.countsPerCommit(tableBasePath, this.sqlContext).stream()
                .mapToLong(row -> row.getLong(1)).sum());
            this.sqlContext.read().format("org.apache.hudi").load(tableBasePath).createOrReplaceTempView("tmp_trips");
            Assertions.assertEquals(950L, this.sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count());

            // Batch 3: upsert again, optionally without a user-provided target schema.
            if (!useUserProvidedSchema) {
                defaultSchemaProviderClassName = TestFileBasedSchemaProviderNullTargetSchema.class.getName();
            }
            cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
            if (useUserProvidedSchema) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
            }
            if (!useSchemaPostProcessor) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false");
            }
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            new HoodieDeltaStreamer(cfg, this.jsc).sync();
            TestHelpers.assertRecordCount(1900L, tableBasePath, this.sqlContext);
            TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3);
            Assertions.assertEquals(1900L, TestHelpers.countsPerCommit(tableBasePath, this.sqlContext).stream()
                .mapToLong(row -> row.getLong(1)).sum());

            Schema tableSchema = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build()).getTableAvroSchemaWithoutMetadataFields();
            Assertions.assertNotNull(tableSchema);
            Schema expectedSchema = new Schema.Parser().parse(dfs.open(new Path(dfsBasePath + "/source_evolved.avsc")));
            // Unless the raw user schema is used end-to-end, the expected schema goes through a
            // StructType round trip, which renames the record to hoodie_source / hoodie.source.
            if (!useUserProvidedSchema || useSchemaPostProcessor) {
                expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(AvroConversionUtils.convertAvroSchemaToStructType(expectedSchema), "hoodie_source", "hoodie.source");
            }
            Assertions.assertEquals(tableSchema, expectedSchema);
        } finally {
            // Restore shared fixtures even when an assertion above fails.
            UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(tableBasePath, this.jsc.hadoopConfiguration()), dfsBasePath + "/test-source.properties");
            writeCommonPropsToFile(dfs, dfsBasePath);
            defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        }
    }

    /** Continuous-mode upserts against a COPY_ON_WRITE table stored under {@code continuous_cow}. */
    @Test
    public void testUpsertsCOWContinuousMode() throws Exception {
        final HoodieTableType cow = HoodieTableType.COPY_ON_WRITE;
        final String tableDir = "continuous_cow";
        testUpsertsContinuousMode(cow, tableDir);
    }

    /** Continuous-mode upserts against a MERGE_ON_READ table stored under {@code continuous_mor}. */
    @Test
    public void testUpsertsMORContinuousMode() throws Exception {
        final HoodieTableType mor = HoodieTableType.MERGE_ON_READ;
        final String tableDir = "continuous_mor";
        testUpsertsContinuousMode(mor, tableDir);
    }

    /**
     * Runs the delta streamer in continuous UPSERT mode (auto-clean off) and waits until enough commits have
     * landed. For MERGE_ON_READ it waits for at least 5 delta commits and 2 compactions; otherwise for at
     * least 5 commits on the compaction/commit timeline. It then checks that the record and distance counts
     * equal the configured number of unique records.
     *
     * @param hoodieTableType table type to create
     * @param str             sub-directory under {@code dfsBasePath} for the table
     */
    private void testUpsertsContinuousMode(HoodieTableType hoodieTableType, String str) throws Exception {
        String tableBasePath = dfsBasePath + "/" + str;
        // Single source of truth: used both to configure the source and as the expected row count,
        // so the two can no longer drift apart.
        int totalRecords = 3000;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.continuousMode = true;
        cfg.tableType = hoodieTableType.name();
        cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
        cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
        deltaStreamerTestRunner(new HoodieDeltaStreamer(cfg, this.jsc), cfg, ignored -> {
            if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
                TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
                TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
            } else {
                TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
            }
            TestHelpers.assertRecordCount(totalRecords, tableBasePath, this.sqlContext);
            TestHelpers.assertDistanceCount(totalRecords, tableBasePath, this.sqlContext);
            return true;
        });
    }

    /**
     * Convenience overload of the four-argument runner that tags failure logs with the job id
     * {@code "single_ds_job"}.
     *
     * @param ds        the delta streamer to run in the background
     * @param cfg       the streamer's config, forwarded unchanged
     * @param condition polled until it reports success
     */
    public static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
        final String defaultJobId = "single_ds_job";
        deltaStreamerTestRunner(ds, cfg, condition, defaultJobId);
    }

    /**
     * Runs {@code hoodieDeltaStreamer.sync()} on a dedicated background thread and waits, via
     * {@code TestHelpers.waitTillCondition} with a 360 second budget, until {@code function} reports success.
     * It then shuts the streamer down gracefully and waits for the background sync to finish, rethrowing
     * any failure from it.
     *
     * <p>The streamer shutdown and the executor shutdown happen in a {@code finally} block. A failed or
     * timed-out wait therefore no longer leaves a streamer running and a non-daemon worker thread alive for
     * the rest of the test JVM.
     *
     * @param hoodieDeltaStreamer the streamer to run
     * @param config              not read by this implementation (kept for signature compatibility)
     * @param function            condition polled until it reports success
     * @param str                 job id included in the failure log message
     * @throws Exception if waiting fails or the background sync failed
     */
    public static void deltaStreamerTestRunner(HoodieDeltaStreamer hoodieDeltaStreamer, HoodieDeltaStreamer.Config config, Function<Boolean, Boolean> function, String str) throws Exception {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> syncFuture = executor.submit(() -> {
            try {
                hoodieDeltaStreamer.sync();
            } catch (Exception e) {
                LOG.warn("DS continuous job failed, hence not proceeding with condition check for " + str, e);
                throw new RuntimeException(e.getMessage(), e);
            }
        });
        try {
            TestHelpers.waitTillCondition(function, syncFuture, 360L);
        } finally {
            hoodieDeltaStreamer.shutdownGracefully();
            // shutdown() lets the running sync task finish; it only prevents new submissions
            // and lets the worker thread exit afterwards.
            executor.shutdown();
        }
        syncFuture.get();
    }

    /**
     * Runs {@code ds} in the background until {@code condition} succeeds, without passing a config.
     *
     * @param ds        the delta streamer to run
     * @param condition polled until it reports success
     */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
        final HoodieDeltaStreamer.Config noConfig = null;
        deltaStreamerTestRunner(ds, noConfig, condition);
    }

    /**
     * Continuous UPSERTs into a MERGE_ON_READ table with inline clustering every 2 commits should produce
     * at least 2 commits and at least 1 replace commit.
     *
     * @param preserveCommitMetadata value for the clustering preserve-commit-metadata setting
     */
    @ValueSource(strings = {"true", "false"})
    @ParameterizedTest
    public void testInlineClustering(String preserveCommitMetadata) throws Exception {
        final String tableBasePath = dfsBasePath + "/inlineClustering";
        final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.configs.addAll(getAsyncServicesConfigs(3000, "false", "true", "2", "", "", preserveCommitMetadata));
        final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc);
        deltaStreamerTestRunner(deltaStreamer, cfg, ignored -> {
            TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
            TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
            return true;
        });
    }

    /**
     * Schedules a clustering plan and moves it to inflight, simulating a failed inline clustering. It then
     * checks that a delta-sync run with {@code retryLastPendingInlineClusteringJob} completes that same
     * replace instant.
     */
    @Test
    public void testDeltaSyncWithPendingClustering() throws Exception {
        String tableBasePath = dfsBasePath + "/inlineClusteringPending";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = false;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);

        // Schedule a clustering plan, then flip it to inflight to simulate an interrupted clustering.
        initialHoodieClusteringJob(tableBasePath, null, false, "schedule").cluster(0);
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
        List<HoodieInstant> pendingReplaces = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
        Assertions.assertFalse(pendingReplaces.isEmpty(), "Scheduling clustering should leave a pending replace instant");
        HoodieInstant pendingClustering = pendingReplaces.get(0);
        metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(pendingClustering, Option.empty());

        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "true", "2", "", ""));
        cfg.retryLastPendingInlineClusteringJob = true;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();

        HoodieInstant lastCompletedReplace = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get();
        Assertions.assertEquals(pendingClustering.getTimestamp(), lastCompletedReplace.getTimestamp());
        TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
    }

    /**
     * Produces at least two replace commits via inline clustering and records the data files replaced by
     * the second-to-last one. It then runs a single sync with aggressive cleaning and archival settings and
     * checks that this replace instant has left the active completed-replace timeline and its replaced
     * files were deleted.
     *
     * @param asyncClean value for the async-clean setting; when {@code true}, optimistic concurrency
     *                   control with lazy failed-write cleaning and an in-process lock provider is also enabled
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {
        String tableBasePath = dfsBasePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(3000, "false", "true", "2", "", ""));
        cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
        cfg.configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
        deltaStreamerTestRunner(new HoodieDeltaStreamer(cfg, this.jsc), cfg, ignored -> {
            TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
            return true;
        });
        TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
        TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);

        // Second-to-last completed replace commit: its replaced files are expected to be cleaned.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
        HoodieTimeline replaceTimeline = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline();
        Option<HoodieInstant> targetReplace = replaceTimeline.nthFromLastInstant(1);
        Assertions.assertTrue(targetReplace.isPresent());

        HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
            replaceTimeline.getInstantDetails(targetReplace.get()).get(), HoodieReplaceCommitMetadata.class);
        // As before, only the last partition entry visited is inspected.
        String partitionPath = null;
        List<String> replacedFileIds = null;
        for (Map.Entry<String, List<String>> entry : replaceMetadata.getPartitionToReplaceFileIds().entrySet()) {
            partitionPath = String.valueOf(entry.getKey());
            replacedFileIds = entry.getValue();
        }
        Assertions.assertNotNull(partitionPath);
        Assertions.assertNotNull(replacedFileIds);

        List<String> replacedFilePaths = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> files = metaClient.getFs().listFiles(new Path(metaClient.getBasePath(), partitionPath), true);
        while (files.hasNext()) {
            String uri = files.next().getPath().toUri().toString();
            for (String fileId : replacedFileIds) {
                if (uri.contains(String.valueOf(fileId))) {
                    replacedFilePaths.add(uri);
                }
            }
        }
        Assertions.assertFalse(replacedFilePaths.isEmpty());

        List<String> cleanConfigs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
        cleanConfigs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
        cleanConfigs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
        cleanConfigs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
        cleanConfigs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
        cleanConfigs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
        cleanConfigs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
        if (asyncClean.booleanValue()) {
            // Async cleaning runs concurrently with the writer, so a lock provider is required.
            cleanConfigs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
            cleanConfigs.add(String.format("%s=%s", HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name()));
            cleanConfigs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
        }
        cfg.configs = cleanConfigs;
        cfg.continuousMode = false;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();

        HoodieInstant archivedReplace = targetReplace.get();
        Assertions.assertEquals(0L, metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants()
            .filter(instant -> archivedReplace.equals(instant)).count());
        for (String path : replacedFilePaths) {
            Assertions.assertFalse(metaClient.getFs().exists(new Path(path)));
        }
    }

    /**
     * Same as the six-argument overload, plus the clustering "preserve commit metadata" setting.
     *
     * @param preserveCommitMetadata value written verbatim for the preserve-commit-metadata key
     * @return a mutable list of {@code key=value} config strings
     */
    private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit,
                                                 String asyncCluster, String asyncClusterMaxCommit, String preserveCommitMetadata) {
        List<String> configs = getAsyncServicesConfigs(totalRecords, autoClean, inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit);
        String preserveEntry = String.format("%s=%s", HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), preserveCommitMetadata);
        configs.add(preserveEntry);
        return configs;
    }

    /**
     * Builds {@code key=value} config strings for the test source's unique-record cap and for the optional
     * cleaning and clustering services. Every string argument that is null or empty is omitted.
     *
     * @param totalRecords           value for the source's max-unique-records setting (always added)
     * @param autoClean              auto-clean flag
     * @param inlineCluster          inline-clustering flag
     * @param inlineClusterMaxCommit inline-clustering max commits
     * @param asyncCluster           async-clustering flag
     * @param asyncClusterMaxCommit  async-clustering max commits
     * @return a mutable list (callers append to it)
     */
    private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit,
                                                 String asyncCluster, String asyncClusterMaxCommit) {
        List<String> configs = new ArrayList<>();
        configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
        addConfigIfPresent(configs, HoodieCompactionConfig.AUTO_CLEAN.key(), autoClean);
        addConfigIfPresent(configs, HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster);
        addConfigIfPresent(configs, HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), inlineClusterMaxCommit);
        addConfigIfPresent(configs, HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), asyncCluster);
        addConfigIfPresent(configs, HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), asyncClusterMaxCommit);
        return configs;
    }

    /** Appends {@code key=value} to {@code configs} unless {@code value} is null or empty. */
    private static void addConfigIfPresent(List<String> configs, String key, String value) {
        if (!StringUtils.isNullOrEmpty(value)) {
            configs.add(String.format("%s=%s", key, value));
        }
    }

    /**
     * Builds a clustering-job config with no running mode and no retry override.
     *
     * @param basePath              table base path
     * @param clusteringInstantTime instant to cluster, or {@code null}
     * @param runSchedule           whether the job should schedule clustering
     * @return the populated config
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, String clusteringInstantTime, Boolean runSchedule) {
        return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule,
            null /* runningMode */, null /* retryLastFailedClusteringJob */);
    }

    /**
     * Builds a clustering-job config that reads its properties from {@code clusteringjob.properties}
     * under {@code dfsBasePath}.
     *
     * @param basePath                     table base path
     * @param clusteringInstantTime        instant to cluster, or {@code null}
     * @param runSchedule                  whether the job should schedule clustering
     * @param runningMode                  running mode string, or {@code null}
     * @param retryLastFailedClusteringJob retry flag; left at the config's default when {@code null}
     * @return the populated config
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, String clusteringInstantTime, Boolean runSchedule,
                                                                       String runningMode, Boolean retryLastFailedClusteringJob) {
        HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config();
        clusteringConfig.basePath = basePath;
        clusteringConfig.clusteringInstantTime = clusteringInstantTime;
        clusteringConfig.runSchedule = runSchedule;
        clusteringConfig.propsFilePath = dfsBasePath + "/clusteringjob.properties";
        clusteringConfig.runningMode = runningMode;
        if (retryLastFailedClusteringJob == null) {
            return clusteringConfig;
        }
        clusteringConfig.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
        return clusteringConfig;
    }

    /**
     * Builds a {@code HoodieIndexer} config that reads its properties from {@code indexer.properties}
     * under {@code dfsBasePath}.
     *
     * @param basePath         table base path
     * @param tableName        table name
     * @param indexInstantTime instant to execute, or {@code null} when scheduling
     * @param runningMode      running mode string (e.g. "schedule" or "execute" as used in this file)
     * @param indexTypes       comma-separated index types, e.g. "COLUMN_STATS"
     * @return the populated config
     */
    private HoodieIndexer.Config buildIndexerConfig(String basePath, String tableName, String indexInstantTime, String runningMode, String indexTypes) {
        HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();
        indexerConfig.propsFilePath = dfsBasePath + "/indexer.properties";
        indexerConfig.basePath = basePath;
        indexerConfig.tableName = tableName;
        indexerConfig.indexInstantTime = indexInstantTime;
        indexerConfig.runningMode = runningMode;
        indexerConfig.indexTypes = indexTypes;
        return indexerConfig;
    }

    /**
     * While a continuous delta streamer is writing, this test schedules COLUMN_STATS indexing and, if an
     * instant was produced, executes it. It checks that the index commit goes from pending to completed.
     * An exception during schedule/execute makes the condition report {@code false}.
     */
    @Test
    public void testHoodieIndexer() throws Exception {
        String tableBasePath = dfsBasePath + "/asyncindexer";
        HoodieDeltaStreamer deltaStreamer = initialHoodieDeltaStreamer(tableBasePath, 1000, "false");
        deltaStreamerTestRunner(deltaStreamer, ignored -> {
            TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
            try {
                String tableName = deltaStreamer.getConfig().targetTableName;
                Option<String> indexInstantTime = new HoodieIndexer(this.jsc,
                    buildIndexerConfig(tableBasePath, tableName, null, "schedule", "COLUMN_STATS")).doSchedule();
                if (indexInstantTime.isPresent()) {
                    TestHelpers.assertPendingIndexCommit(tableBasePath, dfs);
                    LOG.info("Schedule indexing success, now build index with instant time " + indexInstantTime.get());
                    new HoodieIndexer(this.jsc,
                        buildIndexerConfig(tableBasePath, tableName, indexInstantTime.get(), "execute", "COLUMN_STATS")).start(0);
                    LOG.info("Metadata indexing success");
                    TestHelpers.assertCompletedIndexCommit(tableBasePath, dfs);
                } else {
                    LOG.warn("Metadata indexing failed");
                }
                return true;
            } catch (Exception e) {
                LOG.info("Schedule indexing failed", e);
                return false;
            }
        });
    }

    /**
     * Lets a continuous delta streamer reach 2 commits, then schedules clustering with
     * {@code HoodieClusteringJob} and executes it. Afterwards at least one replace commit must exist.
     *
     * @param shouldPassInClusteringInstantTime whether the scheduled instant is passed explicitly to the
     *                                          execute run (otherwise {@code null} is passed)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
        String tableBasePath = dfsBasePath + "/asyncClusteringJob";
        HoodieDeltaStreamer deltaStreamer = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
        CountDownLatch twoCommitsReached = new CountDownLatch(1);
        deltaStreamerTestRunner(deltaStreamer, ignored -> {
            TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
            twoCommitsReached.countDown();
            return true;
        });
        if (!twoCommitsReached.await(2L, TimeUnit.MINUTES)) {
            Assertions.fail("Deltastreamer should have completed 2 commits.");
        }

        Option<String> clusteringInstant = Option.empty();
        try {
            clusteringInstant = initialHoodieClusteringJob(tableBasePath, null, true, null).doSchedule();
        } catch (Exception e) {
            LOG.warn("Schedule clustering failed", e);
            Assertions.fail("Schedule clustering failed", e);
        }
        if (!clusteringInstant.isPresent()) {
            LOG.warn("Clustering execution failed");
            Assertions.fail("Clustering execution failed");
        }

        LOG.info("Schedule clustering success, now cluster with instant time " + clusteringInstant.get());
        HoodieClusteringJob.Config clusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
            shouldPassInClusteringInstantTime ? clusteringInstant.get() : null, false);
        new HoodieClusteringJob(this.jsc, clusteringConfig).cluster(clusteringConfig.retry);
        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
        LOG.info("Cluster success");
    }

    /**
     * Continuous INSERTs into a COPY_ON_WRITE table with async clustering (max 3 commits) should produce at
     * least one replace commit, at least 4 commits, and 2000 distinct records.
     */
    @Test
    public void testAsyncClusteringService() throws Exception {
        final String tableBasePath = dfsBasePath + "/asyncClustering";
        final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "", "", "true", "3"));
        final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc);
        deltaStreamerTestRunner(deltaStreamer, cfg, ignored -> {
            TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
            return true;
        });
        // Re-check once the streamer has stopped.
        TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
        TestHelpers.assertDistinctRecordCount(2000, tableBasePath, this.sqlContext);
    }

    /**
     * Like the async clustering service test but with UPSERTs, so clustering may conflict with ongoing
     * writes. Expects at least one replace commit, at least 4 commits, and 1900 distinct records.
     */
    @Test
    public void testAsyncClusteringServiceWithConflicts() throws Exception {
        final String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts";
        final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "", "", "true", "3"));
        final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc);
        deltaStreamerTestRunner(deltaStreamer, cfg, ignored -> {
            TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
            return true;
        });
        TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
        TestHelpers.assertDistinctRecordCount(1900L, tableBasePath, this.sqlContext);
    }

    /**
     * Continuous INSERTs into a MERGE_ON_READ table with async clustering. The run must reach at least
     * 2 compactions and 1 replace commit, and end with 2000 distinct records.
     */
    @Test
    public void testAsyncClusteringServiceWithCompaction() throws Exception {
        final String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
        final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.configs.addAll(getAsyncServicesConfigs(2000, "false", "", "", "true", "3"));
        final HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, this.jsc);
        deltaStreamerTestRunner(deltaStreamer, cfg, ignored -> {
            TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
            TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
            return true;
        });
        TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
        TestHelpers.assertDistinctRecordCount(2000, tableBasePath, this.sqlContext);
    }

    /**
     * Leaves a scheduled clustering plan stuck in inflight, then runs the clustering job in
     * "scheduleAndExecute" mode.
     *
     * @param retryLastFailedClusteringJob when {@code true}, the last completed replace must be the stuck
     *                                     instant (it was retried); otherwise it must be a different, newly
     *                                     scheduled instant
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob) throws Exception {
        String tableBasePath = dfsBasePath + "/asyncClustering3";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        cfg.continuousMode = false;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(getAsyncServicesConfigs(3000, "false", "false", "0", "false", "0"));
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);

        initialHoodieClusteringJob(tableBasePath, null, false, "schedule").cluster(0);
        new HoodieDeltaStreamer(cfg, this.jsc).sync();

        // Simulate a failed clustering by moving the scheduled plan to inflight.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
        List<HoodieInstant> pendingReplaces = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
        Assertions.assertFalse(pendingReplaces.isEmpty(), "Scheduling clustering should leave a pending replace instant");
        HoodieInstant stuckClustering = pendingReplaces.get(0);
        metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(stuckClustering, Option.empty());

        initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", Boolean.valueOf(retryLastFailedClusteringJob)).cluster(0);

        String lastCompleted = metaClient.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
        if (retryLastFailedClusteringJob) {
            Assertions.assertEquals(stuckClustering.getTimestamp(), lastCompleted);
        } else {
            Assertions.assertFalse(stuckClustering.getTimestamp().equalsIgnoreCase(lastCompleted));
        }
    }

    /**
     * Runs {@code HoodieClusteringJob} in the given mode while a continuous delta streamer is writing, then
     * checks the replace timeline:
     * <ul>
     *   <li>"scheduleAndExecute": at least 2 completed replace commits;</li>
     *   <li>"schedule": at least 2 replace requests and no completed replace commits;</li>
     *   <li>"execute": nothing scheduled, so no completed replace commits.</li>
     * </ul>
     * A clustering failure is tolerated only in "execute" mode (there is nothing to execute).
     *
     * @param runningMode clustering running mode (matched case-insensitively)
     */
    @ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"})
    @ParameterizedTest
    public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {
        String tableBasePath = dfsBasePath + "/asyncClustering2";
        HoodieDeltaStreamer deltaStreamer = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
        HoodieClusteringJob clusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, runningMode);
        // One consistent (case-insensitive) check instead of mixing toLowerCase().equals and equalsIgnoreCase.
        boolean executeOnly = "execute".equalsIgnoreCase(runningMode);
        deltaStreamerTestRunner(deltaStreamer, ignored -> {
            TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
            try {
                if (clusteringJob.cluster(0) == 0) {
                    LOG.info("Cluster success");
                } else {
                    LOG.warn("Import failed");
                    if (!executeOnly) {
                        return false;
                    }
                }
            } catch (Exception e) {
                LOG.warn("ScheduleAndExecute clustering failed", e);
                if (!executeOnly) {
                    return false;
                }
            }
            // Replaces the decompiled hash-switch, which did not compile (boolean selector, duplicate labels).
            if ("scheduleAndExecute".equalsIgnoreCase(runningMode)) {
                TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
            } else if ("schedule".equalsIgnoreCase(runningMode)) {
                TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
                TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
            } else if (executeOnly) {
                TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
            } else {
                throw new IllegalStateException("Unexpected value: " + runningMode);
            }
            return true;
        });
    }

    /**
     * Two-step pipeline test.
     *
     * <p>Bulk-inserts into an upstream table through {@link SqlQueryBasedTransformer} and incrementally pulls it into
     * a downstream table. It then repeats with an empty batch and with an upsert batch. Finally it checks the Hive
     * sync state of the upstream table.
     */
    @Test
    public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
        String str = dfsBasePath + "/test_table2";
        String str2 = dfsBasePath + "/test_downstream_table2";
        // Upstream bulk insert with Hive sync partitioned by year/month/day.
        HoodieDeltaStreamer.Config makeConfig = TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", true);
        makeConfig.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
        new HoodieDeltaStreamer(makeConfig, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, str, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, str, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, str, this.sqlContext);
        String assertCommitMetadata = TestHelpers.assertCommitMetadata("00000", str, dfs, 1);

        // Downstream: incrementally pull the upstream table.
        new HoodieDeltaStreamer(TestHelpers.makeConfigForHudiIncrSrc(str, str2, WriteOperationType.BULK_INSERT, true, null), this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, str2, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, str2, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, str2, this.sqlContext);
        TestHelpers.assertCommitMetadata(assertCommitMetadata, str2, dfs, 1);

        // No new data (sourceLimit = 0): neither table is expected to gain a commit.
        makeConfig.sourceLimit = 0L;
        new HoodieDeltaStreamer(makeConfig, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, str, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, str, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, str, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", str, dfs, 1);
        new HoodieDeltaStreamer(TestHelpers.makeConfigForHudiIncrSrc(str, str2, WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName()), this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, str2, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, str2, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, str2, this.sqlContext);
        TestHelpers.assertCommitMetadata(assertCommitMetadata, str2, dfs, 1);

        // Upsert into the upstream table, then apply it incrementally downstream.
        makeConfig.sourceLimit = 2000L;
        makeConfig.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(makeConfig, this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1950L, str, this.sqlContext);
        TestHelpers.assertDistanceCount(1950L, str, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1950L, str, this.sqlContext);
        String assertCommitMetadata2 = TestHelpers.assertCommitMetadata("00001", str, dfs, 2);
        Assertions.assertEquals(1950L, TestHelpers.countsPerCommit(str, this.sqlContext).stream().mapToLong(row -> row.getLong(1)).sum());
        HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc = TestHelpers.makeConfigForHudiIncrSrc(str, str2, WriteOperationType.UPSERT, false, null);
        makeConfigForHudiIncrSrc.sourceLimit = 2000L;
        new HoodieDeltaStreamer(makeConfigForHudiIncrSrc, this.jsc).sync();
        TestHelpers.assertRecordCount(2000L, str2, this.sqlContext);
        TestHelpers.assertDistanceCount(2000L, str2, this.sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(2000L, str2, this.sqlContext);
        TestHelpers.assertCommitMetadata(assertCommitMetadata2, str2, dfs, 2);
        Assertions.assertEquals(2000L, TestHelpers.countsPerCommit(str2, this.sqlContext).stream().mapToLong(row -> row.getLong(1)).sum());

        // Hive sync: the partition fields mirror the hive_sync.partition_fields configured on the upstream writer.
        HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(str, "hive_trips");
        hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
        HoodieHiveClient hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
        Assertions.assertTrue(hoodieHiveClient.tableExists(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
        Assertions.assertEquals(3, hoodieHiveClient.getAllPartitions(hiveSyncConfig.tableName).size(), "Table partitions should match the number of partitions we wrote");
        Assertions.assertEquals(assertCommitMetadata2, hoodieHiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was synced should be updated in the TBLPROPERTIES");
    }

    /**
     * A bulk insert configured without a schema provider must fail with a {@link HoodieException} that asks for a
     * valid schema provider class.
     */
    @Test
    public void testNullSchemaProvider() throws Exception {
        String basePath = dfsBasePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(basePath, WriteOperationType.BULK_INSERT,
                Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties",
                true, false, false, null, null);
        HoodieException thrown = Assertions.assertThrows(HoodieException.class,
                () -> new HoodieDeltaStreamer(cfg, this.jsc, dfs, hiveServer.getHiveConf()).sync(),
                "Should error out when schema provider is not provided");
        LOG.debug("Expected error during reading data from source ", thrown);
        Assertions.assertTrue(thrown.getMessage().contains("Please provide a valid schema provider class!"));
    }

    /**
     * Tests the payload class update on an existing MERGE_ON_READ table.
     *
     * <p>Creates the table, then constructs (without syncing) a second delta streamer configured with
     * {@link DummyAvroPayload}. Finally checks that the payload class recorded in {@code .hoodie/hoodie.properties}
     * is now {@code DummyAvroPayload}.
     */
    @Test
    public void testPayloadClassUpdate() throws Exception {
        String str = dfsBasePath + "/test_dataset_mor";
        new HoodieDeltaStreamer(TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, "MERGE_ON_READ"), this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, str, this.sqlContext);
        HoodieDeltaStreamer.Config makeConfig = TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
        // Construct only (no sync): the assertion below expects construction alone to have updated the table config.
        new HoodieDeltaStreamer(makeConfig, this.jsc, dfs, hiveServer.getHiveConf());
        Properties properties = new Properties();
        FileSystem fs = FSUtils.getFs(makeConfig.targetBasePath, this.jsc.hadoopConfiguration());
        try (FSDataInputStream in = fs.open(new Path(str + "/.hoodie/hoodie.properties"))) {
            properties.load(in);
        }
        Assertions.assertEquals(DummyAvroPayload.class.getName(),
                new HoodieConfig(properties).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
    }

    /**
     * Tests the payload class update on an existing COPY_ON_WRITE table.
     *
     * <p>Creates the table, then constructs (without syncing) a second delta streamer configured with
     * {@link DummyAvroPayload}. Finally checks that {@code .hoodie/hoodie.properties} still contains no payload
     * class entry.
     */
    @Test
    public void testPayloadClassUpdateWithCOWTable() throws Exception {
        String str = dfsBasePath + "/test_dataset_cow";
        new HoodieDeltaStreamer(TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, null), this.jsc, dfs, hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, str, this.sqlContext);
        HoodieDeltaStreamer.Config makeConfig = TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), null);
        // Construct only (no sync); for a COW table the properties file must stay free of a payload class entry.
        new HoodieDeltaStreamer(makeConfig, this.jsc, dfs, hiveServer.getHiveConf());
        Properties properties = new Properties();
        FileSystem fs = FSUtils.getFs(makeConfig.targetBasePath, this.jsc.hadoopConfiguration());
        try (FSDataInputStream in = fs.open(new Path(str + "/.hoodie/hoodie.properties"))) {
            properties.load(in);
        }
        Assertions.assertFalse(properties.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
    }

    /**
     * Exercises {@code filterDupes}.
     *
     * <p>The steps are:
     * <ol>
     *   <li>An INSERT with filtering keeps only new records.</li>
     *   <li>An UPSERT through the drop-all config yields a newer commit with no write stats.</li>
     *   <li>Enabling {@code filterDupes} together with UPSERT is rejected with an {@link IllegalArgumentException}.</li>
     * </ol>
     */
    @Test
    public void testFilterDupes() throws Exception {
        String str = dfsBasePath + "/test_dupes_table";
        HoodieDeltaStreamer.Config makeConfig = TestHelpers.makeConfig(str, WriteOperationType.BULK_INSERT);
        new HoodieDeltaStreamer(makeConfig, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, str, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", str, dfs, 1);

        // INSERT with duplicate filtering: each commit should contribute exactly 1000 records.
        makeConfig.filterDupes = true;
        makeConfig.sourceLimit = 2000L;
        makeConfig.operation = WriteOperationType.INSERT;
        new HoodieDeltaStreamer(makeConfig, this.jsc).sync();
        TestHelpers.assertRecordCount(2000L, str, this.sqlContext);
        TestHelpers.assertCommitMetadata("00001", str, dfs, 2);
        List<Row> countsPerCommit = TestHelpers.countsPerCommit(str, this.sqlContext);
        Assertions.assertEquals(1000L, countsPerCommit.get(0).getLong(1));
        Assertions.assertEquals(1000L, countsPerCommit.get(1).getLong(1));
        HoodieInstant previousInstant = HoodieTableMetaClient.builder().setConf(this.jsc.hadoopConfiguration()).setBasePath(str).setLoadActiveTimelineOnLoad(true).build().getCommitsTimeline().filterCompletedInstants().lastInstant().get();

        // UPSERT via the drop-all config with auto-clean off; per the assertions below, the new commit has no write stats.
        HoodieDeltaStreamer.Config makeDropAllConfig = TestHelpers.makeDropAllConfig(str, WriteOperationType.UPSERT);
        makeDropAllConfig.filterDupes = false;
        makeDropAllConfig.sourceLimit = 2000L;
        makeDropAllConfig.operation = WriteOperationType.UPSERT;
        makeDropAllConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
        new HoodieDeltaStreamer(makeDropAllConfig, this.jsc).sync();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.jsc.hadoopConfiguration()).setBasePath(str).setLoadActiveTimelineOnLoad(true).build();
        HoodieInstant latestInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        Assertions.assertTrue(HoodieTimeline.compareTimestamps(latestInstant.getTimestamp(), HoodieTimeline.GREATER_THAN, previousInstant.getTimestamp()));
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(latestInstant).get(), HoodieCommitMetadata.class);
        LOG.info("New Commit Metadata=" + commitMetadata);
        Assertions.assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());

        // filterDupes with UPSERT must be rejected; assertThrows makes the test fail if nothing is thrown.
        makeDropAllConfig.filterDupes = true;
        makeDropAllConfig.operation = WriteOperationType.UPSERT;
        IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class,
                () -> new HoodieDeltaStreamer(makeDropAllConfig, this.jsc).sync(),
                "UPSERT with filterDupes enabled should be rejected");
        Assertions.assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
    }

    /**
     * Fetches one batch from {@link DistributedTestDataSource} configured for 1000 unique records in a single
     * partition (RocksDB-backed key generation) and checks that the batch holds exactly 1000 records.
     */
    @Test
    public void testDistributedTestDataSource() {
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.setProperty(SourceConfigs.MAX_UNIQUE_RECORDS_PROP, "1000");
        typedProperties.setProperty(SourceConfigs.NUM_SOURCE_PARTITIONS_PROP, "1");
        typedProperties.setProperty(SourceConfigs.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
        DistributedTestDataSource source = new DistributedTestDataSource(typedProperties, this.jsc, this.sparkSession, null);
        InputBatch<JavaRDD<GenericRecord>> fetchNext = source.fetchNext(Option.empty(), 10000000L);
        JavaRDD<GenericRecord> batch = fetchNext.getBatch().get();
        batch.cache();
        Assertions.assertEquals(1000L, batch.count());
    }

    /**
     * Publishes {@code i} freshly generated trip records, serialized as JSON, to the Kafka topic {@code str}.
     *
     * @param i   number of records to generate and send
     * @param z   whether to create the topic (2 partitions) first; an already existing topic is tolerated
     * @param str Kafka topic name
     */
    private static void prepareJsonKafkaDFSFiles(int i, boolean z, String str) {
        if (z) {
            try {
                testUtils.createTopic(str, 2);
            } catch (TopicExistsException ignored) {
                // An existing topic is acceptable; records are simply sent to it below.
            }
        }
        String tripUberSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.sendMessages(str,
                UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInsertsAsPerSchema("000", Integer.valueOf(i), tripUberSchema)));
    }

    /**
     * Writes the default Parquet source properties file for {@code PARQUET_SOURCE_ROOT}.
     *
     * <p>Uses {@code source.avsc}/{@code target.avsc}, partition field {@code partition_path} and
     * {@code test-parquet-dfs-source.properties}. {@code str} is forwarded as the empty-batch flag.
     */
    private void prepareParquetDFSSource(boolean z, boolean z2, String str) throws IOException {
        final String propsFile = "test-parquet-dfs-source.properties";
        final String partitionField = "partition_path";
        prepareParquetDFSSource(z, z2, "source.avsc", "target.avsc", propsFile, PARQUET_SOURCE_ROOT, false, partitionField, str);
    }

    /** Default Parquet source properties without an empty-batch flag. */
    private void prepareParquetDFSSource(boolean z, boolean z2) throws IOException {
        final String noEmptyBatchFlag = "";
        prepareParquetDFSSource(z, z2, noEmptyBatchFlag);
    }

    /** Fully specified Parquet source properties without an empty-batch flag. */
    private void prepareParquetDFSSource(boolean z, boolean z2, String str, String str2, String str3, String str4, boolean z3, String str5) throws IOException {
        final String noEmptyBatchFlag = "";
        prepareParquetDFSSource(z, z2, str, str2, str3, str4, z3, str5, noEmptyBatchFlag);
    }

    /**
     * Writes a Parquet source properties file to {@code dfsBasePath/str3}.
     *
     * @param z    whether to configure a source schema file ({@code dfsBasePath/str})
     * @param z2   whether to also configure a target schema file ({@code dfsBasePath/str2}); only honoured when {@code z}
     * @param str  source schema file name
     * @param str2 target schema file name
     * @param str3 name of the properties file to write
     * @param str4 DFS root the source reads from
     * @param z3   whether to first populate the common properties via {@code populateCommonProps}
     * @param str5 partition path field
     * @param str6 value for {@code TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH}; omitted when null or empty
     */
    private void prepareParquetDFSSource(boolean z, boolean z2, String str, String str2, String str3, String str4, boolean z3, String str5, String str6) throws IOException {
        TypedProperties props = new TypedProperties();
        if (z3) {
            populateCommonProps(props, dfsBasePath);
        }
        props.setProperty("include", "base.properties");
        props.setProperty("hoodie.embed.timeline.server", "false");
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", str5);
        if (z) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + str);
        }
        if (z && z2) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + str2);
        }
        props.setProperty("hoodie.deltastreamer.source.dfs.root", str4);
        boolean emptyBatchFlagGiven = !StringUtils.isNullOrEmpty(str6);
        if (emptyBatchFlagGiven) {
            props.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, str6);
        }
        UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + str3);
    }

    /** Parquet DFS source round trip without the empty-batch variant. */
    private void testParquetDFSSource(boolean z, List<String> list) throws Exception {
        final boolean emptyBatch = false;
        testParquetDFSSource(z, list, emptyBatch);
    }

    /**
     * Runs a Parquet DFS ingestion and checks record counts and commit metadata.
     *
     * <p>The sequence is: 10 records; then, when {@code z2}, a batch that must ingest nothing and must not leave a
     * NULL table schema; then 100 more records. Every completed commit must carry a schema in its metadata.
     *
     * @param z    whether to use a file-based schema provider
     * @param list transformer class names; a non-empty list also enables the target schema file
     * @param z2   whether to use {@code TestParquetDFSSourceEmptyBatch} so that the second sync ingests nothing
     */
    private void testParquetDFSSource(boolean z, List<String> list, boolean z2) throws Exception {
        PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
        boolean z3 = list != null && !list.isEmpty();
        prepareParquetDFSFiles(10, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        prepareParquetDFSSource(z, z3, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", z2 ? "1" : "");
        String str = dfsBasePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer hoodieDeltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(str, WriteOperationType.INSERT, z2 ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), list, "test-parquet-dfs-source.properties", false, z, 100000, false, null, null, "timestamp", null), this.jsc);
        hoodieDeltaStreamer.sync();
        TestHelpers.assertRecordCount(10, str, this.sqlContext);
        if (z2) {
            prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
            hoodieDeltaStreamer.sync();
            TestHelpers.assertRecordCount(10, str, this.sqlContext);
            Schema tableSchema = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(str).setConf(this.jsc.hadoopConfiguration()).build()).getTableAvroSchema();
            // Compare Schema with Schema; comparing against the NULL schema's String form could never be equal.
            Assertions.assertNotEquals(Schema.create(Schema.Type.NULL), tableSchema, "An empty batch must not leave a NULL table schema");
        }
        prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null);
        hoodieDeltaStreamer.sync();
        TestHelpers.assertRecordCount(10 + 100, str, this.sqlContext);
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setBasePath(str).setConf(this.jsc.hadoopConfiguration()).build();
        build.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants()
                .forEach(hoodieInstant -> assertValidSchemaInCommitMetadata(hoodieInstant, build));
        testNum++;
    }

    /**
     * Asserts that the commit metadata of {@code hoodieInstant} holds a non-empty {@code "schema"} entry.
     *
     * @throws HoodieException if the commit metadata cannot be parsed; the parse failure is kept as the cause
     */
    private void assertValidSchemaInCommitMetadata(HoodieInstant hoodieInstant, HoodieTableMetaClient hoodieTableMetaClient) {
        try {
            byte[] details = hoodieTableMetaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get();
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class);
            Assertions.assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata("schema")));
        } catch (IOException e) {
            // Keep the IOException as the cause so the parse failure is visible in the test report.
            throw new HoodieException("Failed to parse commit metadata for " + hoodieInstant.toString(), e);
        }
    }

    /**
     * Writes ORC source properties, ingests from {@code ORC_SOURCE_ROOT} with {@link ORCDFSSource} and expects 5
     * records. The ORC input is presumably prepared elsewhere (e.g. in setup); verify against the fixture.
     *
     * @param z    whether to configure the source schema file ({@code source.avsc})
     * @param list transformer class names; when non-null, the target schema file ({@code target.avsc}) is also set
     */
    private void testORCDFSSource(boolean z, List<String> list) throws Exception {
        TypedProperties orcProps = new TypedProperties();
        orcProps.setProperty("include", "base.properties");
        orcProps.setProperty("hoodie.embed.timeline.server", "false");
        orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        if (z) {
            orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
        }
        if (z && list != null) {
            orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
        }
        orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
        UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, dfsBasePath + "/test-orc-dfs-source.properties");

        String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
                ORCDFSSource.class.getName(), list, "test-orc-dfs-source.properties", false, z, 100000, false,
                null, null, "timestamp", null);
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(5L, tableBasePath, this.sqlContext);
        testNum++;
    }

    /**
     * Writes JSON Kafka source properties to {@code dfsBasePath/str}.
     *
     * @param str  name of the properties file to write
     * @param str2 value for {@code auto.offset.reset}
     * @param str3 Kafka topic to read from
     */
    private void prepareJsonKafkaDFSSource(String str, String str2, String str3) throws IOException {
        TypedProperties props = new TypedProperties();
        populateAllCommonProps(props, dfsBasePath, testUtils.brokerAddress());
        // Applied in declaration order, after the common properties.
        String[][] entries = {
            {"include", "base.properties"},
            {"hoodie.embed.timeline.server", "false"},
            {"hoodie.datasource.write.recordkey.field", "_row_key"},
            {"hoodie.datasource.write.partitionpath.field", ""},
            {"hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT},
            {"hoodie.deltastreamer.source.kafka.topic", str3},
            {"hoodie.deltastreamer.source.kafka.checkpoint.type", this.kafkaCheckpointType},
            {"hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"},
            {"hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"},
            {"auto.offset.reset", str2}
        };
        for (String[] entry : entries) {
            props.setProperty(entry[0], entry[1]);
        }
        UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + str);
    }

    /**
     * Switches a table from a Parquet DFS source to a JSON Kafka source and checks the record counts.
     *
     * <p>The table first ingests 10 Parquet records. It then switches to a JSON Kafka source that already holds 5
     * records, and finally receives 20 more Kafka records.
     *
     * @param z whether Kafka {@code auto.offset.reset} is {@code latest} (the 5 pre-existing records are then not
     *          expected) rather than {@code earliest}
     */
    private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean z) throws Exception {
        PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
        prepareParquetDFSFiles(10, PARQUET_SOURCE_ROOT, "1.parquet", true, "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}", HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "");
        String str = dfsBasePath + "/test_dfs_to_kafka" + testNum;
        HoodieDeltaStreamer hoodieDeltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(str, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), Collections.emptyList(), "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null), this.jsc);
        hoodieDeltaStreamer.sync();
        int expectedRecords = 10;
        TestHelpers.assertRecordCount(expectedRecords, str, this.sqlContext);
        hoodieDeltaStreamer.shutdownGracefully();

        topicName = "topic" + testNum;
        prepareJsonKafkaDFSFiles(5, true, topicName);
        prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", z ? "latest" : "earliest", topicName);
        HoodieDeltaStreamer hoodieDeltaStreamer2 = new HoodieDeltaStreamer(TestHelpers.makeConfig(str, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), this.jsc);
        hoodieDeltaStreamer2.sync();
        // With "latest" the 5 records published before the first Kafka sync are not expected in the table.
        expectedRecords += z ? 0 : 5;
        TestHelpers.assertRecordCount(expectedRecords, str, this.sqlContext);

        prepareJsonKafkaDFSFiles(20, false, topicName);
        hoodieDeltaStreamer2.sync();
        expectedRecords += 20;
        TestHelpers.assertRecordCount(expectedRecords, str, this.sqlContext);
        testNum++;
    }

    /** Ingests 5 and then 10 more JSON records from Kafka (earliest offset reset) with a single streamer instance. */
    @Test
    public void testJsonKafkaDFSSource() throws Exception {
        topicName = "topic" + testNum;
        prepareJsonKafkaDFSFiles(5, true, topicName);
        prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName);
        String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
                JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties",
                false, true, 100000, false, null, null, "timestamp", null);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, this.jsc);

        streamer.sync();
        TestHelpers.assertRecordCount(5L, tableBasePath, this.sqlContext);

        prepareJsonKafkaDFSFiles(10, false, topicName);
        streamer.sync();
        TestHelpers.assertRecordCount(15L, tableBasePath, this.sqlContext);
    }

    /**
     * Uses the {@code timestamp} Kafka checkpoint type. Two separate streamers are each started with the current
     * time as checkpoint, ingesting 5 records and then 5 more.
     */
    @Test
    public void testKafkaTimestampType() throws Exception {
        topicName = "topic" + testNum;
        this.kafkaCheckpointType = "timestamp";
        prepareJsonKafkaDFSFiles(5, true, topicName);
        prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", "earliest", topicName);
        String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;

        String firstCheckpoint = String.valueOf(System.currentTimeMillis());
        HoodieDeltaStreamer.Config firstCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
                JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties",
                false, true, 100000, false, null, null, "timestamp", firstCheckpoint);
        new HoodieDeltaStreamer(firstCfg, this.jsc).sync();
        TestHelpers.assertRecordCount(5L, tableBasePath, this.sqlContext);

        prepareJsonKafkaDFSFiles(5, false, topicName);
        String secondCheckpoint = String.valueOf(System.currentTimeMillis());
        HoodieDeltaStreamer.Config secondCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
                JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties",
                false, true, 100000, false, null, null, "timestamp", secondCheckpoint);
        new HoodieDeltaStreamer(secondCfg, this.jsc).sync();
        TestHelpers.assertRecordCount(10L, tableBasePath, this.sqlContext);
    }

    /** Parquet-to-Kafka transition with {@code auto.offset.reset=earliest}: pre-existing Kafka records are expected. */
    @Test
    public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
        final boolean autoResetToLatest = false;
        testDeltaStreamerTransitionFromParquetToKafkaSource(autoResetToLatest);
    }

    /** Parquet-to-Kafka transition with {@code auto.offset.reset=latest}: pre-existing Kafka records are not expected. */
    @Test
    public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
        final boolean autoResetToLatest = true;
        testDeltaStreamerTransitionFromParquetToKafkaSource(autoResetToLatest);
    }

    /** Parquet source with no schema provider and no transformer. */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = false;
        testParquetDFSSource(useSchemaProvider, null);
    }

    /** Parquet source whose second batch is empty. */
    @Test
    public void testParquetDFSSourceForEmptyBatch() throws Exception {
        final boolean useSchemaProvider = false;
        final boolean emptyBatch = true;
        testParquetDFSSource(useSchemaProvider, null, emptyBatch);
    }

    /** Parquet source with no schema provider but with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testParquetDFSSource(false, transformers);
    }

    /** Parquet source with a source schema file and no transformer. */
    @Test
    public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = true;
        testParquetDFSSource(useSchemaProvider, null);
    }

    /** Parquet source with schema files and {@link TripsWithDistanceTransformer}. */
    @Test
    public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testParquetDFSSource(true, transformers);
    }

    /** ORC source with no schema provider and no transformer. */
    @Test
    public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = false;
        testORCDFSSource(useSchemaProvider, null);
    }

    /** ORC source with schema provider and {@link TripsWithDistanceTransformer}. */
    @Test
    public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testORCDFSSource(true, transformers);
    }

    /**
     * Writes CSV source properties and a 3-record CSV file under {@code dfsBasePath/csvFiles}.
     *
     * @param z  whether the CSV file has a header line
     * @param c  field separator; {@code ','} is left unset, a tab is written as the escaped string {@code "\\t"}
     * @param z2 whether to configure the flattened source schema file
     * @param z3 whether to also configure the flattened target schema file (only when {@code z2})
     */
    private void prepareCsvDFSSource(boolean z, char c, boolean z2, boolean z3) throws IOException {
        String csvRoot = dfsBasePath + "/csvFiles";
        // Named columns are available only with a header or a schema; otherwise fall back to the positional "_c0".
        boolean namedColumns = z || z2;
        TypedProperties props = new TypedProperties();
        props.setProperty("include", "base.properties");
        props.setProperty("hoodie.datasource.write.recordkey.field", namedColumns ? "_row_key" : "_c0");
        props.setProperty("hoodie.datasource.write.partitionpath.field", namedColumns ? "partition_path" : "");
        if (z2) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source-flattened.avsc");
            if (z3) {
                props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target-flattened.avsc");
            }
        }
        props.setProperty("hoodie.deltastreamer.source.dfs.root", csvRoot);
        if (c == '\t') {
            props.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
        } else if (c != ',') {
            props.setProperty("hoodie.deltastreamer.csv.sep", String.valueOf(c));
        }
        if (z) {
            props.setProperty("hoodie.deltastreamer.csv.header", Boolean.TRUE.toString());
        }
        UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-csv-dfs-source.properties");
        UtilitiesTestBase.Helpers.saveCsvToDFS(z, c,
                UtilitiesTestBase.Helpers.jsonifyRecords(new HoodieTestDataGenerator().generateInserts("000", 3, true)),
                dfs, csvRoot + "/1.csv");
    }

    /**
     * Prepares a CSV source, ingests it with {@link CsvDFSSource} and expects 3 records.
     *
     * @param z    whether the CSV has a header
     * @param c    field separator
     * @param z2   whether to use the schema provider
     * @param list transformer class names, or {@code null} for none
     */
    private void testCsvDFSSource(boolean z, char c, boolean z2, List<String> list) throws Exception {
        boolean hasTransformers = list != null;
        prepareCsvDFSSource(z, c, z2, hasTransformers);
        String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
        String orderingField = (z || z2) ? "timestamp" : "_c0";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
                CsvDFSSource.class.getName(), list, "test-csv-dfs-source.properties", false, z2, 1000, false,
                null, null, orderingField, null);
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(3L, tableBasePath, this.sqlContext);
        testNum++;
    }

    /** Header, comma separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        final char comma = ',';
        testCsvDFSSource(true, comma, false, null);
    }

    /** Header, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
        final char tab = '\t';
        testCsvDFSSource(true, tab, false, null);
    }

    /** Header, tab separator, schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
        final char tab = '\t';
        testCsvDFSSource(true, tab, true, null);
    }

    /** Header, tab separator, no schema provider, with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(true, '\t', false, transformers);
    }

    /** Header, tab separator, schema provider, with {@link TripsWithDistanceTransformer}. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        testCsvDFSSource(true, '\t', true, transformers);
    }

    /** No header, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = false;
        testCsvDFSSource(hasHeader, '\t', false, null);
    }

    /** No header, tab separator, schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
        final boolean hasHeader = false;
        testCsvDFSSource(hasHeader, '\t', true, null);
    }

    /**
     * Without a header or schema the columns are positional, so the transformer's reference to {@code begin_lat}
     * must fail with an {@link AnalysisException}.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        AnalysisException thrown = Assertions.assertThrows(AnalysisException.class,
                () -> testCsvDFSSource(false, '\t', false, transformers),
                "Should error out when doing the transformation.");
        LOG.debug("Expected error during transformation", thrown);
        Assertions.assertTrue(thrown.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
    }

    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
        // No header row, tab separator, schema provider enabled, TripsWithDistanceTransformer applied.
        final boolean withHeader = false;
        final char separator = '\t';
        final boolean withSchemaProvider = true;
        testCsvDFSSource(withHeader, separator, withSchemaProvider,
            Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
    }

    /**
     * Writes the SqlSource properties file to DFS and registers a {@code test_sql_table} temp
     * view backed by 1000 generated insert records.
     *
     * @throws IOException if the properties file or parquet data cannot be written
     */
    private void prepareSqlSource() throws IOException {
        // The "/" separator keeps the source data inside dfsBasePath. Without it the directory
        // becomes the sibling path "<dfsBasePath>sqlSourceFiles", outside the test base dir.
        String sqlSourceRoot = dfsBasePath + "/sqlSourceFiles";

        TypedProperties sqlSourceProps = new TypedProperties();
        sqlSourceProps.setProperty("include", "base.properties");
        sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
        sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        // The SqlSource reads everything from the temp view registered below.
        sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query", "select * from test_sql_table");
        UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, dfs, dfsBasePath + "/test-sql-source-source.properties");

        generateSqlSourceTestTable(sqlSourceRoot, "1", "1000", 1000, new HoodieTestDataGenerator());
    }

    /**
     * Generates {@code numRecords} insert records, saves them as parquet at
     * {@code sourceRoot/fileName}, and (re)registers every parquet file under
     * {@code sourceRoot} as the {@code test_sql_table} temp view.
     */
    private void generateSqlSourceTestTable(String sourceRoot, String fileName, String instantTime, int numRecords,
                                            HoodieTestDataGenerator dataGenerator) throws IOException {
        Path targetFile = new Path(sourceRoot, fileName);
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(
                dataGenerator.generateInserts(instantTime, Integer.valueOf(numRecords), false)),
            targetFile);
        this.sparkSession.read().parquet(sourceRoot).createOrReplaceTempView("test_sql_table");
    }

    @Test
    public void testSqlSourceSource() throws Exception {
        prepareSqlSource();
        // Each run targets a fresh table directory suffixed with the shared test counter.
        final String tableBasePath = dfsBasePath + "/test_sql_source_table" + testNum++;
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
            SqlSource.class.getName(), Collections.emptyList(), "test-sql-source-source.properties",
            false, false, 1000, false, null, null, "timestamp", null, true);
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath, this.sqlContext);
    }

    /**
     * Streams a JDBC table from an in-memory H2 database in continuous mode with incremental
     * pulls on the {@code id} column, and checks that all rows arrive over multiple commits.
     */
    @Disabled
    @Test
    public void testJdbcSourceIncrementalFetchInContinuousMode() {
        // try-with-resources closes the H2 connection on every path, including when sync or an
        // assertion throws. The previous hand-expanded version leaked it on failure and had an
        // unreachable suppressed-exception branch that dereferenced a null Throwable.
        try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) {
            TypedProperties props = new TypedProperties();
            props.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem");
            props.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver");
            props.setProperty("hoodie.deltastreamer.jdbc.user", "test");
            props.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc");
            props.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
            props.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
            props.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
            props.setProperty("hoodie.datasource.write.keygenerator.class", SimpleKeyGenerator.class.getName());
            props.setProperty("hoodie.datasource.write.recordkey.field", "ID");
            props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
            UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-jdbc-source.properties");

            final int numRecords = 1000;
            final int sourceLimit = 100;
            final String tableBasePath = dfsBasePath + "/triprec";
            HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
                JdbcSource.class.getName(), null, "test-jdbc-source.properties", false, false, sourceLimit,
                false, null, null, "timestamp", null);
            cfg.continuousMode = true;
            JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props);

            deltaStreamerTestRunner(new HoodieDeltaStreamer(cfg, this.jsc), cfg, ignored -> {
                // Expect at least ceil(numRecords / sourceLimit) commits.
                int expectedCommits = numRecords / sourceLimit + (numRecords % sourceLimit == 0 ? 0 : 1);
                TestHelpers.assertAtleastNCompactionCommits(expectedCommits, tableBasePath, dfs);
                TestHelpers.assertRecordCount(numRecords, tableBasePath, this.sqlContext);
                return true;
            });
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Checks that a downstream HoodieIncrSource sync fails with an AnalysisException once the
     * upstream table has moved on. It then checks that enabling the full-table-scan fallback for
     * non-existing files lets the downstream table catch up to the upstream row count.
     */
    @Test
    public void testHoodieIncrFallback() throws Exception {
        String upstreamPath = dfsBasePath + "/incr_test_table";
        String downstreamPath = dfsBasePath + "/incr_test_downstream_table";

        insertInTable(upstreamPath, 1, WriteOperationType.BULK_INSERT);
        HoodieDeltaStreamer.Config downstreamCfg =
            TestHelpers.makeConfigForHudiIncrSrc(upstreamPath, downstreamPath, WriteOperationType.BULK_INSERT, true, null);
        new HoodieDeltaStreamer(downstreamCfg, this.jsc).sync();

        // NOTE(review): presumably the 9 extra upstream commits, combined with the cleaner/archival
        // settings in insertInTable, remove files the incremental read still references. Confirm.
        insertInTable(upstreamPath, 9, WriteOperationType.UPSERT);
        Assertions.assertThrows(AnalysisException.class, () -> new HoodieDeltaStreamer(downstreamCfg, this.jsc).sync());
        TestHelpers.assertRecordCount(1000L, downstreamPath, this.sqlContext);

        if (downstreamCfg.configs == null) {
            downstreamCfg.configs = new ArrayList<>();
        }
        downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true");
        downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
        downstreamCfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(downstreamCfg, this.jsc).sync();
        new HoodieDeltaStreamer(downstreamCfg, this.jsc).sync();

        long upstreamCount = this.sqlContext.read().format("org.apache.hudi").load(upstreamPath).count();
        long downstreamCount = this.sqlContext.read().format("org.apache.hudi").load(downstreamPath).count();
        Assertions.assertEquals(upstreamCount, downstreamCount);
    }

    /**
     * Runs {@code numSyncs} DeltaStreamer syncs into {@code tablePath} with the given operation.
     * Each sync uses the SqlQueryBasedTransformer and aggressive cleaner/archival settings
     * (retain 3 commits, keep 4-5 commits).
     *
     * @param tablePath base path of the Hudi table to write
     * @param numSyncs  number of sync rounds to run
     * @param operation write operation used for every round
     */
    private void insertInTable(String tablePath, int numSyncs, WriteOperationType operation) throws Exception {
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tablePath, operation,
            Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false);
        if (cfg.configs == null) {
            cfg.configs = new ArrayList<>();
        }
        cfg.configs.add("hoodie.cleaner.commits.retained=3");
        cfg.configs.add("hoodie.keep.min.commits=4");
        cfg.configs.add("hoodie.keep.max.commits=5");
        cfg.configs.add("hoodie.test.source.generate.inserts=true");
        for (int round = 0; round < numSyncs; round++) {
            new HoodieDeltaStreamer(cfg, this.jsc).sync();
        }
    }

    @Test
    public void testInsertOverwrite() throws Exception {
        // Exercise the shared bulk-insert-then-overwrite scenario with INSERT_OVERWRITE.
        final String tablePath = dfsBasePath + "/insert_overwrite";
        testDeltaStreamerWithSpecifiedOperation(tablePath, WriteOperationType.INSERT_OVERWRITE);
    }

    @Test
    public void testInsertOverwriteTable() throws Exception {
        // Exercise the shared bulk-insert-then-overwrite scenario with INSERT_OVERWRITE_TABLE.
        final String tablePath = dfsBasePath + "/insert_overwrite_table";
        testDeltaStreamerWithSpecifiedOperation(tablePath, WriteOperationType.INSERT_OVERWRITE_TABLE);
    }

    @Disabled("Local run passing; flaky in CI environment.")
    @Test
    public void testDeletePartitions() throws Exception {
        prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "");
        final String tablePath = dfsBasePath + "/test_parquet_table" + testNum;
        final String sourceClassName = ParquetDFSSource.class.getName();

        // Phase 1: initial insert from the parquet source.
        HoodieDeltaStreamer.Config insertCfg = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT,
            sourceClassName, null, "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null);
        new HoodieDeltaStreamer(insertCfg, this.jsc).sync();
        TestHelpers.assertRecordCount(5L, tablePath, this.sqlContext);
        testNum++;

        // Phase 2: new source files, then DELETE_PARTITION driven by TestSpecificPartitionTransformer.
        prepareParquetDFSFiles(5, PARQUET_SOURCE_ROOT);
        prepareParquetDFSSource(false, false);
        HoodieDeltaStreamer.Config deleteCfg = TestHelpers.makeConfig(tablePath, WriteOperationType.DELETE_PARTITION,
            sourceClassName, Collections.singletonList(TestSpecificPartitionTransformer.class.getName()),
            "test-parquet-dfs-source.properties", false, false, 100000, false, null, null, "timestamp", null);
        new HoodieDeltaStreamer(deleteCfg, this.jsc).sync();
        TestHelpers.assertNoPartitionMatch(tablePath, this.sqlContext, "2016/03/15");
    }

    /**
     * Bulk-inserts into {@code tablePath}, then runs {@code operation} twice.
     * The first run uses source limit 0 and must leave the table and commit count unchanged.
     * The second run uses source limit 1000 and is expected to yield 1950 rows over two commits.
     */
    void testDeltaStreamerWithSpecifiedOperation(String tablePath, WriteOperationType operation) throws Exception {
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tablePath, WriteOperationType.BULK_INSERT);

        // Seed the table with the initial bulk insert.
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, tablePath, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, tablePath, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tablePath, dfs, 1);

        // Switch to the operation under test with an empty source: nothing should change.
        cfg.operation = operation;
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1000L, tablePath, this.sqlContext);
        TestHelpers.assertDistanceCount(1000L, tablePath, this.sqlContext);
        TestHelpers.assertCommitMetadata("00000", tablePath, dfs, 1);

        // Feed another batch through the operation under test.
        cfg.sourceLimit = 1000L;
        new HoodieDeltaStreamer(cfg, this.jsc).sync();
        TestHelpers.assertRecordCount(1950L, tablePath, this.sqlContext);
        TestHelpers.assertDistanceCount(1950L, tablePath, this.sqlContext);
        TestHelpers.assertCommitMetadata("00001", tablePath, dfs, 2);
    }

    /**
     * Checks that the latest commit metadata with a valid checkpoint is returned.
     * A replace commit without checkpoint metadata must not hide the earlier checkpoint.
     */
    @Test
    public void testFetchingCheckpointFromPreviousCommits() throws IOException {
        final String checkpointKey = "deltastreamer.checkpoint.key";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dfsBasePath + "/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
        TypedProperties properties = new TypedProperties();
        properties.setProperty("hoodie.datasource.write.recordkey.field", "key");
        properties.setProperty("hoodie.datasource.write.partitionpath.field", "pp");
        TestDeltaSync testDeltaSync = new TestDeltaSync(cfg, this.sparkSession, null, properties, this.jsc, dfs, this.jsc.hadoopConfiguration(), null);
        properties.put(HoodieTableConfig.NAME.key(), "sample_tbl");
        HoodieTableMetaClient metaClient = HoodieTestUtils.init(this.jsc.hadoopConfiguration(), dfsBasePath, HoodieTableType.COPY_ON_WRITE, properties);

        Map<String, String> extraMetadata = new HashMap<>();

        // A commit carrying checkpoint "abc" becomes the latest valid checkpoint.
        extraMetadata.put(checkpointKey, "abc");
        addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("abc", ((HoodieCommitMetadata) testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(
            metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));

        // A newer commit with checkpoint "def" supersedes it.
        extraMetadata.put(checkpointKey, "def");
        addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("def", ((HoodieCommitMetadata) testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(
            metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));

        // A replace commit with no extra metadata must be skipped; "def" is still reported.
        addReplaceCommitToTimeline(metaClient, Collections.emptyMap());
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals("def", ((HoodieCommitMetadata) testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(
            metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));
    }

    /**
     * Argument sets for the parameterized ORC DFS source test: a boolean flag paired with either
     * no transformers or a single TripsWithDistanceTransformer.
     */
    private static Stream<Arguments> testORCDFSSource() {
        Arguments noTransformer = Arguments.arguments(false, null);
        Arguments withTransformer = Arguments.arguments(true,
            Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
        return Stream.of(noTransformer, withTransformer);
    }
}
