package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.hive.HiveSyncConfigHolder;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.testutils.HiveTestService;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.class */
public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
    // Names of properties files used by the tests. Several of them are written under
    // basePath by prepareInitialConfigs / writeCommonPropsToFile.
    static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
    static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
    static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties";
    static final String PROPS_INVALID_FILE = "test-invalid-props.properties";
    static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties";
    static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
    static final String PROPS_FILENAME_INFER_COMPLEX_KEYGEN = "test-infer-complex-keygen.properties";
    static final String PROPS_FILENAME_INFER_NONPARTITIONED_KEYGEN = "test-infer-nonpartitioned-keygen.properties";
    static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_ORC = "test-orc-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_AVRO_KAFKA = "test-avro-kafka-dfs-source.properties";
    static final String PROPS_FILENAME_TEST_SQL_SOURCE = "test-sql-source-source.properties";
    static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";

    // Default file names for the first generated source file of each format.
    static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
    static final String FIRST_ORC_FILE_NAME = "1.orc";

    // Source directories; assigned from basePath in prepareTestSetup.
    static String PARQUET_SOURCE_ROOT;
    static String ORC_SOURCE_ROOT;
    static String JSON_KAFKA_SOURCE_ROOT;

    // Record counts per source type (presumably the sizes of the generated fixtures; verify against callers).
    static final int PARQUET_NUM_RECORDS = 5;
    static final int ORC_NUM_RECORDS = 5;
    static final int CSV_NUM_RECORDS = 3;
    static final int JSON_KAFKA_NUM_RECORDS = 5;
    static final int SQL_SOURCE_NUM_RECORDS = 1000;

    // Command-line style flag names and sample values; their usage lies outside this part of the file.
    static final String TGT_BASE_PATH_PARAM = "--target-base-path";
    static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
    static final String TABLE_TYPE_PARAM = "--table-type";
    static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
    static final String TARGET_TABLE_PARAM = "--target-table";
    static final String TARGET_TABLE_VALUE = "test";
    static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
    static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
    static final String SOURCE_LIMIT_PARAM = "--source-limit";
    static final String SOURCE_LIMIT_VALUE = "500";
    static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
    static final String HOODIE_CONF_PARAM = "--hoodie-conf";
    static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
    static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";

    // Kafka test utilities: created in prepareTestSetup, torn down in cleanupKafkaTestUtils.
    public static KafkaTestUtils testUtils;
    // Set to "topic" + testNum in prepareTestSetup.
    protected static String topicName;
    private static final Logger LOG = LoggerFactory.getLogger(HoodieDeltaStreamerTestBase.class);
    static final Random RANDOM = new Random();
    // Schema provider applied by TestHelpers.makeConfig when a schema provider is requested.
    protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
    protected static int testNum = 1;
    String kafkaCheckpointType = "string";
    // Per-test options; replaced with an empty map before each test by setupTest.
    Map<String, String> hudiOpts = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase$TestHelpers.class */
    public static class TestHelpers {
        // Package-private no-op constructor; every helper visible in this class is static.
        TestHelpers() {
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Builds a delta streamer config whose only transformer is
         * {@code TestHoodieDeltaStreamer.DropAllTransformer}.
         *
         * @param str target base path of the table
         * @param writeOperationType write operation to configure
         * @return the config produced by the three-argument {@code makeConfig}
         */
        public static HoodieDeltaStreamer.Config makeDropAllConfig(String str, WriteOperationType writeOperationType) {
            String dropAllTransformer = TestHoodieDeltaStreamer.DropAllTransformer.class.getName();
            List<String> transformers = Collections.singletonList(dropAllTransformer);
            return makeConfig(str, writeOperationType, transformers);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Builds a delta streamer config whose only transformer is
         * {@code TestHoodieDeltaStreamer.TripsWithDistanceTransformer}.
         *
         * @param str target base path of the table
         * @param writeOperationType write operation to configure
         * @return the config produced by the three-argument {@code makeConfig}
         */
        public static HoodieDeltaStreamer.Config makeConfig(String str, WriteOperationType writeOperationType) {
            String tripsTransformer = TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName();
            List<String> transformers = Collections.singletonList(tripsTransformer);
            return makeConfig(str, writeOperationType, transformers);
        }

        /**
         * Builds a config that reads its properties from {@code PROPS_FILENAME_TEST_SOURCE}
         * with hive sync disabled.
         *
         * @param str target base path of the table
         * @param writeOperationType write operation to configure
         * @param list transformer class names
         * @return the config produced by the five-argument {@code makeConfig}
         */
        static HoodieDeltaStreamer.Config makeConfig(String str, WriteOperationType writeOperationType, List<String> list) {
            final String propsFileName = HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_SOURCE;
            final boolean enableHiveSync = false;
            return makeConfig(str, writeOperationType, list, propsFileName, enableHiveSync);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Builds a config that uses the default schema provider, leaves the payload class
         * untouched and falls back to the default table type.
         *
         * @param str target base path of the table
         * @param writeOperationType write operation to configure
         * @param list transformer class names
         * @param str2 properties file name, resolved against the test base path downstream
         * @param z whether hive sync is enabled
         * @return the config produced by the nine-argument {@code makeConfig}
         */
        public static HoodieDeltaStreamer.Config makeConfig(String str, WriteOperationType writeOperationType, List<String> list, String str2, boolean z) {
            final boolean useSchemaProvider = true;
            final boolean overridePayloadClass = false;
            final String payloadClassName = null;
            final String tableType = null;
            return makeConfig(str, writeOperationType, list, str2, z, useSchemaProvider, overridePayloadClass, payloadClassName, tableType);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Builds a config backed by {@code TestDataSource}, ordered on the "timestamp" field,
         * with no checkpoint and a source limit of {@code SQL_SOURCE_NUM_RECORDS}.
         *
         * @param str target base path of the table
         * @param writeOperationType write operation to configure
         * @param list transformer class names
         * @param str2 properties file name, resolved against the test base path downstream
         * @param z whether hive sync is enabled
         * @param z2 whether the default schema provider class is set
         * @param z3 whether {@code str3} is applied as the payload class
         * @param str3 payload class name, used only when {@code z3} is true
         * @param str4 table type; {@code null} selects the default downstream
         * @return the config produced by the thirteen-argument {@code makeConfig}
         */
        public static HoodieDeltaStreamer.Config makeConfig(String str, WriteOperationType writeOperationType, List<String> list, String str2, boolean z, boolean z2, boolean z3, String str3, String str4) {
            final String sourceClassName = TestDataSource.class.getName();
            final int sourceLimit = HoodieDeltaStreamerTestBase.SQL_SOURCE_NUM_RECORDS;
            final String orderingField = "timestamp";
            final String checkpoint = null;
            return makeConfig(str, writeOperationType, sourceClassName, list, str2, z, z2, sourceLimit, z3, str3, str4, orderingField, checkpoint);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Same as the fourteen-argument overload with
         * {@code allowCommitOnNoCheckpointChange} fixed to {@code false}.
         *
         * @param str target base path of the table
         * @param writeOperationType write operation to configure
         * @param str2 source class name
         * @param list transformer class names
         * @param str3 properties file name, resolved against the test base path
         * @param z whether hive sync is enabled
         * @param z2 whether the default schema provider class is set
         * @param i source limit
         * @param z3 whether {@code str4} is applied as the payload class
         * @param str4 payload class name, used only when {@code z3} is true
         * @param str5 table type; {@code null} selects the default
         * @param str6 source ordering field
         * @param str7 checkpoint
         * @return the populated config
         */
        public static HoodieDeltaStreamer.Config makeConfig(String str, WriteOperationType writeOperationType, String str2, List<String> list, String str3, boolean z, boolean z2, int i, boolean z3, String str4, String str5, String str6, String str7) {
            final boolean allowCommitOnNoCheckpointChange = false;
            return makeConfig(str, writeOperationType, str2, list, str3, z, z2, i, z3, str4, str5, str6, str7, allowCommitOnNoCheckpointChange);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates and fully populates a delta streamer config for the table "hoodie_trips".
         *
         * @param str target base path of the table
         * @param writeOperationType write operation to configure
         * @param str2 source class name
         * @param list transformer class names
         * @param str3 properties file name; the path becomes {@code basePath + "/" + str3}
         * @param z whether hive sync is enabled
         * @param z2 whether {@code defaultSchemaProviderClassName} is set as schema provider
         * @param i source limit
         * @param z3 whether {@code str4} is applied as the payload class
         * @param str4 payload class name, used only when {@code z3} is true
         * @param str5 table type; {@code null} selects {@code TABLE_TYPE_VALUE}
         * @param str6 source ordering field
         * @param str7 checkpoint
         * @param z4 value for {@code allowCommitOnNoCheckpointChange}
         * @return the populated config
         */
        public static HoodieDeltaStreamer.Config makeConfig(String str, WriteOperationType writeOperationType, String str2, List<String> list, String str3, boolean z, boolean z2, int i, boolean z3, String str4, String str5, String str6, String str7, boolean z4) {
            HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();

            // Target table.
            cfg.targetBasePath = str;
            cfg.targetTableName = "hoodie_trips";
            if (str5 != null) {
                cfg.tableType = str5;
            } else {
                cfg.tableType = HoodieDeltaStreamerTestBase.TABLE_TYPE_VALUE;
            }

            // Source, transformation and write behaviour.
            cfg.sourceClassName = str2;
            cfg.transformerClassNames = list;
            cfg.operation = writeOperationType;
            cfg.enableHiveSync = z;
            cfg.sourceOrderingField = str6;
            cfg.propsFilePath = HoodieDeltaStreamerTestBase.basePath + "/" + str3;
            cfg.sourceLimit = i;
            cfg.checkpoint = str7;

            // Optional overrides: each field is left at its default unless its flag is set.
            if (z3) {
                cfg.payloadClassName = str4;
            }
            if (z2) {
                cfg.schemaProviderClassName = HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
            }

            cfg.allowCommitOnNoCheckpointChange = z4;
            return cfg;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates a config for the table "hoodie_trips_copy" that reads from another Hudi
         * table through {@code HoodieIncrSource}.
         *
         * @param str path of the Hudi table to read from (written to the {@code hoodieincr.path} config)
         * @param str2 target base path of the table being written
         * @param writeOperationType write operation to configure
         * @param z value for the {@code read_latest_on_missing_ckpt} config
         * @param str3 schema provider class name; skipped when {@code null}
         * @return the populated config
         */
        public static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String str, String str2, WriteOperationType writeOperationType, boolean z, String str3) {
            HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();
            config.targetBasePath = str2;
            config.targetTableName = "hoodie_trips_copy";
            config.tableType = HoodieDeltaStreamerTestBase.TABLE_TYPE_VALUE;
            config.sourceClassName = HoodieIncrSource.class.getName();
            config.operation = writeOperationType;
            config.sourceOrderingField = "timestamp";
            config.propsFilePath = HoodieDeltaStreamerTestBase.basePath + "/test-downstream-source.properties";
            config.sourceLimit = 1000L;
            if (str3 != null) {
                config.schemaProviderClassName = str3;
            }
            // Typed list instead of a raw ArrayList so the entries are checked at compile time.
            List<String> incrSourceConfigs = new ArrayList<>();
            incrSourceConfigs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + z);
            incrSourceConfigs.add("hoodie.deltastreamer.source.hoodieincr.path=" + str);
            incrSourceConfigs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
            config.configs = incrSourceConfigs;
            return config;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the commit timeline of the table holds at least {@code i} completed instants.
         *
         * @param i minimum number of completed instants expected
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtleastNCompactionCommits(int i, String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .build();
            HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedCommits.countInstants();
            Assertions.assertTrue(actual >= i, "Got=" + actual + ", exp >=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the delta commit timeline of the table holds at least {@code i} completed instants.
         *
         * @param i minimum number of completed delta commits expected
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtleastNDeltaCommits(int i, String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .build();
            HoodieTimeline completedDeltaCommits = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedDeltaCommits.countInstants();
            Assertions.assertTrue(actual >= i, "Got=" + actual + ", exp >=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the commit timeline holds at least {@code i} completed instants after
         * the instant time {@code str}.
         *
         * @param i minimum number of completed instants expected
         * @param str instant time passed to {@code findInstantsAfter}
         * @param str2 base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtleastNCompactionCommitsAfterCommit(int i, String str, String str2, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str2)
                .build();
            HoodieTimeline laterCommits = metaClient.getActiveTimeline().getCommitTimeline().findInstantsAfter(str).filterCompletedInstants();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = laterCommits.countInstants();
            Assertions.assertTrue(actual >= i, "Got=" + actual + ", exp >=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the reloaded delta commit timeline holds at least {@code i} completed
         * instants after the instant time {@code str}.
         *
         * @param i minimum number of completed delta commits expected
         * @param str instant time passed to {@code findInstantsAfter}
         * @param str2 base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtleastNDeltaCommitsAfterCommit(int i, String str, String str2, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str2)
                .build();
            HoodieTimeline laterDeltaCommits = metaClient.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(str).filterCompletedInstants();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = laterDeltaCommits.countInstants();
            Assertions.assertTrue(actual >= i, "Got=" + actual + ", exp >=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Checks the number of completed commits and the checkpoint stored in the latest one.
         *
         * @param str expected value of the "deltastreamer.checkpoint.key" metadata entry
         * @param str2 base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         * @param i expected number of completed instants on the commits timeline
         * @return timestamp of the last completed instant
         * @throws IOException declared for reading the commit metadata
         */
        public static String assertCommitMetadata(String str, String str2, FileSystem fileSystem, int i) throws IOException {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str2)
                .build();
            HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            HoodieInstant latest = (HoodieInstant) completedCommits.lastInstant().get();
            byte[] rawMetadata = (byte[]) completedCommits.getInstantDetails(latest).get();
            HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata) HoodieCommitMetadata.fromBytes(rawMetadata, HoodieCommitMetadata.class);
            Assertions.assertEquals(i, completedCommits.countInstants());
            Assertions.assertEquals(str, commitMetadata.getMetadata("deltastreamer.checkpoint.key"));
            return latest.getTimestamp();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Polls {@code function} every two seconds on a background thread until it returns
         * true or {@code future} completes, waiting at most {@code j} seconds for that to happen.
         *
         * <p>Errors thrown by {@code function} are logged and treated as "not yet satisfied".
         * The polling executor is always shut down on exit, so the poller thread does not
         * outlive this call after a timeout or failure.
         *
         * @param function condition to evaluate; it is always invoked with {@code true}
         * @param future polling stops as soon as this future is done
         * @param j maximum time to wait, in seconds
         * @throws Exception whatever {@code Future.get(long, TimeUnit)} throws, including a
         *     {@code TimeoutException} when polling does not finish within {@code j} seconds
         */
        public static void waitTillCondition(Function<Boolean, Boolean> function, Future future, long j) throws Exception {
            ExecutorService poller = Executors.newSingleThreadExecutor();
            try {
                poller.submit(() -> {
                    boolean satisfied = false;
                    while (!satisfied && !future.isDone()) {
                        try {
                            Thread.sleep(2000L);
                            satisfied = function.apply(true);
                        } catch (InterruptedException interrupted) {
                            // Raised by shutdownNow() below: restore the flag and stop polling.
                            Thread.currentThread().interrupt();
                            break;
                        } catch (Throwable th) {
                            HoodieDeltaStreamerTestBase.LOG.warn("Got error :", th);
                            satisfied = false;
                        }
                    }
                    return satisfied;
                }).get(j, TimeUnit.SECONDS);
            } finally {
                // Without this the non-daemon worker thread leaks and keeps polling after a timeout.
                poller.shutdownNow();
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the active timeline of the table holds at least {@code i} completed instants.
         *
         * @param i minimum number of completed instants expected
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtLeastNCommits(int i, String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .build();
            HoodieTimeline completedInstants = metaClient.getActiveTimeline().filterCompletedInstants();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedInstants.countInstants();
            Assertions.assertTrue(actual >= i, "Got=" + actual + ", exp >=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the completed replace timeline of the table holds at least {@code i} instants.
         *
         * @param i minimum number of completed replace commits expected
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtLeastNReplaceCommits(int i, String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline completedReplaces = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedReplaces.countInstants();
            Assertions.assertTrue(actual >= i, "Got=" + actual + ", exp >=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that exactly one pending index instant exists on the all-commits timeline.
         *
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertPendingIndexCommit(String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline pendingIndex = metaClient.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = pendingIndex.countInstants();
            Assertions.assertEquals(1, actual, "Got=" + actual + ", exp=1");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that exactly one completed index instant exists on the all-commits timeline.
         *
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertCompletedIndexCommit(String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline completedIndex = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedIndex.countInstants();
            Assertions.assertEquals(1, actual, "Got=" + actual + ", exp=1");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the completed replace timeline of the table is empty.
         *
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertNoReplaceCommits(String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline completedReplaces = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = completedReplaces.countInstants();
            Assertions.assertEquals(0, actual, "Got=" + actual + ", exp =0");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that the pending replace timeline of the table holds at least {@code i} instants.
         *
         * @param i minimum number of pending replace instants expected
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtLeastNReplaceRequests(int i, String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline pendingReplaces = metaClient.getActiveTimeline().filterPendingReplaceTimeline();
            HoodieDeltaStreamerTestBase.LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int actual = pendingReplaces.countInstants();
            Assertions.assertTrue(actual >= i, "Got=" + actual + ", exp >=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Asserts that at least {@code i} completed rollbacks exist and that at least
         * {@code i2} completed instants have a timestamp greater than the first rollback's.
         *
         * @param i minimum number of completed rollback instants expected
         * @param i2 minimum number of completed instants expected after the first rollback
         * @param str base path of the table
         * @param fileSystem file system whose configuration is used to open the table
         */
        public static void assertAtLeastNCommitsAfterRollback(int i, int i2, String str, FileSystem fileSystem) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fileSystem.getConf())
                .setBasePath(str)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline completedRollbacks = metaClient.getActiveTimeline().getRollbackTimeline().filterCompletedInstants();
            HoodieDeltaStreamerTestBase.LOG.info("Rollback Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int rollbackCount = completedRollbacks.countInstants();
            Assertions.assertTrue(rollbackCount >= i, "Got=" + rollbackCount + ", exp >=" + i);

            // Count the completed instants that come strictly after the first rollback.
            HoodieInstant firstRollback = (HoodieInstant) completedRollbacks.getInstants().get(0);
            int commitsAfterRollback = metaClient.getActiveTimeline()
                .filterCompletedInstants()
                .filter(instant -> HoodieTimeline.compareTimestamps(
                    instant.getTimestamp(), HoodieTimeline.GREATER_THAN, firstRollback.getTimestamp()))
                .countInstants();
            Assertions.assertTrue(commitsAfterRollback >= i2, "Got=" + commitsAfterRollback + ", exp >=" + i2);
        }
    }

    /**
     * Sets the source roots, starts the Kafka test utilities, writes the initial config
     * files and generates the default Parquet and ORC source files.
     *
     * @throws IOException if staging the configs or generating the source files fails
     */
    protected static void prepareTestSetup() throws IOException {
        // Source roots all live under the shared test base path.
        PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
        ORC_SOURCE_ROOT = basePath + "/orcFiles";
        JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";

        // Assign the field before setup() so cleanupKafkaTestUtils sees it even if setup fails.
        testUtils = new KafkaTestUtils();
        testUtils.setup();
        topicName = "topic" + testNum;

        String brokerAddress = testUtils.brokerAddress();
        prepareInitialConfigs(fs, basePath, brokerAddress);
        prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
        prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Copies the bundled "streamer-config" resources to {@code str} and writes the generated
     * properties files the tests read.
     *
     * @param fileSystem file system the files are written to
     * @param str directory that receives the files
     * @param str2 Kafka broker address, forwarded to {@code populateAllCommonProps}
     * @throws IOException if a copy or save fails
     */
    public static void prepareInitialConfigs(FileSystem fileSystem, String str, String str2) throws IOException {
        final String resourceDir = "streamer-config/";
        final String configDir = str + "/config/";

        // base.properties is staged twice: at the root and under config/.
        UtilitiesTestBase.Helpers.copyToDFS(resourceDir + "base.properties", fileSystem, str + "/base.properties");
        UtilitiesTestBase.Helpers.copyToDFS(resourceDir + "base.properties", fileSystem, configDir + "base.properties");

        // Resources copied to the root under their own name, in the original order.
        String[] rootResources = {
            "sql-transformer.properties",
            "source.avsc",
            "source_evolved.avsc",
            "source_evolved_post_processed.avsc",
            "source-flattened.avsc",
            "target.avsc",
            "target-flattened.avsc",
            "source_short_trip_uber.avsc",
            "source_uber.avsc",
            "target_short_trip_uber.avsc",
            "target_uber.avsc"
        };
        for (String resource : rootResources) {
            UtilitiesTestBase.Helpers.copyToDFS(resourceDir + resource, fileSystem, str + "/" + resource);
        }

        // Per-table config files go under config/.
        String[] tableConfigs = {
            "invalid_hive_sync_uber_config.properties",
            "uber_config.properties",
            "short_trip_uber_config.properties"
        };
        for (String resource : tableConfigs) {
            UtilitiesTestBase.Helpers.copyToDFS(resourceDir + resource, fileSystem, configDir + resource);
        }

        String[] jobConfigs = {"clusteringjob.properties", "indexer.properties"};
        for (String resource : jobConfigs) {
            UtilitiesTestBase.Helpers.copyToDFS(resourceDir + resource, fileSystem, str + "/" + resource);
        }

        writeCommonPropsToFile(fileSystem, str);

        final String sourceSchemaFile = str + "/source.avsc";
        final String targetSchemaFile = str + "/target.avsc";

        // Downstream source: both the source and the target schema point at target.avsc.
        TypedProperties downstreamProps = new TypedProperties();
        downstreamProps.setProperty("include", "base.properties");
        downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", targetSchemaFile);
        downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaFile);
        UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, fileSystem, str + "/test-downstream-source.properties");

        // Props whose key generator class is the literal "invalid".
        TypedProperties invalidProps = new TypedProperties();
        invalidProps.setProperty("include", "sql-transformer.properties");
        invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
        invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", sourceSchemaFile);
        invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaFile);
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, fileSystem, str + "/" + PROPS_FILENAME_TEST_INVALID);

        // Key generator inference props: saved once with a partition path field, then
        // again with the same object after blanking that field.
        TypedProperties inferKeygenProps = new TypedProperties();
        inferKeygenProps.setProperty("include", "base.properties");
        inferKeygenProps.setProperty("hoodie.datasource.write.recordkey.field", "timestamp,_row_key");
        inferKeygenProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", sourceSchemaFile);
        inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaFile);
        UtilitiesTestBase.Helpers.savePropsToDFS(inferKeygenProps, fileSystem, str + "/" + PROPS_FILENAME_INFER_COMPLEX_KEYGEN);
        inferKeygenProps.setProperty("hoodie.datasource.write.partitionpath.field", "");
        UtilitiesTestBase.Helpers.savePropsToDFS(inferKeygenProps, fileSystem, str + "/" + PROPS_FILENAME_INFER_NONPARTITIONED_KEYGEN);

        TypedProperties commonProps = new TypedProperties();
        populateAllCommonProps(commonProps, str, str2);
        UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, fileSystem, str + "/" + PROPS_FILENAME_TEST_SOURCE1);

        TypedProperties invalidTableConfigProps = new TypedProperties();
        populateInvalidTableConfigFilePathProps(invalidTableConfigProps, str);
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidTableConfigProps, fileSystem, str + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);

        TypedProperties invalidHiveSyncProps = new TypedProperties();
        invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
        invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", configDir + "invalid_hive_sync_uber_config.properties");
        UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, fileSystem, str + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Writes {@code PROPS_FILENAME_TEST_SOURCE} under {@code str} with the key generator,
     * record key, partition path, schema file and hive sync settings.
     *
     * @param fileSystem file system the properties file is written to
     * @param str directory that receives the file and holds the referenced schema files
     * @throws IOException if saving the properties fails
     */
    public static void writeCommonPropsToFile(FileSystem fileSystem, String str) throws IOException {
        TypedProperties props = new TypedProperties();
        props.setProperty("include", "sql-transformer.properties");

        // Write settings.
        String keyGeneratorClass = TestHoodieDeltaStreamer.TestGenerator.class.getName();
        props.setProperty("hoodie.datasource.write.keygenerator.class", keyGeneratorClass);
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");

        // Schema files.
        props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", str + "/source.avsc");
        props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", str + "/target.avsc");

        // Hive sync settings.
        props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
        props.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
        props.setProperty(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "hive_trips");
        props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
        props.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), MultiPartKeysValueExtractor.class.getName());

        String destination = str + "/" + PROPS_FILENAME_TEST_SOURCE;
        UtilitiesTestBase.Helpers.savePropsToDFS(props, fileSystem, destination);
    }

    /**
     * Class-level setup: initializes the shared test services, then prepares the test
     * configs and source files.
     *
     * @throws Exception if service initialization or test setup fails
     */
    @BeforeAll
    public static void initClass() throws Exception {
        UtilitiesTestBase.initTestServices(false, true, false);
        HoodieDeltaStreamerTestBase.prepareTestSetup();
    }

    /**
     * Class-level teardown: tears down the Kafka test utilities if they were created.
     */
    @AfterAll
    public static void cleanupKafkaTestUtils() {
        if (testUtils == null) {
            return;
        }
        testUtils.teardown();
    }

    /**
     * Per-test reset: makes {@code TestDataSource} return non-empty batches again and
     * replaces {@code hudiOpts} with a fresh empty map.
     */
    @BeforeEach
    public void setupTest() {
        TestDataSource.returnEmptyBatch = false;
        // Diamond instead of a raw HashMap so the map keeps its <String, String> typing.
        this.hudiOpts = new HashMap<>();
    }

    /**
     * Fills {@code typedProperties} with an ingestion config whose table config file is
     * {@code config/invalid_uber_config.properties} under {@code str}.
     *
     * @param typedProperties properties to populate
     * @param str directory used as prefix for the config file path
     */
    protected static void populateInvalidTableConfigFilePathProps(TypedProperties typedProperties, String str) {
        String keyGeneratorClass = TestHoodieDeltaStreamer.TestGenerator.class.getName();
        String tableConfigFile = str + "/config/invalid_uber_config.properties";

        typedProperties.setProperty("hoodie.datasource.write.keygenerator.class", keyGeneratorClass);
        typedProperties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
        typedProperties.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
        typedProperties.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", tableConfigFile);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Applies the common ingestion, Kafka and Hive properties, in that order.
     *
     * @param typedProperties properties to populate
     * @param str directory used as prefix for the per-table config files
     * @param str2 Kafka broker address
     */
    public static void populateAllCommonProps(TypedProperties typedProperties, String str, String str2) {
        HoodieDeltaStreamerTestBase.populateCommonProps(typedProperties, str);
        HoodieDeltaStreamerTestBase.populateCommonKafkaProps(typedProperties, str2);
        HoodieDeltaStreamerTestBase.populateCommonHiveProps(typedProperties);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sets the key generator, the output date format and the two tables to ingest together
     * with their config file locations under {@code str}.
     *
     * @param typedProperties properties to populate
     * @param str directory used as prefix for the per-table config files
     */
    public static void populateCommonProps(TypedProperties typedProperties, String str) {
        String keyGeneratorClass = TestHoodieDeltaStreamer.TestGenerator.class.getName();
        String uberConfigFile = str + "/config/uber_config.properties";
        String shortTripConfigFile = str + "/config/short_trip_uber_config.properties";

        typedProperties.setProperty("hoodie.datasource.write.keygenerator.class", keyGeneratorClass);
        typedProperties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
        typedProperties.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
        typedProperties.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", uberConfigFile);
        typedProperties.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", shortTripConfigFile);
    }

    /**
     * Populates the Kafka client settings shared by the tests: bootstrap servers,
     * offset reset policy, string key/value serializers, and a max-events limit
     * of 5000 for the Kafka source.
     *
     * @param typedProperties properties object that receives the settings
     * @param str             value stored under {@code bootstrap.servers}
     */
    protected static void populateCommonKafkaProps(TypedProperties typedProperties, String str) {
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        final String maxEvents = Integer.toString(5000);

        typedProperties.setProperty("bootstrap.servers", str);
        typedProperties.setProperty("auto.offset.reset", "earliest");
        typedProperties.setProperty("key.serializer", stringSerializer);
        typedProperties.setProperty("value.serializer", stringSerializer);
        typedProperties.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEvents);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Populates the Hive / meta sync settings shared by the tests: the JDBC URL of
     * the test Hive service, database {@code testdb2}, partition field
     * {@code datestr}, and {@link MultiPartKeysValueExtractor} as the partition
     * extractor.
     *
     * @param typedProperties properties object that receives the settings
     */
    public static void populateCommonHiveProps(TypedProperties typedProperties) {
        final String partitionExtractorClass = MultiPartKeysValueExtractor.class.getName();

        // Connection target.
        typedProperties.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
        typedProperties.setProperty(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb2");

        // Partition handling.
        typedProperties.setProperty(HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.key(), "false");
        typedProperties.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
        typedProperties.setProperty(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), partitionExtractorClass);
    }

    /**
     * Writes a Parquet file of generated records under {@code PARQUET_SOURCE_ROOT}.
     *
     * @param i number of records to generate
     * @throws IOException propagated from the delegated overload
     */
    protected static void prepareParquetDFSFiles(int i) throws IOException {
        prepareParquetDFSFiles(i, PARQUET_SOURCE_ROOT);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Writes a Parquet file named {@code FIRST_PARQUET_FILE_NAME} under the given
     * directory, using the default (non-custom-schema) record generation path.
     *
     * @param i   number of records to generate
     * @param str directory the file is written under
     * @throws IOException propagated from the delegated overload
     */
    public static void prepareParquetDFSFiles(int i, String str) throws IOException {
        prepareParquetDFSFiles(i, str, FIRST_PARQUET_FILE_NAME, false, null, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Generates insert records at instant {@code "000"} with a new
     * {@link HoodieTestDataGenerator} and saves them as a Parquet file at
     * {@code str + "/" + str2}.
     *
     * @param i      number of records to generate
     * @param str    directory the file is written under
     * @param str2   file name within {@code str}
     * @param z      when {@code true}, records are generated via
     *               {@code generateInsertsAsPerSchema} with {@code str3}, converted with
     *               {@code schema}, and saved with {@code AVRO_TRIP_SCHEMA}; otherwise the
     *               generator's default inserts are used
     * @param str3   schema string passed to {@code generateInsertsAsPerSchema}; only used when {@code z}
     * @param schema schema passed to the generic-record conversion; only used when {@code z}
     * @return the data generator that produced the records
     * @throws IOException propagated from the save helper
     */
    public static HoodieTestDataGenerator prepareParquetDFSFiles(int i, String str, String str2, boolean z, String str3, Schema schema) throws IOException {
        final Path targetFile = new Path(str + "/" + str2);
        final Integer numRecords = i;
        final HoodieTestDataGenerator generator = new HoodieTestDataGenerator();

        if (!z) {
            UtilitiesTestBase.Helpers.saveParquetToDFS(
                UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInserts("000", numRecords)),
                targetFile);
            return generator;
        }

        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInsertsAsPerSchema("000", numRecords, str3), schema),
            targetFile,
            HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        return generator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Generates update records with the supplied data generator and saves them as
     * a Parquet file at {@code str + "/" + str2}.
     *
     * @param i                       number of update records to generate
     * @param str                     directory the file is written under
     * @param str2                    file name within {@code str}
     * @param z                       when {@code true}, records are generated via
     *                                {@code generateUpdatesAsPerSchema} with {@code str3}, converted
     *                                with {@code schema}, and saved with {@code AVRO_TRIP_SCHEMA};
     *                                otherwise the generator's default updates are used
     * @param str3                    schema string passed to {@code generateUpdatesAsPerSchema}; only used when {@code z}
     * @param schema                  schema passed to the generic-record conversion; only used when {@code z}
     * @param hoodieTestDataGenerator generator that produces the updates
     * @param str4                    instant time passed to the generator
     * @throws IOException propagated from the save helper
     */
    public static void prepareParquetDFSUpdates(int i, String str, String str2, boolean z, String str3, Schema schema, HoodieTestDataGenerator hoodieTestDataGenerator, String str4) throws IOException {
        final Path targetFile = new Path(str + "/" + str2);
        final Integer numRecords = i;

        if (!z) {
            UtilitiesTestBase.Helpers.saveParquetToDFS(
                UtilitiesTestBase.Helpers.toGenericRecords(hoodieTestDataGenerator.generateUpdates(str4, numRecords)),
                targetFile);
            return;
        }

        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(hoodieTestDataGenerator.generateUpdatesAsPerSchema(str4, numRecords, str3), schema),
            targetFile,
            HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
    }

    /**
     * Writes the Parquet source properties file with the default schema files
     * ({@code source.avsc} / {@code target.avsc}), the {@code PROPS_FILENAME_TEST_PARQUET}
     * file name, {@code PARQUET_SOURCE_ROOT} as the source root, no common
     * multi-table properties, and {@code partition_path} as the partition field.
     *
     * @param z   forwarded as the first flag of the delegated overload (controls the source schema file setting)
     * @param z2  forwarded as the second flag of the delegated overload (controls the target schema file setting)
     * @param str forwarded as the last string argument of the delegated overload
     *            (the value for the empty-batch setting)
     * @throws IOException propagated from the delegated overload
     */
    protected void prepareParquetDFSSource(boolean z, boolean z2, String str) throws IOException {
        prepareParquetDFSSource(z, z2, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path", str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Same as the three-argument overload, passing an empty string as the third
     * argument.
     *
     * @param z  forwarded unchanged
     * @param z2 forwarded unchanged
     * @throws IOException propagated from the delegated overload
     */
    public void prepareParquetDFSSource(boolean z, boolean z2) throws IOException {
        prepareParquetDFSSource(z, z2, "");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Forwards all arguments to the nine-argument overload, passing an empty
     * string as the trailing argument.
     *
     * @throws IOException propagated from the delegated overload
     */
    public void prepareParquetDFSSource(boolean z, boolean z2, String str, String str2, String str3, String str4, boolean z3, String str5) throws IOException {
        prepareParquetDFSSource(z, z2, str, str2, str3, str4, z3, str5, "");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Forwards all arguments to the ten-argument overload, passing {@code null}
     * for the extra {@link TypedProperties}.
     *
     * @throws IOException propagated from the delegated overload
     */
    public void prepareParquetDFSSource(boolean z, boolean z2, String str, String str2, String str3, String str4, boolean z3, String str5, String str6) throws IOException {
        prepareParquetDFSSource(z, z2, str, str2, str3, str4, z3, str5, str6, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the properties for a Parquet DFS source and saves them to
     * {@code basePath + "/" + str3}.
     *
     * @param z               when {@code true}, sets the source schema file to {@code basePath + "/" + str}
     * @param z2              when {@code true} (and {@code z} is also {@code true}), sets the target
     *                        schema file to {@code basePath + "/" + str2}
     * @param str             source schema file name, relative to {@code basePath}
     * @param str2            target schema file name, relative to {@code basePath}
     * @param str3            name of the properties file to write, relative to {@code basePath}
     * @param str4            value for {@code hoodie.deltastreamer.source.dfs.root}
     * @param z3              when {@code true}, the common multi-table properties are added first
     * @param str5            value for {@code hoodie.datasource.write.partitionpath.field}
     * @param str6            when neither {@code null} nor empty, stored under
     *                        {@code TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH}
     * @param typedProperties starting properties handed to the {@link TypedProperties}
     *                        constructor; an overload passes {@code null} here
     * @throws IOException propagated from the save helper
     */
    public void prepareParquetDFSSource(boolean z, boolean z2, String str, String str2, String str3, String str4, boolean z3, String str5, String str6, TypedProperties typedProperties) throws IOException {
        final TypedProperties props = new TypedProperties(typedProperties);
        if (z3) {
            populateCommonProps(props, basePath);
        }

        // Writer settings applied regardless of the flags above.
        props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
        props.setProperty("include", "base.properties");
        props.setProperty("hoodie.embed.timeline.server", "false");
        props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        props.setProperty("hoodie.datasource.write.partitionpath.field", str5);

        // The target schema file is only configured together with the source schema file.
        if (z) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + str);
        }
        if (z && z2) {
            props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/" + str2);
        }

        props.setProperty("hoodie.deltastreamer.source.dfs.root", str4);

        final boolean emptyBatchValueGiven = !StringUtils.isNullOrEmpty(str6);
        if (emptyBatchValueGiven) {
            props.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, str6);
        }

        final String propsFile = basePath + "/" + str3;
        UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFile);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the properties for an Avro Kafka source and saves them to
     * {@code basePath + "/" + str}.
     *
     * <p>The Kafka consumer is pointed at the test broker, given a random
     * {@code group.id}, and configured to start from the earliest offset with
     * auto-commit disabled.
     *
     * @param str             name of the properties file to write, relative to {@code basePath}
     * @param l               max events to read from the Kafka source; when {@code null} the
     *                        default of {@code KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE} is used
     * @param str2            Kafka topic name
     * @param str3            value for {@code hoodie.datasource.write.partitionpath.field}
     * @param typedProperties starting properties handed to the {@link TypedProperties} constructor
     * @throws IOException propagated from the save helper
     */
    public void prepareAvroKafkaDFSSource(String str, Long l, String str2, String str3, TypedProperties typedProperties) throws IOException {
        TypedProperties typedProperties2 = new TypedProperties(typedProperties);
        typedProperties2.setProperty("bootstrap.servers", testUtils.brokerAddress());
        typedProperties2.put(HoodieStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "false");
        typedProperties2.setProperty("auto.offset.reset", "earliest");
        typedProperties2.setProperty("include", "base.properties");
        typedProperties2.setProperty("hoodie.embed.timeline.server", "false");
        typedProperties2.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        typedProperties2.setProperty("hoodie.datasource.write.partitionpath.field", str3);
        typedProperties2.setProperty("hoodie.deltastreamer.source.kafka.topic", str2);
        typedProperties2.setProperty("enable.auto.commit", "false");
        typedProperties2.setProperty(KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(), ByteArrayDeserializer.class.getName());
        // maxEvents is written exactly once: a fixed 5000 was previously set first and then
        // unconditionally overwritten by this value, so that earlier write had no effect.
        typedProperties2.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", l != null ? String.valueOf(l) : String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
        // Random group id for every generated properties file.
        typedProperties2.setProperty("group.id", UUID.randomUUID().toString());
        UtilitiesTestBase.Helpers.savePropsToDFS(typedProperties2, fs, basePath + "/" + str);
    }

    /**
     * Writes an ORC file of generated records under {@code ORC_SOURCE_ROOT}.
     *
     * @param i number of records to generate
     * @throws IOException propagated from the delegated overload
     */
    protected static void prepareORCDFSFiles(int i) throws IOException {
        prepareORCDFSFiles(i, ORC_SOURCE_ROOT);
    }

    /**
     * Writes an ORC file named {@code FIRST_ORC_FILE_NAME} under the given
     * directory, using the default (non-custom-schema) record generation path.
     *
     * @param i   number of records to generate
     * @param str directory the file is written under
     * @throws IOException propagated from the delegated overload
     */
    protected static void prepareORCDFSFiles(int i, String str) throws IOException {
        prepareORCDFSFiles(i, str, FIRST_ORC_FILE_NAME, false, null, null);
    }

    /**
     * Generates insert records at instant {@code "000"} with a new
     * {@link HoodieTestDataGenerator} and saves them as an ORC file at
     * {@code str + "/" + str2}.
     *
     * @param i      number of records to generate
     * @param str    directory the file is written under
     * @param str2   file name within {@code str}
     * @param z      when {@code true}, records are generated via
     *               {@code generateInsertsAsPerSchema} with {@code str3}, converted with
     *               {@code schema}, and saved with {@code ORC_TRIP_SCHEMA}; otherwise the
     *               generator's default inserts are used
     * @param str3   schema string passed to {@code generateInsertsAsPerSchema}; only used when {@code z}
     * @param schema schema passed to the generic-record conversion; only used when {@code z}
     * @throws IOException propagated from the save helper
     */
    protected static void prepareORCDFSFiles(int i, String str, String str2, boolean z, String str3, Schema schema) throws IOException {
        final Path targetFile = new Path(str + "/" + str2);
        final Integer numRecords = i;
        final HoodieTestDataGenerator generator = new HoodieTestDataGenerator();

        if (!z) {
            UtilitiesTestBase.Helpers.saveORCToDFS(
                UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInserts("000", numRecords)),
                targetFile);
            return;
        }

        UtilitiesTestBase.Helpers.saveORCToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(generator.generateInsertsAsPerSchema("000", numRecords, str3), schema),
            targetFile,
            HoodieTestDataGenerator.ORC_TRIP_SCHEMA);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a list of {@code key=value} config strings for table services.
     *
     * <p>The max-unique-records entry is always present; each of the remaining
     * entries is added only when its argument is non-empty according to
     * {@code StringUtils.nonEmpty}.
     *
     * @param i    value for {@code SourceTestConfig.MAX_UNIQUE_RECORDS_PROP}
     * @param str  value for {@code HoodieCleanConfig.AUTO_CLEAN}
     * @param str2 value for {@code HoodieClusteringConfig.INLINE_CLUSTERING}
     * @param str3 value for {@code HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS}
     * @param str4 value for {@code HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE}
     * @param str5 value for {@code HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS}
     * @return a new mutable list of config strings, in the order listed above
     */
    public static List<String> getTableServicesConfigs(int i, String str, String str2, String str3, String str4, String str5) {
        // Typed list instead of a raw ArrayList so the return needs no unchecked conversion.
        List<String> configs = new ArrayList<>();
        configs.add(String.format("%s=%d", SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), i));
        if (StringUtils.nonEmpty(str)) {
            configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), str));
        }
        if (StringUtils.nonEmpty(str2)) {
            configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), str2));
        }
        if (StringUtils.nonEmpty(str3)) {
            configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), str3));
        }
        if (StringUtils.nonEmpty(str4)) {
            configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), str4));
        }
        if (StringUtils.nonEmpty(str5)) {
            configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), str5));
        }
        return configs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds a commit to the table's timeline with no extra metadata.
     *
     * @param hoodieTableMetaClient meta client of the table whose timeline is modified
     * @throws IOException propagated from the delegated overload
     */
    public static void addCommitToTimeline(HoodieTableMetaClient hoodieTableMetaClient) throws IOException {
        addCommitToTimeline(hoodieTableMetaClient, Collections.emptyMap());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds a {@code "commit"} action with operation type
     * {@link WriteOperationType#UPSERT} to the table's timeline.
     *
     * @param hoodieTableMetaClient meta client of the table whose timeline is modified
     * @param map                   extra metadata forwarded to the delegated overload
     * @throws IOException propagated from the delegated overload
     */
    public static void addCommitToTimeline(HoodieTableMetaClient hoodieTableMetaClient, Map<String, String> map) throws IOException {
        addCommitToTimeline(hoodieTableMetaClient, WriteOperationType.UPSERT, "commit", map);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds a {@code "replacecommit"} action with operation type
     * {@link WriteOperationType#CLUSTER} to the table's timeline.
     *
     * @param hoodieTableMetaClient meta client of the table whose timeline is modified
     * @param map                   extra metadata forwarded to the delegated overload
     * @throws IOException propagated from the delegated overload
     */
    public static void addReplaceCommitToTimeline(HoodieTableMetaClient hoodieTableMetaClient, Map<String, String> map) throws IOException {
        addCommitToTimeline(hoodieTableMetaClient, WriteOperationType.CLUSTER, "replacecommit", map);
    }

    /**
     * Creates a completed instant on the table's active timeline by walking it
     * through REQUESTED, INFLIGHT and then saving it as complete with serialized
     * {@link HoodieCommitMetadata}.
     *
     * @param hoodieTableMetaClient meta client of the table whose timeline is modified
     * @param writeOperationType    operation type recorded in the commit metadata
     * @param str                   timeline action name used for the instants
     *                              (callers in this class pass {@code "commit"} or {@code "replacecommit"})
     * @param map                   extra metadata entries recorded in the commit metadata
     * @throws IOException if the commit metadata cannot be serialized to JSON
     */
    static void addCommitToTimeline(HoodieTableMetaClient hoodieTableMetaClient, WriteOperationType writeOperationType, String str, Map<String, String> map) throws IOException {
        HoodieCommitMetadata hoodieCommitMetadata = new HoodieCommitMetadata();
        hoodieCommitMetadata.setOperationType(writeOperationType);
        // Record every caller-supplied entry; previously the map was iterated with an
        // empty lambda body, so the extra metadata was silently dropped.
        map.forEach(hoodieCommitMetadata::addMetadata);

        String createNewInstantTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieActiveTimeline activeTimeline = hoodieTableMetaClient.getActiveTimeline();
        activeTimeline.createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, str, createNewInstantTime));
        activeTimeline.createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, str, createNewInstantTime));
        activeTimeline.saveAsComplete(
            new HoodieInstant(HoodieInstant.State.INFLIGHT, str, createNewInstantTime),
            Option.of(hoodieCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asserts that the Hudi table at {@code str} contains exactly {@code j} rows.
     * The SQL context cache is cleared before reading.
     *
     * @param j          expected row count
     * @param str        path the table is loaded from
     * @param sQLContext SQL context used for the read
     */
    public void assertRecordCount(long j, String str, SQLContext sQLContext) {
        sQLContext.clearCache();
        final long actualCount = sQLContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(str)
            .count();
        Assertions.assertEquals(j, actualCount);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asserts that the Hudi table at {@code str} contains exactly {@code j}
     * distinct values of {@code _hoodie_record_key}. The SQL context cache is
     * cleared before reading.
     *
     * @param j          expected number of distinct record keys
     * @param str        path the table is loaded from
     * @param sQLContext SQL context used for the read
     */
    public void assertDistinctRecordCount(long j, String str, SQLContext sQLContext) {
        sQLContext.clearCache();
        final long distinctKeys = sQLContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(str)
            .select("_hoodie_record_key")
            .distinct()
            .count();
        Assertions.assertEquals(j, distinctKeys);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the row count per {@code _hoodie_commit_time} for the Hudi table at
     * {@code str}, sorted by commit time. The SQL context cache is cleared before
     * reading.
     *
     * @param str        path the table is loaded from
     * @param sQLContext SQL context used for the read
     * @return one row per commit time holding the commit time and its count
     */
    public List<Row> countsPerCommit(String str, SQLContext sQLContext) {
        sQLContext.clearCache();
        final List<Row> perCommitCounts = sQLContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(str)
            .groupBy("_hoodie_commit_time")
            .count()
            .sort("_hoodie_commit_time")
            .collectAsList();
        return perCommitCounts;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asserts that exactly {@code j} rows of the Hudi table at {@code str} have a
     * non-null {@code haversine_distance}. The table is registered as the temp
     * table {@code tmp_trips} and queried through SQL.
     *
     * @param j          expected number of matching rows
     * @param str        path the table is loaded from
     * @param sQLContext SQL context used for the read and the query
     */
    public void assertDistanceCount(long j, String str, SQLContext sQLContext) {
        sQLContext.clearCache();
        sQLContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(str)
            .registerTempTable("tmp_trips");

        final long matchingRows = sQLContext.sql("select * from tmp_trips where haversine_distance is not NULL").count();
        Assertions.assertEquals(j, matchingRows);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asserts that exactly {@code j} rows of the Hudi table at {@code str} have
     * {@code haversine_distance = 1.0}. The table is registered as the temp table
     * {@code tmp_trips} and queried through SQL.
     *
     * @param j          expected number of matching rows
     * @param str        path the table is loaded from
     * @param sQLContext SQL context used for the read and the query
     */
    public void assertDistanceCountWithExactValue(long j, String str, SQLContext sQLContext) {
        sQLContext.clearCache();
        sQLContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(str)
            .registerTempTable("tmp_trips");

        final long matchingRows = sQLContext.sql("select * from tmp_trips where haversine_distance = 1.0").count();
        Assertions.assertEquals(j, matchingRows);
    }

    /**
     * Returns the number of rows per partition path for the Hudi table at
     * {@code str}. The SQL context cache is cleared before reading.
     *
     * @param str        path the table is loaded from
     * @param sQLContext SQL context used for the read
     * @return a new map from partition path (the value of
     *         {@code HoodieRecord.PARTITION_PATH_METADATA_FIELD}) to its row count
     */
    Map<String, Long> getPartitionRecordCount(String str, SQLContext sQLContext) {
        sQLContext.clearCache();
        List<Row> rows = sQLContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(str)
            .groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
            .count()
            .collectAsList();

        Map<String, Long> partitionRecordCount = new HashMap<>();
        // groupBy(...).count() yields (grouping column, count): column 0 is the partition
        // path, column 1 the long count. Previously the rows were iterated with an empty
        // lambda body, so the returned map was always empty.
        for (Row row : rows) {
            partitionRecordCount.put(row.getString(0), row.getLong(1));
        }
        return partitionRecordCount;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asserts that no row of the Hudi table at {@code str} satisfies the filter
     * {@code <partition path metadata field> = <str2>}. {@code str2} is
     * concatenated into the filter expression as-is. The SQL context cache is
     * cleared before reading.
     *
     * @param str        path the table is loaded from
     * @param sQLContext SQL context used for the read
     * @param str2       right-hand side of the equality filter, inserted verbatim
     */
    public void assertNoPartitionMatch(String str, SQLContext sQLContext, String str2) {
        sQLContext.clearCache();
        final String partitionFilter = HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = " + str2;
        final long matchingRows = sQLContext.read()
            .options(this.hudiOpts)
            .format("org.apache.hudi")
            .load(str)
            .filter(partitionFilter)
            .count();
        Assertions.assertEquals(0L, matchingRows);
    }
}
