package org.apache.hudi.testutils;

import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/hudi/testutils/SparkClientFunctionalTestHarness.class */
public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
    protected static int timelineServicePort = ((Integer) FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue()).intValue();
    private static transient SparkSession spark;
    private static transient SQLContext sqlContext;
    private static transient JavaSparkContext jsc;
    private static transient HoodieSparkEngineContext context;
    private static transient TimelineService timelineService;
    private FileSystem fileSystem;
    protected boolean initialized = false;

    @TempDir
    protected Path tempDir;

    public String basePath() {
        return this.tempDir.toAbsolutePath().toUri().toString();
    }

    @Override // org.apache.hudi.testutils.providers.SparkProvider
    public SparkSession spark() {
        return spark;
    }

    @Override // org.apache.hudi.testutils.providers.SparkProvider
    public SQLContext sqlContext() {
        return sqlContext;
    }

    @Override // org.apache.hudi.testutils.providers.SparkProvider
    public JavaSparkContext jsc() {
        return jsc;
    }

    public Configuration hadoopConf() {
        return jsc.hadoopConfiguration();
    }

    public FileSystem fs() {
        if (this.fileSystem == null) {
            this.fileSystem = FSUtils.getFs(basePath(), hadoopConf());
        }
        return this.fileSystem;
    }

    /* renamed from: context, reason: merged with bridge method [inline-methods] */
    public HoodieSparkEngineContext m53context() {
        return context;
    }

    public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType hoodieTableType) throws IOException {
        return getHoodieMetaClient(hoodieTableType, new Properties());
    }

    public HoodieTableMetaClient getHoodieMetaClient(HoodieTableType hoodieTableType, Properties properties) throws IOException {
        return getHoodieMetaClient(hadoopConf(), basePath(), hoodieTableType, properties);
    }

    public HoodieTableMetaClient getHoodieMetaClient(Configuration configuration, String str, HoodieTableType hoodieTableType, Properties properties) throws IOException {
        return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, str, HoodieTableMetaClient.withPropertyBuilder().setTableName("raw_trips").setTableType(hoodieTableType).setPayloadClass(HoodieAvroPayload.class).fromProperties(properties).build());
    }

    public HoodieTableMetaClient getHoodieMetaClient(Configuration configuration, String str) throws IOException {
        return getHoodieMetaClient(configuration, str, getPropertiesForKeyGen(true));
    }

    public HoodieTableMetaClient getHoodieMetaClient(Configuration configuration, String str, Properties properties) throws IOException {
        return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, str, HoodieTableMetaClient.withPropertyBuilder().setTableName("raw_trips").setTableType(HoodieTableType.COPY_ON_WRITE).setPayloadClass(HoodieAvroPayload.class).fromProperties(properties).build());
    }

    /* renamed from: getHoodieWriteClient, reason: merged with bridge method [inline-methods] */
    public SparkRDDWriteClient m54getHoodieWriteClient(HoodieWriteConfig hoodieWriteConfig) throws IOException {
        return new SparkRDDWriteClient(m53context(), hoodieWriteConfig);
    }

    @BeforeEach
    public synchronized void runBeforeEach() {
        this.initialized = spark != null;
        if (this.initialized) {
            return;
        }
        SparkConf conf = conf();
        HoodieSparkKryoRegistrar$.MODULE$.register(conf);
        SparkRDDReadClient.addHoodieSupport(conf);
        spark = SparkSession.builder().config(conf).getOrCreate();
        sqlContext = spark.sqlContext();
        jsc = new JavaSparkContext(spark.sparkContext());
        context = new HoodieSparkEngineContext(jsc);
        timelineService = HoodieClientTestUtils.initTimelineService(context, basePath(), incrementTimelineServicePortToUse());
        timelineServicePort = timelineService.getServerPort();
    }

    @AfterAll
    public static synchronized void resetSpark() {
        if (spark != null) {
            spark.close();
            spark = null;
        }
        if (timelineService != null) {
            timelineService.close();
        }
    }

    @AfterEach
    public void closeFileSystem() throws IOException {
        if (this.fileSystem != null) {
            this.fileSystem.close();
            this.fileSystem = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JavaRDD<HoodieRecord> tagLocation(HoodieIndex hoodieIndex, JavaRDD<HoodieRecord> javaRDD, HoodieTable hoodieTable) {
        return HoodieJavaRDD.getJavaRDD(hoodieIndex.tagLocation(HoodieJavaRDD.of(javaRDD), context, hoodieTable));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JavaRDD<WriteStatus> updateLocation(HoodieIndex hoodieIndex, JavaRDD<WriteStatus> javaRDD, HoodieTable hoodieTable) {
        return HoodieJavaRDD.getJavaRDD(hoodieIndex.updateLocation(HoodieJavaRDD.of(javaRDD), context, hoodieTable));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient hoodieTableMetaClient, List<HoodieRecord> list, SparkRDDWriteClient sparkRDDWriteClient, HoodieWriteConfig hoodieWriteConfig, String str) throws IOException {
        return insertRecordsToMORTable(hoodieTableMetaClient, list, sparkRDDWriteClient, hoodieWriteConfig, str, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient hoodieTableMetaClient, List<HoodieRecord> list, SparkRDDWriteClient sparkRDDWriteClient, HoodieWriteConfig hoodieWriteConfig, String str, boolean z) throws IOException {
        HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieTableMetaClient);
        JavaRDD insert = sparkRDDWriteClient.insert(jsc().parallelize(list, 1), str);
        List collect = insert.collect();
        Assertions.assertNoWriteErrors(collect);
        if (z) {
            sparkRDDWriteClient.commit(str, insert);
        }
        Assertions.assertFileSizesEqual(collect, writeStatus -> {
            return Long.valueOf(FSUtils.getFileSize(reload.getFs(), new org.apache.hadoop.fs.Path(reload.getBasePath(), writeStatus.getStat().getPath())));
        });
        HoodieSparkTable create = HoodieSparkTable.create(hoodieWriteConfig, m53context(), reload);
        Option lastInstant = reload.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
        Assertions.assertTrue(lastInstant.isPresent());
        Assertions.assertEquals(str, ((HoodieInstant) lastInstant.get()).getTimestamp(), "Delta commit should be specified value");
        Assertions.assertFalse(reload.getActiveTimeline().getCommitTimeline().lastInstant().isPresent());
        FileStatus[] listAllBaseFilesInPath = listAllBaseFilesInPath(create);
        Assertions.assertTrue(!getHoodieTableFileSystemView(reload, reload.getCommitTimeline().filterCompletedInstants(), listAllBaseFilesInPath).getLatestBaseFiles().findAny().isPresent());
        return getHoodieTableFileSystemView(reload, create.getCompletedCommitsTimeline(), listAllBaseFilesInPath).getLatestBaseFiles();
    }

    protected void updateRecordsInMORTable(HoodieTableMetaClient hoodieTableMetaClient, List<HoodieRecord> list, SparkRDDWriteClient sparkRDDWriteClient, HoodieWriteConfig hoodieWriteConfig, String str) throws IOException {
        updateRecordsInMORTable(hoodieTableMetaClient, list, sparkRDDWriteClient, hoodieWriteConfig, str, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void updateRecordsInMORTable(HoodieTableMetaClient hoodieTableMetaClient, List<HoodieRecord> list, SparkRDDWriteClient sparkRDDWriteClient, HoodieWriteConfig hoodieWriteConfig, String str, boolean z) throws IOException {
        HoodieTableMetaClient reload = HoodieTableMetaClient.reload(hoodieTableMetaClient);
        HashMap hashMap = new HashMap();
        for (HoodieRecord hoodieRecord : list) {
            if (!hashMap.containsKey(hoodieRecord.getKey())) {
                hashMap.put(hoodieRecord.getKey(), hoodieRecord);
            }
        }
        JavaRDD upsert = sparkRDDWriteClient.upsert(jsc().parallelize(list, 1), str);
        List collect = upsert.collect();
        Assertions.assertNoWriteErrors(collect);
        if (z) {
            sparkRDDWriteClient.commit(str, upsert);
        }
        Assertions.assertFileSizesEqual(collect, writeStatus -> {
            return Long.valueOf(FSUtils.getFileSize(reload.getFs(), new org.apache.hadoop.fs.Path(reload.getBasePath(), writeStatus.getStat().getPath())));
        });
        Option lastInstant = reload.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
        Assertions.assertTrue(lastInstant.isPresent());
        Assertions.assertEquals(str, ((HoodieInstant) lastInstant.get()).getTimestamp(), "Latest Delta commit should match specified time");
        Assertions.assertFalse(reload.getActiveTimeline().getCommitTimeline().firstInstant().isPresent());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FileStatus[] listAllBaseFilesInPath(HoodieTable hoodieTable) throws IOException {
        return HoodieTestTable.of(hoodieTable.getMetaClient()).listAllBaseFiles(hoodieTable.getBaseFileExtension());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Properties getPropertiesForKeyGen() {
        return getPropertiesForKeyGen(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Properties getPropertiesForKeyGen(boolean z) {
        Properties properties = new Properties();
        properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(z));
        properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
        properties.put("hoodie.datasource.write.partitionpath.field", SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME);
        properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
        properties.put(HoodieTableConfig.PARTITION_FIELDS.key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME);
        properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
        return properties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder builder, boolean z) {
        builder.withProperties(getPropertiesForKeyGen(z));
        if (z) {
            return;
        }
        builder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HoodieWriteConfig getConfig(Boolean bool) {
        return getConfigBuilder(bool).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HoodieWriteConfig getConfig(Boolean bool, Boolean bool2) {
        return getConfigBuilder(bool, bool2, HoodieIndex.IndexType.BLOOM).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HoodieWriteConfig.Builder getConfigBuilder(Boolean bool) {
        return getConfigBuilder(bool, HoodieIndex.IndexType.BLOOM);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, HoodieIndex.IndexType indexType) {
        return getConfigBuilder(bool, (Boolean) false, indexType);
    }

    protected HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, long j, HoodieClusteringConfig hoodieClusteringConfig) {
        return getConfigBuilder(bool, false, HoodieIndex.IndexType.BLOOM, j, hoodieClusteringConfig, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, Boolean bool2, HoodieIndex.IndexType indexType) {
        return getConfigBuilder(bool, bool2, indexType, 1073741824L, HoodieClusteringConfig.newBuilder().build(), false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HoodieWriteConfig.Builder getConfigBuilder(Boolean bool, Boolean bool2, HoodieIndex.IndexType indexType, long j, HoodieClusteringConfig hoodieClusteringConfig, boolean z) {
        return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}").withParallelism(2, 2).withDeleteParallelism(2).withAutoCommit(bool.booleanValue()).withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(j).withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(z).build()).withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1073741824L).parquetMaxFileSize(1073741824L).build()).withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table").withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder().withRemoteServerPort(Integer.valueOf(timelineServicePort)).withEnableBackupForRemoteFileSystemView(false).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()).withClusteringConfig(hoodieClusteringConfig).withRollbackUsingMarkers(bool2.booleanValue());
    }

    protected Dataset<Row> toDataset(List<HoodieRecord> list, Schema schema) {
        return AvroConversionUtils.createDataFrame(jsc.parallelize((List) list.stream().map(hoodieRecord -> {
            try {
                return (GenericRecord) ((HoodieRecordPayload) hoodieRecord.getData()).getInsertValue(schema).get();
            } catch (IOException e) {
                throw new HoodieIOException("Failed to extract Avro payload", e);
            }
        }).collect(Collectors.toList()), 2).rdd(), schema.toString(), spark);
    }

    protected int incrementTimelineServicePortToUse() {
        timelineServicePort = (((timelineServicePort + 1) - 1024) % 64512) + 1024;
        return timelineServicePort;
    }
}
